1 // Copyright 2019 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_TASK_THREAD_POOL_TASK_SOURCE_H_ 6 #define BASE_TASK_THREAD_POOL_TASK_SOURCE_H_ 7 8 #include <stddef.h> 9 10 #include "base/base_export.h" 11 #include "base/containers/intrusive_heap.h" 12 #include "base/dcheck_is_on.h" 13 #include "base/memory/raw_ptr.h" 14 #include "base/memory/ref_counted.h" 15 #include "base/memory/stack_allocated.h" 16 #include "base/sequence_token.h" 17 #include "base/task/common/checked_lock.h" 18 #include "base/task/task_traits.h" 19 #include "base/task/thread_pool/task.h" 20 #include "base/task/thread_pool/task_source_sort_key.h" 21 #include "base/threading/sequence_local_storage_map.h" 22 #include "base/time/time.h" 23 24 namespace base { 25 namespace internal { 26 27 class TaskTracker; 28 29 enum class TaskSourceExecutionMode { 30 kParallel, 31 kSequenced, 32 kSingleThread, 33 kJob, 34 kMax = kJob, 35 }; 36 37 struct BASE_EXPORT ExecutionEnvironment { 38 STACK_ALLOCATED(); 39 40 public: ExecutionEnvironmentExecutionEnvironment41 ExecutionEnvironment(SequenceToken token) : token(token) {} 42 ExecutionEnvironmentExecutionEnvironment43 ExecutionEnvironment(SequenceToken token, 44 SequenceLocalStorageMap* sequence_local_storage, 45 SingleThreadTaskRunner* single_thread_task_runner) 46 : token(token), 47 sequence_local_storage(sequence_local_storage), 48 single_thread_task_runner(single_thread_task_runner) {} 49 ExecutionEnvironmentExecutionEnvironment50 ExecutionEnvironment(SequenceToken token, 51 SequenceLocalStorageMap* sequence_local_storage, 52 SequencedTaskRunner* sequenced_task_runner) 53 : token(token), 54 sequence_local_storage(sequence_local_storage), 55 sequenced_task_runner(sequenced_task_runner) {} 56 ~ExecutionEnvironment(); 57 58 const SequenceToken token; 59 SequenceLocalStorageMap* const sequence_local_storage = nullptr; 60 SingleThreadTaskRunner* const single_thread_task_runner = nullptr; 61 SequencedTaskRunner* const sequenced_task_runner = nullptr; 62 }; 63 64 // A TaskSource is a virtual class that provides a series of Tasks that must be 65 // executed immediately or in the future. 66 // 67 // When a task source has delayed tasks but no immediate tasks, the scheduler 68 // must call OnBecomeReady() after HasReadyTasks(now) == true, which is 69 // guaranteed once now >= GetDelayedSortKey(). 70 // 71 // A task source is registered when it's ready to be added to the immediate 72 // queue. A task source is ready to be queued when either: 73 // 1- It has new tasks that can run concurrently as a result of external 74 // operations, e.g. posting a new immediate task to an empty Sequence or 75 // increasing max concurrency of a JobTaskSource; 76 // 2- A worker finished running a task from it and both DidProcessTask() and 77 // WillReEnqueue() returned true; or 78 // 3- A worker is about to run a task from it and WillRunTask() returned 79 // kAllowedNotSaturated. 80 // 4- A delayed task became ready and OnBecomeReady() returns true. 81 // 82 // A worker may perform the following sequence of operations on a 83 // RegisteredTaskSource after obtaining it from the queue: 84 // 1- Check whether a task can run with WillRunTask() (and register/enqueue the 85 // task source again if not saturated). 86 // 2- (optional) Iff (1) determined that a task can run, access the next task 87 // with TakeTask(). 88 // 3- (optional) Execute the task. 89 // 4- Inform the task source that a task was processed with DidProcessTask(), 90 // and re-enqueue the task source iff requested. The task source is ready to 91 // run immediately iff WillReEnqueue() returns true. 92 // When a task source is registered multiple times, many overlapping chains of 93 // operations may run concurrently, as permitted by WillRunTask(). This allows 94 // tasks from the same task source to run in parallel. 95 // However, the following invariants are kept: 96 // - The number of workers concurrently running tasks never goes over the 97 // intended concurrency. 98 // - If the task source has more tasks that can run concurrently, it must be 99 // queued. 100 // 101 // Note: there is a known refcounted-ownership cycle in the ThreadPool 102 // architecture: TaskSource -> TaskRunner -> TaskSource -> ... This is okay so 103 // long as the other owners of TaskSource (PriorityQueue and WorkerThread in 104 // alternation and ThreadGroup::WorkerThreadDelegateImpl::GetWork() 105 // temporarily) keep running it (and taking Tasks from it as a result). A 106 // dangling reference cycle would only occur should they release their reference 107 // to it while it's not empty. In other words, it is only correct for them to 108 // release it when DidProcessTask() returns false. 109 // 110 // This class is thread-safe. 111 class BASE_EXPORT TaskSource : public RefCountedThreadSafe<TaskSource> { 112 public: 113 // Indicates whether WillRunTask() allows TakeTask() to be called on a 114 // RegisteredTaskSource. 115 enum class RunStatus { 116 // TakeTask() cannot be called. 117 kDisallowed, 118 // TakeTask() may called, and the TaskSource has not reached its maximum 119 // concurrency (i.e. the TaskSource still needs to be queued). 120 kAllowedNotSaturated, 121 // TakeTask() may called, and the TaskSource has reached its maximum 122 // concurrency (i.e. the TaskSource no longer needs to be queued). 123 kAllowedSaturated, 124 }; 125 126 // A Transaction can perform multiple operations atomically on a 127 // TaskSource. While a Transaction is alive, it is guaranteed that nothing 128 // else will access the TaskSource; the TaskSource's lock is held for the 129 // lifetime of the Transaction. No Transaction must be held when ~TaskSource() 130 // is called. 131 class BASE_EXPORT Transaction { 132 STACK_ALLOCATED(); 133 134 public: 135 Transaction(Transaction&& other); 136 Transaction(const Transaction&) = delete; 137 Transaction& operator=(const Transaction&) = delete; 138 ~Transaction(); 139 140 operator bool() const { return !!task_source_; } 141 142 // Sets TaskSource priority to |priority|. 143 void UpdatePriority(TaskPriority priority); 144 145 // Returns the traits of all Tasks in the TaskSource. traits()146 TaskTraits traits() const { return task_source_->traits_; } 147 task_source()148 TaskSource* task_source() const { return task_source_; } 149 150 void Release(); 151 152 protected: 153 explicit Transaction(TaskSource* task_source); 154 155 private: 156 friend class TaskSource; 157 158 TaskSource* task_source_ = nullptr; 159 }; 160 161 // |traits| is metadata that applies to all Tasks in the TaskSource. 162 TaskSource(const TaskTraits& traits, 163 TaskSourceExecutionMode execution_mode); 164 TaskSource(const TaskSource&) = delete; 165 TaskSource& operator=(const TaskSource&) = delete; 166 167 // Begins a Transaction. This method cannot be called on a thread which has an 168 // active TaskSource::Transaction. 169 [[nodiscard]] Transaction BeginTransaction(); 170 171 virtual ExecutionEnvironment GetExecutionEnvironment() = 0; 172 173 // Thread-safe but the returned value may immediately be obsolete. As such 174 // this should only be used as a best-effort guess of how many more workers 175 // are needed. This may be called on an empty task source. 176 virtual size_t GetRemainingConcurrency() const = 0; 177 178 // Returns a TaskSourceSortKey representing the priority of the TaskSource. 179 virtual TaskSourceSortKey GetSortKey() const = 0; 180 // Returns a Timeticks object representing the next delayed runtime of the 181 // TaskSource. 182 virtual TimeTicks GetDelayedSortKey() const = 0; 183 // Returns true if there are tasks ready to be executed. Thread-safe but the 184 // returned value may immediately be obsolete. 185 virtual bool HasReadyTasks(TimeTicks now) const = 0; 186 // Returns true if the TaskSource should be moved to the immediate queue 187 // due to ready delayed tasks. Note: Returns false if the TaskSource contains 188 // ready delayed tasks, but expects to already be in the immediate queue. 189 virtual bool OnBecomeReady() = 0; 190 191 // Support for IntrusiveHeap in ThreadGroup::PriorityQueue. 192 void SetImmediateHeapHandle(const HeapHandle& handle); 193 void ClearImmediateHeapHandle(); GetImmediateHeapHandle()194 HeapHandle GetImmediateHeapHandle() const { 195 return immediate_pq_heap_handle_; 196 } 197 immediate_heap_handle()198 HeapHandle immediate_heap_handle() const { return immediate_pq_heap_handle_; } 199 200 // Support for IntrusiveHeap in ThreadGroup::DelayedPriorityQueue. 201 void SetDelayedHeapHandle(const HeapHandle& handle); 202 void ClearDelayedHeapHandle(); GetDelayedHeapHandle()203 HeapHandle GetDelayedHeapHandle() const { return delayed_pq_heap_handle_; } 204 delayed_heap_handle()205 HeapHandle delayed_heap_handle() const { return delayed_pq_heap_handle_; } 206 207 // Returns the shutdown behavior of all Tasks in the TaskSource. Can be 208 // accessed without a Transaction because it is never mutated. shutdown_behavior()209 TaskShutdownBehavior shutdown_behavior() const { 210 return traits_.shutdown_behavior(); 211 } 212 // Returns a racy priority of the TaskSource. Can be accessed without a 213 // Transaction but may return an outdated result. priority_racy()214 TaskPriority priority_racy() const { 215 return priority_racy_.load(std::memory_order_relaxed); 216 } 217 // Returns the thread policy of the TaskSource. Can be accessed without a 218 // Transaction because it is never mutated. thread_policy()219 ThreadPolicy thread_policy() const { return traits_.thread_policy(); } 220 execution_mode()221 TaskSourceExecutionMode execution_mode() const { return execution_mode_; } 222 223 void ClearForTesting(); 224 225 protected: 226 virtual ~TaskSource(); 227 228 virtual RunStatus WillRunTask() = 0; 229 230 // Implementations of TakeTask(), DidProcessTask(), WillReEnqueue(), and 231 // Clear() must ensure proper synchronization iff |transaction| is nullptr. 232 virtual Task TakeTask(TaskSource::Transaction* transaction) = 0; 233 virtual bool DidProcessTask(TaskSource::Transaction* transaction) = 0; 234 virtual bool WillReEnqueue(TimeTicks now, 235 TaskSource::Transaction* transaction) = 0; 236 237 // This may be called for each outstanding RegisteredTaskSource that's ready. 238 // The implementation needs to support this being called multiple times; 239 // unless it guarantees never to hand-out multiple RegisteredTaskSources that 240 // are concurrently ready. 241 virtual std::optional<Task> Clear(TaskSource::Transaction* transaction) = 0; 242 243 // Sets TaskSource priority to |priority|. 244 void UpdatePriority(TaskPriority priority); 245 246 // The TaskTraits of all Tasks in the TaskSource. 247 TaskTraits traits_; 248 249 // The cached priority for atomic access. 250 std::atomic<TaskPriority> priority_racy_; 251 252 // Synchronizes access to all members. 253 mutable CheckedLock lock_{UniversalPredecessor()}; 254 255 private: 256 friend class RefCountedThreadSafe<TaskSource>; 257 friend class RegisteredTaskSource; 258 259 // The TaskSource's position in its current PriorityQueue. Access is protected 260 // by the PriorityQueue's lock. 261 HeapHandle immediate_pq_heap_handle_; 262 263 // The TaskSource's position in its current DelayedPriorityQueue. Access is 264 // protected by the DelayedPriorityQueue's lock. 265 HeapHandle delayed_pq_heap_handle_; 266 267 TaskSourceExecutionMode execution_mode_; 268 }; 269 270 // Wrapper around TaskSource to signify the intent to queue and run it. 271 // RegisteredTaskSource can only be created with TaskTracker and may only be 272 // used by a single worker at a time. However, the same task source may be 273 // registered several times, spawning multiple RegisteredTaskSources. A 274 // RegisteredTaskSource resets to its initial state when WillRunTask() fails 275 // or after DidProcessTask() and WillReEnqueue(), so it can be used again. 276 class BASE_EXPORT RegisteredTaskSource { 277 public: 278 RegisteredTaskSource(); 279 RegisteredTaskSource(std::nullptr_t); 280 RegisteredTaskSource(RegisteredTaskSource&& other) noexcept; 281 RegisteredTaskSource(const RegisteredTaskSource&) = delete; 282 RegisteredTaskSource& operator=(const RegisteredTaskSource&) = delete; 283 ~RegisteredTaskSource(); 284 285 RegisteredTaskSource& operator=(RegisteredTaskSource&& other); 286 287 operator bool() const { return task_source_ != nullptr; } 288 TaskSource* operator->() const { return task_source_.get(); } get()289 TaskSource* get() const { return task_source_.get(); } 290 291 static RegisteredTaskSource CreateForTesting( 292 scoped_refptr<TaskSource> task_source, 293 TaskTracker* task_tracker = nullptr); 294 295 // Can only be called if this RegisteredTaskSource is in its initial state. 296 // Returns the underlying task source. An Optional is used in preparation for 297 // the merge between ThreadPool and TaskQueueManager (in Blink). 298 // https://crbug.com/783309 299 scoped_refptr<TaskSource> Unregister(); 300 301 // Informs this TaskSource that the current worker would like to run a Task 302 // from it. Can only be called if in its initial state. Returns a RunStatus 303 // that indicates if the operation is allowed (TakeTask() can be called). 304 TaskSource::RunStatus WillRunTask(); 305 306 // Returns the next task to run from this TaskSource. This should be called 307 // only after WillRunTask() returned RunStatus::kAllowed*. |transaction| is 308 // optional and should only be provided if this operation is already part of 309 // a transaction. 310 [[nodiscard]] Task TakeTask(TaskSource::Transaction* transaction = nullptr); 311 312 // Must be called after WillRunTask() or once the task was run if TakeTask() 313 // was called. This resets this RegisteredTaskSource to its initial state so 314 // that WillRunTask() may be called again. |transaction| is optional and 315 // should only be provided if this operation is already part of a transaction. 316 // Returns true if the TaskSource should be queued after this operation. 317 bool DidProcessTask(TaskSource::Transaction* transaction = nullptr); 318 319 // Must be called iff DidProcessTask() previously returns true . 320 // |transaction| is optional and should only be provided if this 321 // operation is already part of a transaction. Returns true if the 322 // TaskSource is ready to run immediately. 323 bool WillReEnqueue(TimeTicks now, 324 TaskSource::Transaction* transaction = nullptr); 325 326 // Returns a task that clears this TaskSource to make it empty. |transaction| 327 // is optional and should only be provided if this operation is already part 328 // of a transaction. 329 [[nodiscard]] std::optional<Task> Clear( 330 TaskSource::Transaction* transaction = nullptr); 331 332 private: 333 friend class TaskTracker; 334 RegisteredTaskSource(scoped_refptr<TaskSource> task_source, 335 TaskTracker* task_tracker); 336 337 #if DCHECK_IS_ON() 338 // Indicates the step of a task execution chain. 339 enum class State { 340 kInitial, // WillRunTask() may be called. 341 kReady, // After WillRunTask() returned a valid RunStatus. 342 }; 343 344 State run_step_ = State::kInitial; 345 #endif // DCHECK_IS_ON() 346 347 scoped_refptr<TaskSource> task_source_; 348 raw_ptr<TaskTracker, LeakedDanglingUntriaged> task_tracker_ = nullptr; 349 }; 350 351 // A pair of Transaction and RegisteredTaskSource. Useful to carry a 352 // RegisteredTaskSource with an associated Transaction. 353 struct BASE_EXPORT RegisteredTaskSourceAndTransaction { 354 STACK_ALLOCATED(); 355 356 public: 357 RegisteredTaskSourceAndTransaction(RegisteredTaskSource task_source_in, 358 TaskSource::Transaction transaction_in); 359 360 RegisteredTaskSourceAndTransaction( 361 RegisteredTaskSourceAndTransaction&& other) = default; 362 RegisteredTaskSourceAndTransaction( 363 const RegisteredTaskSourceAndTransaction&) = delete; 364 RegisteredTaskSourceAndTransaction& operator=( 365 const RegisteredTaskSourceAndTransaction&) = delete; 366 ~RegisteredTaskSourceAndTransaction() = default; 367 368 static RegisteredTaskSourceAndTransaction FromTaskSource( 369 RegisteredTaskSource task_source_in); 370 371 RegisteredTaskSource task_source; 372 TaskSource::Transaction transaction; 373 }; 374 375 struct BASE_EXPORT TaskSourceAndTransaction { 376 STACK_ALLOCATED(); 377 378 public: 379 TaskSourceAndTransaction(scoped_refptr<TaskSource> task_source_in, 380 TaskSource::Transaction transaction_in); 381 382 TaskSourceAndTransaction(TaskSourceAndTransaction&& other); 383 TaskSourceAndTransaction(const TaskSourceAndTransaction&) = delete; 384 TaskSourceAndTransaction& operator=(const TaskSourceAndTransaction&) = delete; 385 ~TaskSourceAndTransaction(); 386 387 static TaskSourceAndTransaction FromTaskSource( 388 scoped_refptr<TaskSource> task_source_in); 389 390 scoped_refptr<TaskSource> task_source; 391 TaskSource::Transaction transaction; 392 }; 393 394 } // namespace internal 395 } // namespace base 396 397 #endif // BASE_TASK_THREAD_POOL_TASK_SOURCE_H_ 398