xref: /aosp_15_r20/external/cronet/base/task/sequence_manager/task_queue_impl.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2015 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/sequence_manager/task_queue_impl.h"
6 
7 #include <inttypes.h>
8 
9 #include <memory>
10 #include <optional>
11 #include <utility>
12 
13 #include "base/check.h"
14 #include "base/compiler_specific.h"
15 #include "base/feature_list.h"
16 #include "base/logging.h"
17 #include "base/memory/scoped_refptr.h"
18 #include "base/metrics/histogram_macros.h"
19 #include "base/notreached.h"
20 #include "base/observer_list.h"
21 #include "base/ranges/algorithm.h"
22 #include "base/sequence_token.h"
23 #include "base/strings/stringprintf.h"
24 #include "base/task/common/scoped_defer_task_posting.h"
25 #include "base/task/default_delayed_task_handle_delegate.h"
26 #include "base/task/sequence_manager/associated_thread_id.h"
27 #include "base/task/sequence_manager/delayed_task_handle_delegate.h"
28 #include "base/task/sequence_manager/fence.h"
29 #include "base/task/sequence_manager/sequence_manager_impl.h"
30 #include "base/task/sequence_manager/task_order.h"
31 #include "base/task/sequence_manager/wake_up_queue.h"
32 #include "base/task/sequence_manager/work_queue.h"
33 #include "base/task/single_thread_task_runner.h"
34 #include "base/task/task_features.h"
35 #include "base/task/task_observer.h"
36 #include "base/threading/thread_restrictions.h"
37 #include "base/time/time.h"
38 #include "base/trace_event/base_tracing.h"
39 #include "build/build_config.h"
40 #include "third_party/abseil-cpp/absl/container/inlined_vector.h"
41 
42 namespace base {
43 namespace sequence_manager {
44 
45 namespace internal {
46 
47 // This class outside the anonymous namespace exists to allow being a friend of
48 // `SingleThreadTaskRunner::CurrentDefaultHandle` in order to access
49 // `SingleThreadTaskRunner::CurrentDefaultHandle::MayAlreadyExist`.
50 class CurrentDefaultHandleOverrideForRunOrPostTask {
51  public:
CurrentDefaultHandleOverrideForRunOrPostTask(scoped_refptr<SequencedTaskRunner> task_runner)52   explicit CurrentDefaultHandleOverrideForRunOrPostTask(
53       scoped_refptr<SequencedTaskRunner> task_runner)
54       : sttr_override_(
55             nullptr,
56             SingleThreadTaskRunner::CurrentDefaultHandle::MayAlreadyExist{}),
57         str_override_(std::move(task_runner)) {}
58 
59  private:
60   SingleThreadTaskRunner::CurrentDefaultHandle sttr_override_;
61   SequencedTaskRunner::CurrentDefaultHandle str_override_;
62 };
63 
64 namespace {
65 
66 // An atomic is used here because the value is queried from other threads when
67 // tasks are posted cross-thread, which can race with its initialization.
68 std::atomic<base::TimeDelta> g_max_precise_delay{kDefaultMaxPreciseDelay};
69 #if BUILDFLAG(IS_WIN)
70 // An atomic is used here because the flag is queried from other threads when
71 // tasks are posted cross-thread, which can race with its initialization.
72 std::atomic_bool g_explicit_high_resolution_timer_win{true};
73 #endif  // BUILDFLAG(IS_WIN)
74 
RunTaskSynchronously(const AssociatedThreadId * associated_thread,scoped_refptr<SingleThreadTaskRunner> task_runner,OnceClosure closure)75 void RunTaskSynchronously(const AssociatedThreadId* associated_thread,
76                           scoped_refptr<SingleThreadTaskRunner> task_runner,
77                           OnceClosure closure) {
78   base::internal::TaskScope sequence_scope(
79       associated_thread->GetBoundSequenceToken(),
80       /* is_thread_bound=*/false,
81       /* is_running_synchronously=*/true);
82   CurrentDefaultHandleOverrideForRunOrPostTask task_runner_override(
83       std::move(task_runner));
84   std::move(closure).Run();
85 }
86 
87 }  // namespace
88 
GuardedTaskPoster(TaskQueueImpl * outer)89 TaskQueueImpl::GuardedTaskPoster::GuardedTaskPoster(TaskQueueImpl* outer)
90     : outer_(outer) {}
91 
~GuardedTaskPoster()92 TaskQueueImpl::GuardedTaskPoster::~GuardedTaskPoster() {}
93 
PostTask(PostedTask task)94 bool TaskQueueImpl::GuardedTaskPoster::PostTask(PostedTask task) {
95   // Do not process new PostTasks while we are handling a PostTask (tracing
96   // has to do this) as it can lead to a deadlock and defer it instead.
97   ScopedDeferTaskPosting disallow_task_posting;
98 
99   auto token = operations_controller_.TryBeginOperation();
100   if (!token)
101     return false;
102 
103   outer_->PostTask(std::move(task));
104   return true;
105 }
106 
PostCancelableTask(PostedTask task)107 DelayedTaskHandle TaskQueueImpl::GuardedTaskPoster::PostCancelableTask(
108     PostedTask task) {
109   // Do not process new PostTasks while we are handling a PostTask (tracing
110   // has to do this) as it can lead to a deadlock and defer it instead.
111   ScopedDeferTaskPosting disallow_task_posting;
112 
113   auto token = operations_controller_.TryBeginOperation();
114   if (!token)
115     return DelayedTaskHandle();
116 
117   auto delayed_task_handle_delegate =
118       std::make_unique<DelayedTaskHandleDelegate>(outer_);
119   task.delayed_task_handle_delegate = delayed_task_handle_delegate->AsWeakPtr();
120 
121   outer_->PostTask(std::move(task));
122   DCHECK(delayed_task_handle_delegate->IsValid());
123   return DelayedTaskHandle(std::move(delayed_task_handle_delegate));
124 }
125 
RunOrPostTask(PostedTask task)126 bool TaskQueueImpl::GuardedTaskPoster::RunOrPostTask(PostedTask task) {
127   auto token = operations_controller_.TryBeginOperation();
128   if (!token) {
129     return false;
130   }
131 
132   auto sync_work_auth =
133       outer_->sequence_manager_->TryAcquireSyncWorkAuthorization();
134   // The queue may be disabled immediately after checking
135   // `IsQueueEnabledFromAnyThread()`. That won't prevent the task from running.
136   if (sync_work_auth.IsValid() && outer_->IsQueueEnabledFromAnyThread()) {
137     RunTaskSynchronously(outer_->associated_thread_.get(),
138                          outer_->sequence_manager_->GetTaskRunner(),
139                          std::move(task.callback));
140     return true;
141   }
142 
143   return PostTask(std::move(task));
144 }
145 
TaskRunner(scoped_refptr<GuardedTaskPoster> task_poster,scoped_refptr<const AssociatedThreadId> associated_thread,TaskType task_type)146 TaskQueueImpl::TaskRunner::TaskRunner(
147     scoped_refptr<GuardedTaskPoster> task_poster,
148     scoped_refptr<const AssociatedThreadId> associated_thread,
149     TaskType task_type)
150     : task_poster_(std::move(task_poster)),
151       associated_thread_(std::move(associated_thread)),
152       task_type_(task_type) {}
153 
~TaskRunner()154 TaskQueueImpl::TaskRunner::~TaskRunner() {}
155 
PostDelayedTask(const Location & location,OnceClosure callback,TimeDelta delay)156 bool TaskQueueImpl::TaskRunner::PostDelayedTask(const Location& location,
157                                                 OnceClosure callback,
158                                                 TimeDelta delay) {
159   return task_poster_->PostTask(PostedTask(this, std::move(callback), location,
160                                            delay, Nestable::kNestable,
161                                            task_type_));
162 }
163 
PostDelayedTaskAt(subtle::PostDelayedTaskPassKey,const Location & location,OnceClosure callback,TimeTicks delayed_run_time,base::subtle::DelayPolicy delay_policy)164 bool TaskQueueImpl::TaskRunner::PostDelayedTaskAt(
165     subtle::PostDelayedTaskPassKey,
166     const Location& location,
167     OnceClosure callback,
168     TimeTicks delayed_run_time,
169     base::subtle::DelayPolicy delay_policy) {
170   return task_poster_->PostTask(PostedTask(this, std::move(callback), location,
171                                            delayed_run_time, delay_policy,
172                                            Nestable::kNestable, task_type_));
173 }
174 
PostCancelableDelayedTaskAt(subtle::PostDelayedTaskPassKey pass_key,const Location & location,OnceClosure callback,TimeTicks delayed_run_time,base::subtle::DelayPolicy delay_policy)175 DelayedTaskHandle TaskQueueImpl::TaskRunner::PostCancelableDelayedTaskAt(
176     subtle::PostDelayedTaskPassKey pass_key,
177     const Location& location,
178     OnceClosure callback,
179     TimeTicks delayed_run_time,
180     base::subtle::DelayPolicy delay_policy) {
181   return task_poster_->PostCancelableTask(
182       PostedTask(this, std::move(callback), location, delayed_run_time,
183                  delay_policy, Nestable::kNestable, task_type_));
184 }
185 
PostCancelableDelayedTask(subtle::PostDelayedTaskPassKey pass_key,const Location & location,OnceClosure callback,TimeDelta delay)186 DelayedTaskHandle TaskQueueImpl::TaskRunner::PostCancelableDelayedTask(
187     subtle::PostDelayedTaskPassKey pass_key,
188     const Location& location,
189     OnceClosure callback,
190     TimeDelta delay) {
191   return task_poster_->PostCancelableTask(
192       PostedTask(this, std::move(callback), location, delay,
193                  Nestable::kNestable, task_type_));
194 }
195 
PostNonNestableDelayedTask(const Location & location,OnceClosure callback,TimeDelta delay)196 bool TaskQueueImpl::TaskRunner::PostNonNestableDelayedTask(
197     const Location& location,
198     OnceClosure callback,
199     TimeDelta delay) {
200   return task_poster_->PostTask(PostedTask(this, std::move(callback), location,
201                                            delay, Nestable::kNonNestable,
202                                            task_type_));
203 }
204 
RunOrPostTask(subtle::RunOrPostTaskPassKey,const Location & location,OnceClosure callback)205 bool TaskQueueImpl::TaskRunner::RunOrPostTask(subtle::RunOrPostTaskPassKey,
206                                               const Location& location,
207                                               OnceClosure callback) {
208   return task_poster_->RunOrPostTask(
209       PostedTask(this, std::move(callback), location, TimeDelta(),
210                  Nestable::kNestable, task_type_));
211 }
212 
BelongsToCurrentThread() const213 bool TaskQueueImpl::TaskRunner::BelongsToCurrentThread() const {
214   return associated_thread_->IsBoundToCurrentThread();
215 }
216 
RunsTasksInCurrentSequence() const217 bool TaskQueueImpl::TaskRunner::RunsTasksInCurrentSequence() const {
218   // Return true on the bound thread. This works even after `thread_local`
219   // destruction.
220   if (BelongsToCurrentThread()) {
221     return true;
222   }
223 
224   // Return true in a `RunOrPostTask` callback running synchronously on a
225   // different thread.
226   if (associated_thread_->IsBound() &&
227       associated_thread_->GetBoundSequenceToken() ==
228           base::internal::SequenceToken::GetForCurrentThread()) {
229     return true;
230   }
231 
232   return false;
233 }
234 
235 // static
InitializeFeatures()236 void TaskQueueImpl::InitializeFeatures() {
237   g_max_precise_delay = kMaxPreciseDelay.Get();
238 #if BUILDFLAG(IS_WIN)
239   g_explicit_high_resolution_timer_win.store(
240       FeatureList::IsEnabled(kExplicitHighResolutionTimerWin),
241       std::memory_order_relaxed);
242 #endif  // BUILDFLAG(IS_WIN)
243 }
244 
TaskQueueImpl(SequenceManagerImpl * sequence_manager,WakeUpQueue * wake_up_queue,const TaskQueue::Spec & spec)245 TaskQueueImpl::TaskQueueImpl(SequenceManagerImpl* sequence_manager,
246                              WakeUpQueue* wake_up_queue,
247                              const TaskQueue::Spec& spec)
248     : name_(spec.name),
249       sequence_manager_(sequence_manager),
250       associated_thread_(sequence_manager
251                              ? sequence_manager->associated_thread()
252                              : AssociatedThreadId::CreateBound()),
253       task_poster_(MakeRefCounted<GuardedTaskPoster>(this)),
254       main_thread_only_(this, wake_up_queue),
255       empty_queues_to_reload_handle_(
256           sequence_manager
257               ? sequence_manager->GetFlagToRequestReloadForEmptyQueue(this)
258               : AtomicFlagSet::AtomicFlag()),
259       should_monitor_quiescence_(spec.should_monitor_quiescence),
260       should_notify_observers_(spec.should_notify_observers),
261       delayed_fence_allowed_(spec.delayed_fence_allowed),
262       default_task_runner_(CreateTaskRunner(kTaskTypeNone)) {
263   UpdateCrossThreadQueueStateLocked();
264   // SequenceManager can't be set later, so we need to prevent task runners
265   // from posting any tasks.
266   if (sequence_manager_)
267     task_poster_->StartAcceptingOperations();
268 }
269 
~TaskQueueImpl()270 TaskQueueImpl::~TaskQueueImpl() {
271 #if DCHECK_IS_ON()
272   base::internal::CheckedAutoLock lock(any_thread_lock_);
273   // NOTE this check shouldn't fire because |SequenceManagerImpl::queues_|
274   // contains a strong reference to this TaskQueueImpl and the
275   // SequenceManagerImpl destructor calls UnregisterTaskQueue on all task
276   // queues.
277   DCHECK(any_thread_.unregistered)
278       << "UnregisterTaskQueue must be called first!";
279 #endif
280 }
281 
282 TaskQueueImpl::AnyThread::AnyThread() = default;
283 TaskQueueImpl::AnyThread::~AnyThread() = default;
284 
285 TaskQueueImpl::AnyThread::TracingOnly::TracingOnly() = default;
286 TaskQueueImpl::AnyThread::TracingOnly::~TracingOnly() = default;
287 
MainThreadOnly(TaskQueueImpl * task_queue,WakeUpQueue * wake_up_queue)288 TaskQueueImpl::MainThreadOnly::MainThreadOnly(TaskQueueImpl* task_queue,
289                                               WakeUpQueue* wake_up_queue)
290     : wake_up_queue(wake_up_queue),
291       delayed_work_queue(
292           new WorkQueue(task_queue, "delayed", WorkQueue::QueueType::kDelayed)),
293       immediate_work_queue(new WorkQueue(task_queue,
294                                          "immediate",
295                                          WorkQueue::QueueType::kImmediate)) {}
296 
297 TaskQueueImpl::MainThreadOnly::~MainThreadOnly() = default;
298 
CreateTaskRunner(TaskType task_type) const299 scoped_refptr<SingleThreadTaskRunner> TaskQueueImpl::CreateTaskRunner(
300     TaskType task_type) const {
301   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
302   return MakeRefCounted<TaskRunner>(task_poster_, associated_thread_,
303                                     task_type);
304 }
305 
task_runner() const306 const scoped_refptr<SingleThreadTaskRunner>& TaskQueueImpl::task_runner()
307     const {
308   return default_task_runner_;
309 }
310 
UnregisterTaskQueue()311 void TaskQueueImpl::UnregisterTaskQueue() {
312   TRACE_EVENT0("base", "TaskQueueImpl::UnregisterTaskQueue");
313   // Invalidate weak pointers now so no voters reference this in a partially
314   // torn down state.
315   voter_weak_ptr_factory_.InvalidateWeakPtrs();
316   // Detach task runners.
317   {
318     ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow_wait;
319     task_poster_->ShutdownAndWaitForZeroOperations();
320   }
321 
322   TaskDeque immediate_incoming_queue;
323   base::flat_map<raw_ptr<OnTaskPostedCallbackHandleImpl>, OnTaskPostedHandler>
324       on_task_posted_handlers;
325 
326   {
327     base::internal::CheckedAutoLock lock(any_thread_lock_);
328     any_thread_.unregistered = true;
329     immediate_incoming_queue.swap(any_thread_.immediate_incoming_queue);
330 
331     for (auto& handler : any_thread_.on_task_posted_handlers)
332       handler.first->UnregisterTaskQueue();
333     any_thread_.on_task_posted_handlers.swap(on_task_posted_handlers);
334   }
335 
336   if (main_thread_only().wake_up_queue) {
337     main_thread_only().wake_up_queue->UnregisterQueue(this);
338   }
339 
340   main_thread_only().on_task_started_handler = OnTaskStartedHandler();
341   main_thread_only().on_task_completed_handler = OnTaskCompletedHandler();
342   main_thread_only().wake_up_queue = nullptr;
343   main_thread_only().throttler = nullptr;
344   empty_queues_to_reload_handle_.ReleaseAtomicFlag();
345 
346   // It is possible for a task to hold a scoped_refptr to this, which
347   // will lead to TaskQueueImpl destructor being called when deleting a task.
348   // To avoid use-after-free, we need to clear all fields of a task queue
349   // before starting to delete the tasks.
350   // All work queues and priority queues containing tasks should be moved to
351   // local variables on stack (std::move for unique_ptrs and swap for queues)
352   // before clearing them and deleting tasks.
353 
354   // Flush the queues outside of the lock because TSAN complains about a lock
355   // order inversion for tasks that are posted from within a lock, with a
356   // destructor that acquires the same lock.
357 
358   DelayedIncomingQueue delayed_incoming_queue;
359   delayed_incoming_queue.swap(&main_thread_only().delayed_incoming_queue);
360   std::unique_ptr<WorkQueue> immediate_work_queue =
361       std::move(main_thread_only().immediate_work_queue);
362   std::unique_ptr<WorkQueue> delayed_work_queue =
363       std::move(main_thread_only().delayed_work_queue);
364 }
365 
GetName() const366 const char* TaskQueueImpl::GetName() const {
367   return perfetto::protos::pbzero::SequenceManagerTask::QueueName_Name(name_);
368 }
369 
GetProtoName() const370 QueueName TaskQueueImpl::GetProtoName() const {
371   return name_;
372 }
373 
PostTask(PostedTask task)374 void TaskQueueImpl::PostTask(PostedTask task) {
375   CurrentThread current_thread =
376       associated_thread_->IsBoundToCurrentThread()
377           ? TaskQueueImpl::CurrentThread::kMainThread
378           : TaskQueueImpl::CurrentThread::kNotMainThread;
379 
380 #if DCHECK_IS_ON()
381   TimeDelta delay = GetTaskDelayAdjustment(current_thread);
382   if (absl::holds_alternative<base::TimeTicks>(
383           task.delay_or_delayed_run_time)) {
384     absl::get<base::TimeTicks>(task.delay_or_delayed_run_time) += delay;
385   } else {
386     absl::get<base::TimeDelta>(task.delay_or_delayed_run_time) += delay;
387   }
388 #endif  // DCHECK_IS_ON()
389 
390   if (!task.is_delayed()) {
391     PostImmediateTaskImpl(std::move(task), current_thread);
392   } else {
393     PostDelayedTaskImpl(std::move(task), current_thread);
394   }
395 }
396 
RemoveCancelableTask(HeapHandle heap_handle)397 void TaskQueueImpl::RemoveCancelableTask(HeapHandle heap_handle) {
398   // Can only cancel from the current thread.
399   DCHECK(associated_thread_->IsBoundToCurrentThread());
400   DCHECK(heap_handle.IsValid());
401 
402   main_thread_only().delayed_incoming_queue.remove(heap_handle);
403 
404   // Only update the delayed wake up if the top task is removed.
405   if (heap_handle.index() == 0u) {
406     LazyNow lazy_now(sequence_manager_->main_thread_clock());
407     UpdateWakeUp(&lazy_now);
408   }
409 }
410 
GetTaskDelayAdjustment(CurrentThread current_thread)411 TimeDelta TaskQueueImpl::GetTaskDelayAdjustment(CurrentThread current_thread) {
412 #if DCHECK_IS_ON()
413   if (current_thread == TaskQueueImpl::CurrentThread::kNotMainThread) {
414     base::internal::CheckedAutoLock lock(any_thread_lock_);
415     // Add a per-priority delay to cross thread tasks. This can help diagnose
416     // scheduler induced flakiness by making things flake most of the time.
417     return sequence_manager_->settings()
418         .priority_settings
419         .per_priority_cross_thread_task_delay()[any_thread_.queue_set_index];
420   } else {
421     return sequence_manager_->settings()
422         .priority_settings.per_priority_same_thread_task_delay()
423             [main_thread_only().immediate_work_queue->work_queue_set_index()];
424   }
425 #else
426   // No delay adjustment.
427   return TimeDelta();
428 #endif  // DCHECK_IS_ON()
429 }
430 
PostImmediateTaskImpl(PostedTask task,CurrentThread current_thread)431 void TaskQueueImpl::PostImmediateTaskImpl(PostedTask task,
432                                           CurrentThread current_thread) {
433   // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
434   // for details.
435   CHECK(task.callback);
436 
437   bool should_schedule_work = false;
438   {
439     // TODO(alexclarke): Maybe add a main thread only immediate_incoming_queue
440     // See https://crbug.com/901800
441     base::internal::CheckedAutoLock lock(any_thread_lock_);
442     bool add_queue_time_to_tasks = sequence_manager_->GetAddQueueTimeToTasks();
443     TimeTicks queue_time;
444     if (add_queue_time_to_tasks || delayed_fence_allowed_)
445       queue_time = sequence_manager_->any_thread_clock()->NowTicks();
446 
447     // The sequence number must be incremented atomically with pushing onto the
448     // incoming queue. Otherwise if there are several threads posting task we
449     // risk breaking the assumption that sequence numbers increase monotonically
450     // within a queue.
451     EnqueueOrder sequence_number = sequence_manager_->GetNextSequenceNumber();
452     bool was_immediate_incoming_queue_empty =
453         any_thread_.immediate_incoming_queue.empty();
454     any_thread_.immediate_incoming_queue.push_back(
455         Task(std::move(task), sequence_number, sequence_number, queue_time));
456 
457 #if DCHECK_IS_ON()
458     any_thread_.immediate_incoming_queue.back().cross_thread_ =
459         (current_thread == TaskQueueImpl::CurrentThread::kNotMainThread);
460 #endif
461 
462     sequence_manager_->WillQueueTask(
463         &any_thread_.immediate_incoming_queue.back());
464     MaybeReportIpcTaskQueuedFromAnyThreadLocked(
465         any_thread_.immediate_incoming_queue.back());
466 
467     for (auto& handler : any_thread_.on_task_posted_handlers) {
468       DCHECK(!handler.second.is_null());
469       handler.second.Run(any_thread_.immediate_incoming_queue.back());
470     }
471 
472     // If this queue was completely empty, then the SequenceManager needs to be
473     // informed so it can reload the work queue and add us to the
474     // TaskQueueSelector which can only be done from the main thread. In
475     // addition it may need to schedule a DoWork if this queue isn't blocked.
476     if (was_immediate_incoming_queue_empty &&
477         any_thread_.immediate_work_queue_empty) {
478       sequence_manager_->WillRequestReloadImmediateWorkQueue();
479       empty_queues_to_reload_handle_.SetActive(true);
480       should_schedule_work =
481           any_thread_.post_immediate_task_should_schedule_work;
482     }
483   }
484 
485   // On windows it's important to call this outside of a lock because calling a
486   // pump while holding a lock can result in priority inversions. See
487   // http://shortn/_ntnKNqjDQT for a discussion.
488   //
489   // Calling ScheduleWork outside the lock should be safe, only the main thread
490   // can mutate |any_thread_.post_immediate_task_should_schedule_work|. If it
491   // transitions to false we call ScheduleWork redundantly that's harmless. If
492   // it transitions to true, the side effect of
493   // |empty_queues_to_reload_handle_SetActive(true)| is guaranteed to be picked
494   // up by the ThreadController's call to SequenceManagerImpl::DelayTillNextTask
495   // when it computes what continuation (if any) is needed.
496   if (should_schedule_work)
497     sequence_manager_->ScheduleWork();
498 
499   TraceQueueSize();
500 }
501 
PostDelayedTaskImpl(PostedTask posted_task,CurrentThread current_thread)502 void TaskQueueImpl::PostDelayedTaskImpl(PostedTask posted_task,
503                                         CurrentThread current_thread) {
504   // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
505   // for details.
506   CHECK(posted_task.callback);
507 
508   if (current_thread == CurrentThread::kMainThread) {
509     LazyNow lazy_now(sequence_manager_->main_thread_clock());
510     Task pending_task = MakeDelayedTask(std::move(posted_task), &lazy_now);
511     sequence_manager_->MaybeAddLeewayToTask(pending_task);
512     PushOntoDelayedIncomingQueueFromMainThread(
513         std::move(pending_task), &lazy_now,
514         /* notify_task_annotator */ true);
515   } else {
516     LazyNow lazy_now(sequence_manager_->any_thread_clock());
517     PushOntoDelayedIncomingQueue(
518         MakeDelayedTask(std::move(posted_task), &lazy_now));
519   }
520 }
521 
PushOntoDelayedIncomingQueueFromMainThread(Task pending_task,LazyNow * lazy_now,bool notify_task_annotator)522 void TaskQueueImpl::PushOntoDelayedIncomingQueueFromMainThread(
523     Task pending_task,
524     LazyNow* lazy_now,
525     bool notify_task_annotator) {
526 #if DCHECK_IS_ON()
527   pending_task.cross_thread_ = false;
528 #endif
529 
530   if (notify_task_annotator) {
531     sequence_manager_->WillQueueTask(&pending_task);
532     MaybeReportIpcTaskQueuedFromMainThread(pending_task);
533   }
534   main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
535   UpdateWakeUp(lazy_now);
536 
537   TraceQueueSize();
538 }
539 
PushOntoDelayedIncomingQueue(Task pending_task)540 void TaskQueueImpl::PushOntoDelayedIncomingQueue(Task pending_task) {
541   sequence_manager_->WillQueueTask(&pending_task);
542   MaybeReportIpcTaskQueuedFromAnyThreadUnlocked(pending_task);
543 
544 #if DCHECK_IS_ON()
545   pending_task.cross_thread_ = true;
546 #endif
547 
548   // TODO(altimin): Add a copy method to Task to capture metadata here.
549   auto task_runner = pending_task.task_runner;
550   const auto task_type = pending_task.task_type;
551   PostImmediateTaskImpl(
552       PostedTask(std::move(task_runner),
553                  BindOnce(&TaskQueueImpl::ScheduleDelayedWorkTask,
554                           Unretained(this), std::move(pending_task)),
555                  FROM_HERE, TimeDelta(), Nestable::kNonNestable, task_type),
556       CurrentThread::kNotMainThread);
557 }
558 
ScheduleDelayedWorkTask(Task pending_task)559 void TaskQueueImpl::ScheduleDelayedWorkTask(Task pending_task) {
560   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
561   sequence_manager_->MaybeAddLeewayToTask(pending_task);
562   TimeTicks now = sequence_manager_->main_thread_clock()->NowTicks();
563   LazyNow lazy_now(now);
564   // A delayed task is ready to run as soon as earliest_delayed_run_time() is
565   // reached.
566   if (pending_task.earliest_delayed_run_time() <= now) {
567     // If |delayed_run_time| is in the past then push it onto the work queue
568     // immediately. To ensure the right task ordering we need to temporarily
569     // push it onto the |delayed_incoming_queue|.
570     pending_task.delayed_run_time = now;
571     main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
572     MoveReadyDelayedTasksToWorkQueue(
573         &lazy_now, sequence_manager_->GetNextSequenceNumber());
574   } else {
575     // If |delayed_run_time| is in the future we can queue it as normal.
576     PushOntoDelayedIncomingQueueFromMainThread(std::move(pending_task),
577                                                &lazy_now, false);
578   }
579   TraceQueueSize();
580 }
581 
ReloadEmptyImmediateWorkQueue()582 void TaskQueueImpl::ReloadEmptyImmediateWorkQueue() {
583   DCHECK(main_thread_only().immediate_work_queue->Empty());
584   main_thread_only().immediate_work_queue->TakeImmediateIncomingQueueTasks();
585 
586   if (main_thread_only().throttler && IsQueueEnabled()) {
587     main_thread_only().throttler->OnHasImmediateTask();
588   }
589 }
590 
TakeImmediateIncomingQueueTasks(TaskDeque * queue)591 void TaskQueueImpl::TakeImmediateIncomingQueueTasks(TaskDeque* queue) {
592   DCHECK(queue->empty());
593   // Now is a good time to consider reducing the empty queue's capacity if we're
594   // wasting memory, before we make it the `immediate_incoming_queue`.
595   queue->MaybeShrinkQueue();
596 
597   base::internal::CheckedAutoLock lock(any_thread_lock_);
598   queue->swap(any_thread_.immediate_incoming_queue);
599 
600   // Activate delayed fence if necessary. This is ideologically similar to
601   // ActivateDelayedFenceIfNeeded, but due to immediate tasks being posted
602   // from any thread we can't generate an enqueue order for the fence there,
603   // so we have to check all immediate tasks and use their enqueue order for
604   // a fence.
605   if (main_thread_only().delayed_fence) {
606     for (const Task& task : *queue) {
607       DCHECK(!task.queue_time.is_null());
608       DCHECK(task.delayed_run_time.is_null());
609       if (task.queue_time >= main_thread_only().delayed_fence.value()) {
610         main_thread_only().delayed_fence = std::nullopt;
611         DCHECK(!main_thread_only().current_fence);
612         main_thread_only().current_fence = Fence(task.task_order());
613         // Do not trigger WorkQueueSets notification when taking incoming
614         // immediate queue.
615         main_thread_only().immediate_work_queue->InsertFenceSilently(
616             *main_thread_only().current_fence);
617         main_thread_only().delayed_work_queue->InsertFenceSilently(
618             *main_thread_only().current_fence);
619         break;
620       }
621     }
622   }
623 
624   UpdateCrossThreadQueueStateLocked();
625 }
626 
IsEmpty() const627 bool TaskQueueImpl::IsEmpty() const {
628   if (!main_thread_only().delayed_work_queue->Empty() ||
629       !main_thread_only().delayed_incoming_queue.empty() ||
630       !main_thread_only().immediate_work_queue->Empty()) {
631     return false;
632   }
633 
634   base::internal::CheckedAutoLock lock(any_thread_lock_);
635   return any_thread_.immediate_incoming_queue.empty();
636 }
637 
GetNumberOfPendingTasks() const638 size_t TaskQueueImpl::GetNumberOfPendingTasks() const {
639   size_t task_count = 0;
640   task_count += main_thread_only().delayed_work_queue->Size();
641   task_count += main_thread_only().delayed_incoming_queue.size();
642   task_count += main_thread_only().immediate_work_queue->Size();
643 
644   base::internal::CheckedAutoLock lock(any_thread_lock_);
645   task_count += any_thread_.immediate_incoming_queue.size();
646   return task_count;
647 }
648 
HasTaskToRunImmediatelyOrReadyDelayedTask() const649 bool TaskQueueImpl::HasTaskToRunImmediatelyOrReadyDelayedTask() const {
650   // Any work queue tasks count as immediate work.
651   if (!main_thread_only().delayed_work_queue->Empty() ||
652       !main_thread_only().immediate_work_queue->Empty()) {
653     return true;
654   }
655 
656   // Tasks on |delayed_incoming_queue| that could run now, count as
657   // immediate work.
658   if (!main_thread_only().delayed_incoming_queue.empty() &&
659       main_thread_only().delayed_incoming_queue.top().delayed_run_time <=
660           sequence_manager_->main_thread_clock()->NowTicks()) {
661     return true;
662   }
663 
664   // Finally tasks on |immediate_incoming_queue| count as immediate work.
665   base::internal::CheckedAutoLock lock(any_thread_lock_);
666   return !any_thread_.immediate_incoming_queue.empty();
667 }
668 
GetNextDesiredWakeUp()669 std::optional<WakeUp> TaskQueueImpl::GetNextDesiredWakeUp() {
670   // Note we don't scheduled a wake-up for disabled queues.
671   if (main_thread_only().delayed_incoming_queue.empty() || !IsQueueEnabled())
672     return std::nullopt;
673 
674   const auto& top_task = main_thread_only().delayed_incoming_queue.top();
675 
676   // High resolution is needed if the queue contains high resolution tasks and
677   // has a priority index <= kNormalPriority (precise execution time is
678   // unnecessary for a low priority queue).
679   WakeUpResolution resolution = has_pending_high_resolution_tasks() &&
680                                         GetQueuePriority() <= DefaultPriority()
681                                     ? WakeUpResolution::kHigh
682                                     : WakeUpResolution::kLow;
683   subtle::DelayPolicy delay_policy = top_task.delay_policy;
684   if (GetQueuePriority() > DefaultPriority() &&
685       delay_policy == subtle::DelayPolicy::kPrecise) {
686     delay_policy = subtle::DelayPolicy::kFlexibleNoSooner;
687   }
688   return WakeUp{top_task.delayed_run_time, top_task.leeway, resolution,
689                 delay_policy};
690 }
691 
OnWakeUp(LazyNow * lazy_now,EnqueueOrder enqueue_order)692 void TaskQueueImpl::OnWakeUp(LazyNow* lazy_now, EnqueueOrder enqueue_order) {
693   MoveReadyDelayedTasksToWorkQueue(lazy_now, enqueue_order);
694   if (main_thread_only().throttler) {
695     main_thread_only().throttler->OnWakeUp(lazy_now);
696   }
697 }
698 
RemoveAllCanceledDelayedTasksFromFront(LazyNow * lazy_now)699 bool TaskQueueImpl::RemoveAllCanceledDelayedTasksFromFront(LazyNow* lazy_now) {
700   // Because task destructors could have a side-effect of posting new tasks, we
701   // move all the cancelled tasks into a temporary container before deleting
702   // them. This is to avoid the queue from changing while iterating over it.
703   absl::InlinedVector<Task, 8> tasks_to_delete;
704 
705   while (!main_thread_only().delayed_incoming_queue.empty()) {
706     const Task& task = main_thread_only().delayed_incoming_queue.top();
707     CHECK(task.task);
708     if (!task.task.IsCancelled())
709       break;
710 
711     tasks_to_delete.push_back(
712         main_thread_only().delayed_incoming_queue.take_top());
713   }
714 
715   if (!tasks_to_delete.empty()) {
716     UpdateWakeUp(lazy_now);
717     return true;
718   }
719 
720   return false;
721 }
722 
MoveReadyDelayedTasksToWorkQueue(LazyNow * lazy_now,EnqueueOrder enqueue_order)723 void TaskQueueImpl::MoveReadyDelayedTasksToWorkQueue(
724     LazyNow* lazy_now,
725     EnqueueOrder enqueue_order) {
726   // Enqueue all delayed tasks that should be running now, skipping any that
727   // have been canceled.
728   WorkQueue::TaskPusher delayed_work_queue_task_pusher(
729       main_thread_only().delayed_work_queue->CreateTaskPusher());
730 
731   // Because task destructors could have a side-effect of posting new tasks, we
732   // move all the cancelled tasks into a temporary container before deleting
733   // them. This is to avoid the queue from changing while iterating over it.
734   absl::InlinedVector<Task, 8> tasks_to_delete;
735 
736   while (!main_thread_only().delayed_incoming_queue.empty()) {
737     const Task& task = main_thread_only().delayed_incoming_queue.top();
738     CHECK(task.task);
739 
740     // Leave the top task alone if it hasn't been canceled and it is not ready.
741     const bool is_cancelled = task.task.IsCancelled();
742     if (!is_cancelled && task.earliest_delayed_run_time() > lazy_now->Now())
743       break;
744 
745     Task ready_task = main_thread_only().delayed_incoming_queue.take_top();
746     if (is_cancelled) {
747       tasks_to_delete.push_back(std::move(ready_task));
748       continue;
749     }
750 
751     // The top task is ready to run. Move it to the delayed work queue.
752 #if DCHECK_IS_ON()
753     if (sequence_manager_->settings().log_task_delay_expiry)
754       VLOG(0) << GetName() << " Delay expired for "
755               << ready_task.posted_from.ToString();
756 #endif  // DCHECK_IS_ON()
757     DCHECK(!ready_task.delayed_run_time.is_null());
758     DCHECK(!ready_task.enqueue_order_set());
759     ready_task.set_enqueue_order(enqueue_order);
760     ActivateDelayedFenceIfNeeded(ready_task);
761 
762     delayed_work_queue_task_pusher.Push(std::move(ready_task));
763   }
764 
765   // Explicitly delete tasks last.
766   tasks_to_delete.clear();
767 
768   UpdateWakeUp(lazy_now);
769 }
770 
TraceQueueSize() const771 void TaskQueueImpl::TraceQueueSize() const {
772   bool is_tracing;
773   TRACE_EVENT_CATEGORY_GROUP_ENABLED(
774       TRACE_DISABLED_BY_DEFAULT("sequence_manager"), &is_tracing);
775   if (!is_tracing)
776     return;
777 
778   // It's only safe to access the work queues from the main thread.
779   // TODO(alexclarke): We should find another way of tracing this
780   if (!associated_thread_->IsBoundToCurrentThread())
781     return;
782 
783   size_t total_task_count;
784   {
785     base::internal::CheckedAutoLock lock(any_thread_lock_);
786     total_task_count = any_thread_.immediate_incoming_queue.size() +
787                        main_thread_only().immediate_work_queue->Size() +
788                        main_thread_only().delayed_work_queue->Size() +
789                        main_thread_only().delayed_incoming_queue.size();
790   }
791   TRACE_COUNTER1(TRACE_DISABLED_BY_DEFAULT("sequence_manager"), GetName(),
792                  total_task_count);
793 }
794 
SetQueuePriority(TaskQueue::QueuePriority priority)795 void TaskQueueImpl::SetQueuePriority(TaskQueue::QueuePriority priority) {
796   const TaskQueue::QueuePriority previous_priority = GetQueuePriority();
797   if (priority == previous_priority)
798     return;
799   sequence_manager_->main_thread_only().selector.SetQueuePriority(this,
800                                                                   priority);
801 
802 #if BUILDFLAG(IS_WIN)
803   // Updating queue priority can change whether high resolution timer is needed.
804   LazyNow lazy_now(sequence_manager_->main_thread_clock());
805   UpdateWakeUp(&lazy_now);
806 #endif
807 
808   if (priority > DefaultPriority()) {
809     // |priority| is now lower than the default, so update accordingly.
810     main_thread_only()
811         .enqueue_order_at_which_we_became_unblocked_with_normal_priority =
812         EnqueueOrder::max();
813   } else if (previous_priority > DefaultPriority()) {
814     // |priority| is no longer lower than the default, so record current
815     // sequence number.
816     DCHECK_EQ(
817         main_thread_only()
818             .enqueue_order_at_which_we_became_unblocked_with_normal_priority,
819         EnqueueOrder::max());
820     main_thread_only()
821         .enqueue_order_at_which_we_became_unblocked_with_normal_priority =
822         sequence_manager_->GetNextSequenceNumber();
823   }
824 }
825 
GetQueuePriority() const826 TaskQueue::QueuePriority TaskQueueImpl::GetQueuePriority() const {
827   size_t set_index = immediate_work_queue()->work_queue_set_index();
828   DCHECK_EQ(set_index, delayed_work_queue()->work_queue_set_index());
829   return static_cast<TaskQueue::QueuePriority>(set_index);
830 }
831 
AsValue(TimeTicks now,bool force_verbose) const832 Value::Dict TaskQueueImpl::AsValue(TimeTicks now, bool force_verbose) const {
833   base::internal::CheckedAutoLock lock(any_thread_lock_);
834   Value::Dict state;
835   state.Set("name", GetName());
836   if (any_thread_.unregistered) {
837     state.Set("unregistered", true);
838     return state;
839   }
840   DCHECK(main_thread_only().delayed_work_queue);
841   DCHECK(main_thread_only().immediate_work_queue);
842 
843   state.Set("task_queue_id",
844             StringPrintf("0x%" PRIx64, static_cast<uint64_t>(
845                                            reinterpret_cast<uintptr_t>(this))));
846   state.Set("enabled", IsQueueEnabled());
847   // TODO(crbug.com/1334256): Make base::Value able to store an int64_t and
848   // remove the various static_casts below.
849   state.Set("any_thread_.immediate_incoming_queuesize",
850             static_cast<int>(any_thread_.immediate_incoming_queue.size()));
851   state.Set("delayed_incoming_queue_size",
852             static_cast<int>(main_thread_only().delayed_incoming_queue.size()));
853   state.Set("immediate_work_queue_size",
854             static_cast<int>(main_thread_only().immediate_work_queue->Size()));
855   state.Set("delayed_work_queue_size",
856             static_cast<int>(main_thread_only().delayed_work_queue->Size()));
857 
858   state.Set("any_thread_.immediate_incoming_queuecapacity",
859             static_cast<int>(any_thread_.immediate_incoming_queue.capacity()));
860   state.Set("immediate_work_queue_capacity",
861             static_cast<int>(immediate_work_queue()->Capacity()));
862   state.Set("delayed_work_queue_capacity",
863             static_cast<int>(delayed_work_queue()->Capacity()));
864 
865   if (!main_thread_only().delayed_incoming_queue.empty()) {
866     TimeDelta delay_to_next_task =
867         (main_thread_only().delayed_incoming_queue.top().delayed_run_time -
868          sequence_manager_->main_thread_clock()->NowTicks());
869     state.Set("delay_to_next_task_ms", delay_to_next_task.InMillisecondsF());
870   }
871   if (main_thread_only().current_fence) {
872     Value::Dict fence_state;
873     fence_state.Set(
874         "enqueue_order",
875         static_cast<int>(
876             main_thread_only().current_fence->task_order().enqueue_order()));
877     fence_state.Set("activated_in_wake_up", !main_thread_only()
878                                                  .current_fence->task_order()
879                                                  .delayed_run_time()
880                                                  .is_null());
881     state.Set("current_fence", std::move(fence_state));
882   }
883   if (main_thread_only().delayed_fence) {
884     state.Set("delayed_fence_seconds_from_now",
885               (main_thread_only().delayed_fence.value() - now).InSecondsF());
886   }
887 
888   bool verbose = false;
889   TRACE_EVENT_CATEGORY_GROUP_ENABLED(
890       TRACE_DISABLED_BY_DEFAULT("sequence_manager.verbose_snapshots"),
891       &verbose);
892 
893   if (verbose || force_verbose) {
894     state.Set("immediate_incoming_queue",
895               QueueAsValue(any_thread_.immediate_incoming_queue, now));
896     state.Set("delayed_work_queue",
897               main_thread_only().delayed_work_queue->AsValue(now));
898     state.Set("immediate_work_queue",
899               main_thread_only().immediate_work_queue->AsValue(now));
900     state.Set("delayed_incoming_queue",
901               main_thread_only().delayed_incoming_queue.AsValue(now));
902   }
903   state.Set("priority", GetQueuePriority());
904   return state;
905 }
906 
AddTaskObserver(TaskObserver * task_observer)907 void TaskQueueImpl::AddTaskObserver(TaskObserver* task_observer) {
908   main_thread_only().task_observers.AddObserver(task_observer);
909 }
910 
RemoveTaskObserver(TaskObserver * task_observer)911 void TaskQueueImpl::RemoveTaskObserver(TaskObserver* task_observer) {
912   main_thread_only().task_observers.RemoveObserver(task_observer);
913 }
914 
NotifyWillProcessTask(const Task & task,bool was_blocked_or_low_priority)915 void TaskQueueImpl::NotifyWillProcessTask(const Task& task,
916                                           bool was_blocked_or_low_priority) {
917   DCHECK(should_notify_observers_);
918 
919   for (auto& observer : main_thread_only().task_observers)
920     observer.WillProcessTask(task, was_blocked_or_low_priority);
921 }
922 
NotifyDidProcessTask(const Task & task)923 void TaskQueueImpl::NotifyDidProcessTask(const Task& task) {
924   DCHECK(should_notify_observers_);
925   for (auto& observer : main_thread_only().task_observers)
926     observer.DidProcessTask(task);
927 }
928 
InsertFence(TaskQueue::InsertFencePosition position)929 void TaskQueueImpl::InsertFence(TaskQueue::InsertFencePosition position) {
930   Fence new_fence = position == TaskQueue::InsertFencePosition::kNow
931                         ? Fence::CreateWithEnqueueOrder(
932                               sequence_manager_->GetNextSequenceNumber())
933                         : Fence::BlockingFence();
934   InsertFence(new_fence);
935 }
936 
InsertFence(Fence current_fence)937 void TaskQueueImpl::InsertFence(Fence current_fence) {
938   // Only one fence may be present at a time.
939   main_thread_only().delayed_fence = std::nullopt;
940 
941   std::optional<Fence> previous_fence = main_thread_only().current_fence;
942 
943   // Tasks posted after this point will have a strictly higher enqueue order
944   // and will be blocked from running.
945   main_thread_only().current_fence = current_fence;
946   bool front_task_unblocked =
947       main_thread_only().immediate_work_queue->InsertFence(current_fence);
948   front_task_unblocked |=
949       main_thread_only().delayed_work_queue->InsertFence(current_fence);
950 
951   {
952     base::internal::CheckedAutoLock lock(any_thread_lock_);
953     if (!front_task_unblocked && previous_fence &&
954         previous_fence->task_order() < current_fence.task_order()) {
955       if (!any_thread_.immediate_incoming_queue.empty() &&
956           any_thread_.immediate_incoming_queue.front().task_order() >
957               previous_fence->task_order() &&
958           any_thread_.immediate_incoming_queue.front().task_order() <
959               current_fence.task_order()) {
960         front_task_unblocked = true;
961       }
962     }
963 
964     UpdateCrossThreadQueueStateLocked();
965   }
966 
967   if (IsQueueEnabled() && front_task_unblocked) {
968     OnQueueUnblocked();
969     sequence_manager_->ScheduleWork();
970   }
971 }
972 
InsertFenceAt(TimeTicks time)973 void TaskQueueImpl::InsertFenceAt(TimeTicks time) {
974   DCHECK(delayed_fence_allowed_)
975       << "Delayed fences are not supported for this queue. Enable them "
976          "explicitly in TaskQueue::Spec when creating the queue";
977 
978   // Task queue can have only one fence, delayed or not.
979   RemoveFence();
980   main_thread_only().delayed_fence = time;
981 }
982 
RemoveFence()983 void TaskQueueImpl::RemoveFence() {
984   std::optional<Fence> previous_fence = main_thread_only().current_fence;
985   main_thread_only().current_fence = std::nullopt;
986   main_thread_only().delayed_fence = std::nullopt;
987 
988   bool front_task_unblocked =
989       main_thread_only().immediate_work_queue->RemoveFence();
990   front_task_unblocked |= main_thread_only().delayed_work_queue->RemoveFence();
991 
992   {
993     base::internal::CheckedAutoLock lock(any_thread_lock_);
994     if (!front_task_unblocked && previous_fence) {
995       if (!any_thread_.immediate_incoming_queue.empty() &&
996           any_thread_.immediate_incoming_queue.front().task_order() >
997               previous_fence->task_order()) {
998         front_task_unblocked = true;
999       }
1000     }
1001 
1002     UpdateCrossThreadQueueStateLocked();
1003   }
1004 
1005   if (IsQueueEnabled() && front_task_unblocked) {
1006     OnQueueUnblocked();
1007     sequence_manager_->ScheduleWork();
1008   }
1009 }
1010 
BlockedByFence() const1011 bool TaskQueueImpl::BlockedByFence() const {
1012   if (!main_thread_only().current_fence)
1013     return false;
1014 
1015   if (!main_thread_only().immediate_work_queue->BlockedByFence() ||
1016       !main_thread_only().delayed_work_queue->BlockedByFence()) {
1017     return false;
1018   }
1019 
1020   base::internal::CheckedAutoLock lock(any_thread_lock_);
1021   if (any_thread_.immediate_incoming_queue.empty())
1022     return true;
1023 
1024   return any_thread_.immediate_incoming_queue.front().task_order() >
1025          main_thread_only().current_fence->task_order();
1026 }
1027 
HasActiveFence()1028 bool TaskQueueImpl::HasActiveFence() {
1029   if (main_thread_only().delayed_fence &&
1030       sequence_manager_->main_thread_clock()->NowTicks() >
1031           main_thread_only().delayed_fence.value()) {
1032     return true;
1033   }
1034   return !!main_thread_only().current_fence;
1035 }
1036 
CouldTaskRun(EnqueueOrder enqueue_order) const1037 bool TaskQueueImpl::CouldTaskRun(EnqueueOrder enqueue_order) const {
1038   if (!IsQueueEnabled())
1039     return false;
1040 
1041   if (!main_thread_only().current_fence)
1042     return true;
1043 
1044   // TODO(crbug.com/1249857): This should use TaskOrder. This is currently only
1045   // used for tests and is fine as-is, but we should be using `TaskOrder` for
1046   // task comparisons. Also this test should be renamed with a testing suffix as
1047   // it is not used in production.
1048   return enqueue_order <
1049          main_thread_only().current_fence->task_order().enqueue_order();
1050 }
1051 
WasBlockedOrLowPriority(EnqueueOrder enqueue_order) const1052 bool TaskQueueImpl::WasBlockedOrLowPriority(EnqueueOrder enqueue_order) const {
1053   return enqueue_order <
1054          main_thread_only()
1055              .enqueue_order_at_which_we_became_unblocked_with_normal_priority;
1056 }
1057 
1058 // static
QueueAsValue(const TaskDeque & queue,TimeTicks now)1059 Value::List TaskQueueImpl::QueueAsValue(const TaskDeque& queue, TimeTicks now) {
1060   Value::List state;
1061   for (const Task& task : queue)
1062     state.Append(TaskAsValue(task, now));
1063   return state;
1064 }
1065 
1066 // static
TaskAsValue(const Task & task,TimeTicks now)1067 Value::Dict TaskQueueImpl::TaskAsValue(const Task& task, TimeTicks now) {
1068   Value::Dict state;
1069   state.Set("posted_from", task.posted_from.ToString());
1070   if (task.enqueue_order_set())
1071     state.Set("enqueue_order", static_cast<int>(task.enqueue_order()));
1072   state.Set("sequence_num", task.sequence_num);
1073   state.Set("nestable", task.nestable == Nestable::kNestable);
1074   state.Set("is_high_res", task.is_high_res);
1075   state.Set("is_cancelled", task.task.IsCancelled());
1076   state.Set("delayed_run_time",
1077             (task.delayed_run_time - TimeTicks()).InMillisecondsF());
1078   const TimeDelta delayed_run_time_milliseconds_from_now =
1079       task.delayed_run_time.is_null() ? TimeDelta()
1080                                       : (task.delayed_run_time - now);
1081   state.Set("delayed_run_time_milliseconds_from_now",
1082             delayed_run_time_milliseconds_from_now.InMillisecondsF());
1083   return state;
1084 }
1085 
MakeDelayedTask(PostedTask delayed_task,LazyNow * lazy_now) const1086 Task TaskQueueImpl::MakeDelayedTask(PostedTask delayed_task,
1087                                     LazyNow* lazy_now) const {
1088   EnqueueOrder sequence_number = sequence_manager_->GetNextSequenceNumber();
1089   base::TimeDelta delay;
1090   WakeUpResolution resolution = WakeUpResolution::kLow;
1091 #if BUILDFLAG(IS_WIN)
1092   const bool explicit_high_resolution_timer_win =
1093       g_explicit_high_resolution_timer_win.load(std::memory_order_relaxed);
1094 #endif  // BUILDFLAG(IS_WIN)
1095   if (absl::holds_alternative<base::TimeDelta>(
1096           delayed_task.delay_or_delayed_run_time)) {
1097     delay = absl::get<base::TimeDelta>(delayed_task.delay_or_delayed_run_time);
1098     delayed_task.delay_or_delayed_run_time = lazy_now->Now() + delay;
1099   } else {
1100     delay = absl::get<base::TimeTicks>(delayed_task.delay_or_delayed_run_time) -
1101             lazy_now->Now();
1102   }
1103 #if BUILDFLAG(IS_WIN)
1104   if (!explicit_high_resolution_timer_win &&
1105       delay < (2 * base::Milliseconds(Time::kMinLowResolutionThresholdMs))) {
1106     // Outside the kExplicitHighResolutionTimerWin experiment, We consider the
1107     // task needs a high resolution timer if the delay is more than 0 and less
1108     // than 32ms. This caps the relative error to less than 50% : a 33ms wait
1109     // can wake at 48ms since the default resolution on Windows is between 10
1110     // and 15ms.
1111     resolution = WakeUpResolution::kHigh;
1112   }
1113 #endif  // BUILDFLAG(IS_WIN)
1114   delayed_task.delay_policy = subtle::MaybeOverrideDelayPolicy(
1115       delayed_task.delay_policy, delay,
1116       g_max_precise_delay.load(std::memory_order_relaxed));
1117   // leeway isn't specified yet since this may be called from any thread.
1118   return Task(std::move(delayed_task), sequence_number, EnqueueOrder(),
1119               lazy_now->Now(), resolution);
1120 }
1121 
IsQueueEnabled() const1122 bool TaskQueueImpl::IsQueueEnabled() const {
1123   return main_thread_only().is_enabled;
1124 }
1125 
SetQueueEnabled(bool enabled)1126 void TaskQueueImpl::SetQueueEnabled(bool enabled) {
1127   if (main_thread_only().is_enabled == enabled)
1128     return;
1129 
1130   // Update the |main_thread_only_| struct.
1131   main_thread_only().is_enabled = enabled;
1132   main_thread_only().disabled_time = std::nullopt;
1133 
1134   // |sequence_manager_| can be null in tests.
1135   if (!sequence_manager_)
1136     return;
1137 
1138   LazyNow lazy_now(sequence_manager_->main_thread_clock());
1139 
1140   if (!enabled) {
1141     bool tracing_enabled = false;
1142     TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1143                                        &tracing_enabled);
1144     main_thread_only().disabled_time = lazy_now.Now();
1145   } else {
1146     // Override reporting if the queue is becoming enabled again.
1147     main_thread_only().should_report_posted_tasks_when_disabled = false;
1148   }
1149 
1150   // If there is a throttler, it will be notified of pending delayed and
1151   // immediate tasks inside UpdateWakeUp().
1152   UpdateWakeUp(&lazy_now);
1153 
1154   {
1155     base::internal::CheckedAutoLock lock(any_thread_lock_);
1156     UpdateCrossThreadQueueStateLocked();
1157 
1158     // Copy over the task-reporting related state.
1159     any_thread_.is_enabled = enabled;
1160     any_thread_.tracing_only.disabled_time = main_thread_only().disabled_time;
1161     any_thread_.tracing_only.should_report_posted_tasks_when_disabled =
1162         main_thread_only().should_report_posted_tasks_when_disabled;
1163   }
1164 
1165   // Finally, enable or disable the queue with the selector.
1166   if (enabled) {
1167     // Note the selector calls SequenceManager::OnTaskQueueEnabled which posts
1168     // a DoWork if needed.
1169     sequence_manager_->main_thread_only().selector.EnableQueue(this);
1170 
1171     if (!BlockedByFence())
1172       OnQueueUnblocked();
1173   } else {
1174     sequence_manager_->main_thread_only().selector.DisableQueue(this);
1175   }
1176 }
1177 
SetShouldReportPostedTasksWhenDisabled(bool should_report)1178 void TaskQueueImpl::SetShouldReportPostedTasksWhenDisabled(bool should_report) {
1179   if (main_thread_only().should_report_posted_tasks_when_disabled ==
1180       should_report)
1181     return;
1182 
1183   // Only observe transitions turning the reporting on if tracing is enabled.
1184   if (should_report) {
1185     bool tracing_enabled = false;
1186     TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1187                                        &tracing_enabled);
1188     if (!tracing_enabled)
1189       return;
1190   }
1191 
1192   main_thread_only().should_report_posted_tasks_when_disabled = should_report;
1193 
1194   // Mirror the state to the AnyThread struct as well.
1195   {
1196     base::internal::CheckedAutoLock lock(any_thread_lock_);
1197     any_thread_.tracing_only.should_report_posted_tasks_when_disabled =
1198         should_report;
1199   }
1200 }
1201 
UpdateCrossThreadQueueStateLocked()1202 void TaskQueueImpl::UpdateCrossThreadQueueStateLocked() {
1203   any_thread_.immediate_work_queue_empty =
1204       main_thread_only().immediate_work_queue->Empty();
1205   any_thread_.is_enabled = main_thread_only().is_enabled;
1206 
1207   if (main_thread_only().throttler) {
1208     // If there's a Throttler, always ScheduleWork() when immediate work is
1209     // posted and the queue is enabled, to ensure that
1210     // Throttler::OnHasImmediateTask() is invoked.
1211     any_thread_.post_immediate_task_should_schedule_work = IsQueueEnabled();
1212   } else {
1213     // Otherwise, ScheduleWork() only if the queue is enabled and there isn't a
1214     // fence to prevent the task from being executed.
1215     any_thread_.post_immediate_task_should_schedule_work =
1216         IsQueueEnabled() && !main_thread_only().current_fence;
1217   }
1218 
1219 #if DCHECK_IS_ON()
1220   any_thread_.queue_set_index =
1221       main_thread_only().immediate_work_queue->work_queue_set_index();
1222 #endif
1223 }
1224 
ReclaimMemory(TimeTicks now)1225 void TaskQueueImpl::ReclaimMemory(TimeTicks now) {
1226   if (main_thread_only().delayed_incoming_queue.empty())
1227     return;
1228 
1229   main_thread_only().delayed_incoming_queue.SweepCancelledTasks(
1230       sequence_manager_);
1231 
1232   // If deleting one of the cancelled tasks shut down this queue, bail out.
1233   // Note that in this scenario |this| is still valid, but some fields of the
1234   // queue have been cleared out by |UnregisterTaskQueue|.
1235   if (!main_thread_only().delayed_work_queue) {
1236     return;
1237   }
1238 
1239   LazyNow lazy_now(now);
1240   UpdateWakeUp(&lazy_now);
1241 
1242   // Also consider shrinking the work queue if it's wasting memory.
1243   main_thread_only().delayed_work_queue->MaybeShrinkQueue();
1244   main_thread_only().immediate_work_queue->MaybeShrinkQueue();
1245 
1246   {
1247     base::internal::CheckedAutoLock lock(any_thread_lock_);
1248     any_thread_.immediate_incoming_queue.MaybeShrinkQueue();
1249   }
1250 }
1251 
PushImmediateIncomingTaskForTest(Task task)1252 void TaskQueueImpl::PushImmediateIncomingTaskForTest(Task task) {
1253   base::internal::CheckedAutoLock lock(any_thread_lock_);
1254   any_thread_.immediate_incoming_queue.push_back(std::move(task));
1255 }
1256 
RequeueDeferredNonNestableTask(DeferredNonNestableTask task)1257 void TaskQueueImpl::RequeueDeferredNonNestableTask(
1258     DeferredNonNestableTask task) {
1259   DCHECK(task.task.nestable == Nestable::kNonNestable);
1260 
1261   // It's possible that the queue was unregistered since the task was posted.
1262   // Skip the task in that case.
1263   if (!main_thread_only().delayed_work_queue)
1264     return;
1265 
1266   // The re-queued tasks have to be pushed onto the front because we'd otherwise
1267   // violate the strict monotonically increasing enqueue order within the
1268   // WorkQueue.  We can't assign them a new enqueue order here because that will
1269   // not behave correctly with fences and things will break (e.g Idle TQ).
1270   if (task.work_queue_type == WorkQueueType::kDelayed) {
1271     main_thread_only().delayed_work_queue->PushNonNestableTaskToFront(
1272         std::move(task.task));
1273   } else {
1274     // We're about to push |task| onto an empty |immediate_work_queue|
1275     // (bypassing |immediate_incoming_queue_|). As such, we no longer need to
1276     // reload if we were planning to. The flag must be cleared while holding
1277     // the lock to avoid a cross-thread post task setting it again before
1278     // we actually make |immediate_work_queue| non-empty.
1279     if (main_thread_only().immediate_work_queue->Empty()) {
1280       base::internal::CheckedAutoLock lock(any_thread_lock_);
1281       empty_queues_to_reload_handle_.SetActive(false);
1282 
1283       any_thread_.immediate_work_queue_empty = false;
1284       main_thread_only().immediate_work_queue->PushNonNestableTaskToFront(
1285           std::move(task.task));
1286 
1287     } else {
1288       main_thread_only().immediate_work_queue->PushNonNestableTaskToFront(
1289           std::move(task.task));
1290     }
1291   }
1292 }
1293 
SetThrottler(TaskQueue::Throttler * throttler)1294 void TaskQueueImpl::SetThrottler(TaskQueue::Throttler* throttler) {
1295   DCHECK(throttler);
1296   DCHECK(!main_thread_only().throttler)
1297       << "Can't assign two different throttlers to "
1298          "base::sequence_manager:TaskQueue";
1299   // `throttler` is guaranteed to outlive this object.
1300   main_thread_only().throttler = throttler;
1301 }
1302 
ResetThrottler()1303 void TaskQueueImpl::ResetThrottler() {
1304   main_thread_only().throttler = nullptr;
1305   LazyNow lazy_now(sequence_manager_->main_thread_clock());
1306   // The current delayed wake up may have been determined by the Throttler.
1307   // Update it now that there is no Throttler.
1308   UpdateWakeUp(&lazy_now);
1309 }
1310 
UpdateWakeUp(LazyNow * lazy_now)1311 void TaskQueueImpl::UpdateWakeUp(LazyNow* lazy_now) {
1312   std::optional<WakeUp> wake_up = GetNextDesiredWakeUp();
1313   if (main_thread_only().throttler && IsQueueEnabled()) {
1314     // GetNextAllowedWakeUp() may return a non-null wake_up even if |wake_up| is
1315     // nullopt, e.g. to throttle immediate tasks.
1316     wake_up = main_thread_only().throttler->GetNextAllowedWakeUp(
1317         lazy_now, wake_up, HasTaskToRunImmediatelyOrReadyDelayedTask());
1318   }
1319   SetNextWakeUp(lazy_now, wake_up);
1320 }
1321 
SetNextWakeUp(LazyNow * lazy_now,std::optional<WakeUp> wake_up)1322 void TaskQueueImpl::SetNextWakeUp(LazyNow* lazy_now,
1323                                   std::optional<WakeUp> wake_up) {
1324   if (main_thread_only().scheduled_wake_up == wake_up)
1325     return;
1326   main_thread_only().scheduled_wake_up = wake_up;
1327   main_thread_only().wake_up_queue->SetNextWakeUpForQueue(this, lazy_now,
1328                                                           wake_up);
1329 }
1330 
HasTaskToRunImmediately() const1331 bool TaskQueueImpl::HasTaskToRunImmediately() const {
1332   // Any work queue tasks count as immediate work.
1333   if (!main_thread_only().delayed_work_queue->Empty() ||
1334       !main_thread_only().immediate_work_queue->Empty()) {
1335     return true;
1336   }
1337 
1338   // Finally tasks on |immediate_incoming_queue| count as immediate work.
1339   base::internal::CheckedAutoLock lock(any_thread_lock_);
1340   return !any_thread_.immediate_incoming_queue.empty();
1341 }
1342 
HasTaskToRunImmediatelyLocked() const1343 bool TaskQueueImpl::HasTaskToRunImmediatelyLocked() const {
1344   return !main_thread_only().delayed_work_queue->Empty() ||
1345          !main_thread_only().immediate_work_queue->Empty() ||
1346          !any_thread_.immediate_incoming_queue.empty();
1347 }
1348 
SetOnTaskStartedHandler(TaskQueueImpl::OnTaskStartedHandler handler)1349 void TaskQueueImpl::SetOnTaskStartedHandler(
1350     TaskQueueImpl::OnTaskStartedHandler handler) {
1351   DCHECK(should_notify_observers_ || handler.is_null());
1352   main_thread_only().on_task_started_handler = std::move(handler);
1353 }
1354 
OnTaskStarted(const Task & task,const TaskQueue::TaskTiming & task_timing)1355 void TaskQueueImpl::OnTaskStarted(const Task& task,
1356                                   const TaskQueue::TaskTiming& task_timing) {
1357   if (!main_thread_only().on_task_started_handler.is_null())
1358     main_thread_only().on_task_started_handler.Run(task, task_timing);
1359 }
1360 
SetOnTaskCompletedHandler(TaskQueueImpl::OnTaskCompletedHandler handler)1361 void TaskQueueImpl::SetOnTaskCompletedHandler(
1362     TaskQueueImpl::OnTaskCompletedHandler handler) {
1363   DCHECK(should_notify_observers_ || handler.is_null());
1364   main_thread_only().on_task_completed_handler = std::move(handler);
1365 }
1366 
OnTaskCompleted(const Task & task,TaskQueue::TaskTiming * task_timing,LazyNow * lazy_now)1367 void TaskQueueImpl::OnTaskCompleted(const Task& task,
1368                                     TaskQueue::TaskTiming* task_timing,
1369                                     LazyNow* lazy_now) {
1370   if (!main_thread_only().on_task_completed_handler.is_null()) {
1371     main_thread_only().on_task_completed_handler.Run(task, task_timing,
1372                                                      lazy_now);
1373   }
1374 }
1375 
RequiresTaskTiming() const1376 bool TaskQueueImpl::RequiresTaskTiming() const {
1377   return !main_thread_only().on_task_started_handler.is_null() ||
1378          !main_thread_only().on_task_completed_handler.is_null();
1379 }
1380 
1381 std::unique_ptr<TaskQueue::OnTaskPostedCallbackHandle>
AddOnTaskPostedHandler(OnTaskPostedHandler handler)1382 TaskQueueImpl::AddOnTaskPostedHandler(OnTaskPostedHandler handler) {
1383   DCHECK(should_notify_observers_ && !handler.is_null());
1384   std::unique_ptr<OnTaskPostedCallbackHandleImpl> handle =
1385       std::make_unique<OnTaskPostedCallbackHandleImpl>(this,
1386                                                        associated_thread_);
1387   base::internal::CheckedAutoLock lock(any_thread_lock_);
1388   any_thread_.on_task_posted_handlers.insert(
1389       {handle.get(), std::move(handler)});
1390   return handle;
1391 }
1392 
RemoveOnTaskPostedHandler(TaskQueueImpl::OnTaskPostedCallbackHandleImpl * on_task_posted_callback_handle)1393 void TaskQueueImpl::RemoveOnTaskPostedHandler(
1394     TaskQueueImpl::OnTaskPostedCallbackHandleImpl*
1395         on_task_posted_callback_handle) {
1396   base::internal::CheckedAutoLock lock(any_thread_lock_);
1397   any_thread_.on_task_posted_handlers.erase(on_task_posted_callback_handle);
1398 }
1399 
SetTaskExecutionTraceLogger(TaskExecutionTraceLogger logger)1400 void TaskQueueImpl::SetTaskExecutionTraceLogger(
1401     TaskExecutionTraceLogger logger) {
1402   DCHECK(should_notify_observers_ || logger.is_null());
1403   main_thread_only().task_execution_trace_logger = std::move(logger);
1404 }
1405 
IsUnregistered() const1406 bool TaskQueueImpl::IsUnregistered() const {
1407   base::internal::CheckedAutoLock lock(any_thread_lock_);
1408   return any_thread_.unregistered;
1409 }
1410 
GetSequenceManagerWeakPtr()1411 WeakPtr<SequenceManagerImpl> TaskQueueImpl::GetSequenceManagerWeakPtr() {
1412   return sequence_manager_->GetWeakPtr();
1413 }
1414 
ActivateDelayedFenceIfNeeded(const Task & task)1415 void TaskQueueImpl::ActivateDelayedFenceIfNeeded(const Task& task) {
1416   if (!main_thread_only().delayed_fence)
1417     return;
1418   if (main_thread_only().delayed_fence.value() > task.delayed_run_time)
1419     return;
1420   InsertFence(Fence(task.task_order()));
1421   main_thread_only().delayed_fence = std::nullopt;
1422 }
1423 
MaybeReportIpcTaskQueuedFromMainThread(const Task & pending_task)1424 void TaskQueueImpl::MaybeReportIpcTaskQueuedFromMainThread(
1425     const Task& pending_task) {
1426   if (!pending_task.ipc_hash)
1427     return;
1428 
1429   // It's possible that tracing was just enabled and no disabled time has been
1430   // stored. In that case, skip emitting the event.
1431   if (!main_thread_only().disabled_time)
1432     return;
1433 
1434   bool tracing_enabled = false;
1435   TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1436                                      &tracing_enabled);
1437   if (!tracing_enabled)
1438     return;
1439 
1440   if (main_thread_only().is_enabled ||
1441       !main_thread_only().should_report_posted_tasks_when_disabled) {
1442     return;
1443   }
1444 
1445   base::TimeDelta time_since_disabled =
1446       sequence_manager_->main_thread_clock()->NowTicks() -
1447       main_thread_only().disabled_time.value();
1448 
1449   ReportIpcTaskQueued(pending_task, time_since_disabled);
1450 }
1451 
ShouldReportIpcTaskQueuedFromAnyThreadLocked(base::TimeDelta * time_since_disabled)1452 bool TaskQueueImpl::ShouldReportIpcTaskQueuedFromAnyThreadLocked(
1453     base::TimeDelta* time_since_disabled) {
1454   // It's possible that tracing was just enabled and no disabled time has been
1455   // stored. In that case, skip emitting the event.
1456   if (!any_thread_.tracing_only.disabled_time)
1457     return false;
1458 
1459   if (any_thread_.is_enabled ||
1460       any_thread_.tracing_only.should_report_posted_tasks_when_disabled) {
1461     return false;
1462   }
1463 
1464   *time_since_disabled = sequence_manager_->any_thread_clock()->NowTicks() -
1465                          any_thread_.tracing_only.disabled_time.value();
1466   return true;
1467 }
1468 
MaybeReportIpcTaskQueuedFromAnyThreadLocked(const Task & pending_task)1469 void TaskQueueImpl::MaybeReportIpcTaskQueuedFromAnyThreadLocked(
1470     const Task& pending_task) {
1471   if (!pending_task.ipc_hash)
1472     return;
1473 
1474   bool tracing_enabled = false;
1475   TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1476                                      &tracing_enabled);
1477   if (!tracing_enabled)
1478     return;
1479 
1480   base::TimeDelta time_since_disabled;
1481   if (ShouldReportIpcTaskQueuedFromAnyThreadLocked(&time_since_disabled))
1482     ReportIpcTaskQueued(pending_task, time_since_disabled);
1483 }
1484 
MaybeReportIpcTaskQueuedFromAnyThreadUnlocked(const Task & pending_task)1485 void TaskQueueImpl::MaybeReportIpcTaskQueuedFromAnyThreadUnlocked(
1486     const Task& pending_task) {
1487   if (!pending_task.ipc_hash)
1488     return;
1489 
1490   bool tracing_enabled = false;
1491   TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1492                                      &tracing_enabled);
1493   if (!tracing_enabled)
1494     return;
1495 
1496   base::TimeDelta time_since_disabled;
1497   bool should_report = false;
1498   {
1499     base::internal::CheckedAutoLock lock(any_thread_lock_);
1500     should_report =
1501         ShouldReportIpcTaskQueuedFromAnyThreadLocked(&time_since_disabled);
1502   }
1503 
1504   if (should_report)
1505     ReportIpcTaskQueued(pending_task, time_since_disabled);
1506 }
1507 
ReportIpcTaskQueued(const Task & pending_task,const base::TimeDelta & time_since_disabled)1508 void TaskQueueImpl::ReportIpcTaskQueued(
1509     const Task& pending_task,
1510     const base::TimeDelta& time_since_disabled) {
1511   TRACE_EVENT_INSTANT(
1512       TRACE_DISABLED_BY_DEFAULT("lifecycles"), "task_posted_to_disabled_queue",
1513       [&](perfetto::EventContext ctx) {
1514         auto* proto = ctx.event<perfetto::protos::pbzero::ChromeTrackEvent>()
1515                           ->set_chrome_task_posted_to_disabled_queue();
1516         proto->set_time_since_disabled_ms(
1517             checked_cast<uint64_t>(time_since_disabled.InMilliseconds()));
1518         proto->set_ipc_hash(pending_task.ipc_hash);
1519         proto->set_source_location_iid(
1520             base::trace_event::InternedSourceLocation::Get(
1521                 &ctx, pending_task.posted_from));
1522       });
1523 }
1524 
OnQueueUnblocked()1525 void TaskQueueImpl::OnQueueUnblocked() {
1526   DCHECK(IsQueueEnabled());
1527   DCHECK(!BlockedByFence());
1528 
1529   main_thread_only().enqueue_order_at_which_we_became_unblocked =
1530       sequence_manager_->GetNextSequenceNumber();
1531   if (GetQueuePriority() <= DefaultPriority()) {
1532     // We are default priority or more important so update
1533     // |enqueue_order_at_which_we_became_unblocked_with_normal_priority|.
1534     main_thread_only()
1535         .enqueue_order_at_which_we_became_unblocked_with_normal_priority =
1536         main_thread_only().enqueue_order_at_which_we_became_unblocked;
1537   }
1538 }
1539 
1540 std::unique_ptr<TaskQueue::QueueEnabledVoter>
CreateQueueEnabledVoter()1541 TaskQueueImpl::CreateQueueEnabledVoter() {
1542   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
1543   return WrapUnique(
1544       new TaskQueue::QueueEnabledVoter(voter_weak_ptr_factory_.GetWeakPtr()));
1545 }
1546 
AddQueueEnabledVoter(bool voter_is_enabled,TaskQueue::QueueEnabledVoter & voter)1547 void TaskQueueImpl::AddQueueEnabledVoter(bool voter_is_enabled,
1548                                          TaskQueue::QueueEnabledVoter& voter) {
1549   ++main_thread_only().voter_count;
1550   if (voter_is_enabled) {
1551     ++main_thread_only().enabled_voter_count;
1552   }
1553 }
1554 
RemoveQueueEnabledVoter(bool voter_is_enabled,TaskQueue::QueueEnabledVoter & voter)1555 void TaskQueueImpl::RemoveQueueEnabledVoter(
1556     bool voter_is_enabled,
1557     TaskQueue::QueueEnabledVoter& voter) {
1558   bool was_enabled = AreAllQueueEnabledVotersEnabled();
1559   if (voter_is_enabled) {
1560     --main_thread_only().enabled_voter_count;
1561     DCHECK_GE(main_thread_only().enabled_voter_count, 0);
1562   }
1563 
1564   --main_thread_only().voter_count;
1565   DCHECK_GE(main_thread_only().voter_count, 0);
1566 
1567   bool is_enabled = AreAllQueueEnabledVotersEnabled();
1568   if (was_enabled != is_enabled) {
1569     SetQueueEnabled(is_enabled);
1570   }
1571 }
1572 
OnQueueEnabledVoteChanged(bool enabled)1573 void TaskQueueImpl::OnQueueEnabledVoteChanged(bool enabled) {
1574   bool was_enabled = AreAllQueueEnabledVotersEnabled();
1575   if (enabled) {
1576     ++main_thread_only().enabled_voter_count;
1577     DCHECK_LE(main_thread_only().enabled_voter_count,
1578               main_thread_only().voter_count);
1579   } else {
1580     --main_thread_only().enabled_voter_count;
1581     DCHECK_GE(main_thread_only().enabled_voter_count, 0);
1582   }
1583 
1584   bool is_enabled = AreAllQueueEnabledVotersEnabled();
1585   if (was_enabled != is_enabled) {
1586     SetQueueEnabled(is_enabled);
1587   }
1588 }
1589 
CompleteInitializationOnBoundThread()1590 void TaskQueueImpl::CompleteInitializationOnBoundThread() {
1591   voter_weak_ptr_factory_.BindToCurrentSequence(
1592       subtle::BindWeakPtrFactoryPassKey());
1593 }
1594 
DefaultPriority() const1595 TaskQueue::QueuePriority TaskQueueImpl::DefaultPriority() const {
1596   return sequence_manager()->settings().priority_settings.default_priority();
1597 }
1598 
IsQueueEnabledFromAnyThread() const1599 bool TaskQueueImpl::IsQueueEnabledFromAnyThread() const {
1600   base::internal::CheckedAutoLock lock(any_thread_lock_);
1601   return any_thread_.is_enabled;
1602 }
1603 
1604 TaskQueueImpl::DelayedIncomingQueue::DelayedIncomingQueue() = default;
1605 TaskQueueImpl::DelayedIncomingQueue::~DelayedIncomingQueue() = default;
1606 
push(Task task)1607 void TaskQueueImpl::DelayedIncomingQueue::push(Task task) {
1608   // TODO(crbug.com/1247285): Remove this once the cause of corrupted tasks in
1609   // the queue is understood.
1610   CHECK(task.task);
1611   if (task.is_high_res)
1612     pending_high_res_tasks_++;
1613   queue_.insert(std::move(task));
1614 }
1615 
remove(HeapHandle heap_handle)1616 void TaskQueueImpl::DelayedIncomingQueue::remove(HeapHandle heap_handle) {
1617   DCHECK(!empty());
1618   DCHECK_LT(heap_handle.index(), queue_.size());
1619   Task task = queue_.take(heap_handle);
1620   if (task.is_high_res) {
1621     pending_high_res_tasks_--;
1622     DCHECK_GE(pending_high_res_tasks_, 0);
1623   }
1624 }
1625 
take_top()1626 Task TaskQueueImpl::DelayedIncomingQueue::take_top() {
1627   DCHECK(!empty());
1628   if (queue_.top().is_high_res) {
1629     pending_high_res_tasks_--;
1630     DCHECK_GE(pending_high_res_tasks_, 0);
1631   }
1632   return queue_.take_top();
1633 }
1634 
swap(DelayedIncomingQueue * rhs)1635 void TaskQueueImpl::DelayedIncomingQueue::swap(DelayedIncomingQueue* rhs) {
1636   std::swap(pending_high_res_tasks_, rhs->pending_high_res_tasks_);
1637   std::swap(queue_, rhs->queue_);
1638 }
1639 
SweepCancelledTasks(SequenceManagerImpl * sequence_manager)1640 void TaskQueueImpl::DelayedIncomingQueue::SweepCancelledTasks(
1641     SequenceManagerImpl* sequence_manager) {
1642   // Note: IntrusiveHeap::EraseIf() is safe against re-entrancy caused by
1643   // deleted tasks posting new tasks.
1644   queue_.EraseIf([this](const Task& task) {
1645     if (task.task.IsCancelled()) {
1646       if (task.is_high_res) {
1647         --pending_high_res_tasks_;
1648         DCHECK_GE(pending_high_res_tasks_, 0);
1649       }
1650       return true;
1651     }
1652     return false;
1653   });
1654 }
1655 
AsValue(TimeTicks now) const1656 Value::List TaskQueueImpl::DelayedIncomingQueue::AsValue(TimeTicks now) const {
1657   Value::List state;
1658   for (const Task& task : queue_)
1659     state.Append(TaskAsValue(task, now));
1660   return state;
1661 }
1662 
operator ()(const Task & lhs,const Task & rhs) const1663 bool TaskQueueImpl::DelayedIncomingQueue::Compare::operator()(
1664     const Task& lhs,
1665     const Task& rhs) const {
1666   // Delayed tasks are ordered by latest_delayed_run_time(). The top task may
1667   // not be the first task eligible to run, but tasks will always become ripe
1668   // before their latest_delayed_run_time().
1669   const TimeTicks lhs_latest_delayed_run_time = lhs.latest_delayed_run_time();
1670   const TimeTicks rhs_latest_delayed_run_time = rhs.latest_delayed_run_time();
1671   if (lhs_latest_delayed_run_time == rhs_latest_delayed_run_time)
1672     return lhs.sequence_num > rhs.sequence_num;
1673   return lhs_latest_delayed_run_time > rhs_latest_delayed_run_time;
1674 }
1675 
OnTaskPostedCallbackHandleImpl(TaskQueueImpl * task_queue_impl,scoped_refptr<const AssociatedThreadId> associated_thread)1676 TaskQueueImpl::OnTaskPostedCallbackHandleImpl::OnTaskPostedCallbackHandleImpl(
1677     TaskQueueImpl* task_queue_impl,
1678     scoped_refptr<const AssociatedThreadId> associated_thread)
1679     : task_queue_impl_(task_queue_impl),
1680       associated_thread_(std::move(associated_thread)) {
1681   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
1682 }
1683 
1684 TaskQueueImpl::OnTaskPostedCallbackHandleImpl::
~OnTaskPostedCallbackHandleImpl()1685     ~OnTaskPostedCallbackHandleImpl() {
1686   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
1687   if (task_queue_impl_)
1688     task_queue_impl_->RemoveOnTaskPostedHandler(this);
1689 }
1690 
1691 }  // namespace internal
1692 }  // namespace sequence_manager
1693 }  // namespace base
1694