xref: /aosp_15_r20/external/cronet/net/base/prioritized_dispatcher.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/base/prioritized_dispatcher.h"
6 
7 #include <ostream>
8 
9 #include "base/check_op.h"
10 
11 namespace net {
12 
Limits(Priority num_priorities,size_t total_jobs)13 PrioritizedDispatcher::Limits::Limits(Priority num_priorities,
14                                       size_t total_jobs)
15     : total_jobs(total_jobs), reserved_slots(num_priorities) {}
16 
17 PrioritizedDispatcher::Limits::Limits(const Limits& other) = default;
18 
19 PrioritizedDispatcher::Limits::~Limits() = default;
20 
PrioritizedDispatcher(const Limits & limits)21 PrioritizedDispatcher::PrioritizedDispatcher(const Limits& limits)
22     : queue_(limits.reserved_slots.size()),
23       max_running_jobs_(limits.reserved_slots.size()) {
24   SetLimits(limits);
25 }
26 
27 PrioritizedDispatcher::~PrioritizedDispatcher() = default;
28 
Add(Job * job,Priority priority)29 PrioritizedDispatcher::Handle PrioritizedDispatcher::Add(
30     Job* job, Priority priority) {
31   DCHECK(job);
32   DCHECK_LT(priority, num_priorities());
33   if (num_running_jobs_ < max_running_jobs_[priority]) {
34     ++num_running_jobs_;
35     job->Start();
36     return Handle();
37   }
38   return queue_.Insert(job, priority);
39 }
40 
AddAtHead(Job * job,Priority priority)41 PrioritizedDispatcher::Handle PrioritizedDispatcher::AddAtHead(
42     Job* job, Priority priority) {
43   DCHECK(job);
44   DCHECK_LT(priority, num_priorities());
45   if (num_running_jobs_ < max_running_jobs_[priority]) {
46     ++num_running_jobs_;
47     job->Start();
48     return Handle();
49   }
50   return queue_.InsertAtFront(job, priority);
51 }
52 
Cancel(const Handle & handle)53 void PrioritizedDispatcher::Cancel(const Handle& handle) {
54   queue_.Erase(handle);
55 }
56 
EvictOldestLowest()57 PrioritizedDispatcher::Job* PrioritizedDispatcher::EvictOldestLowest() {
58   Handle handle = queue_.FirstMin();
59   if (handle.is_null())
60     return nullptr;
61   Job* job = handle.value();
62   Cancel(handle);
63   return job;
64 }
65 
ChangePriority(const Handle & handle,Priority priority)66 PrioritizedDispatcher::Handle PrioritizedDispatcher::ChangePriority(
67     const Handle& handle, Priority priority) {
68   DCHECK(!handle.is_null());
69   DCHECK_LT(priority, num_priorities());
70   DCHECK_GE(num_running_jobs_, max_running_jobs_[handle.priority()]) <<
71       "Job should not be in queue when limits permit it to start.";
72 
73   if (handle.priority() == priority)
74     return handle;
75 
76   if (MaybeDispatchJob(handle, priority))
77     return Handle();
78   Job* job = handle.value();
79   queue_.Erase(handle);
80   return queue_.Insert(job, priority);
81 }
82 
OnJobFinished()83 void PrioritizedDispatcher::OnJobFinished() {
84   DCHECK_GT(num_running_jobs_, 0u);
85   --num_running_jobs_;
86   MaybeDispatchNextJob();
87 }
88 
GetLimits() const89 PrioritizedDispatcher::Limits PrioritizedDispatcher::GetLimits() const {
90   size_t num_priorities = max_running_jobs_.size();
91   Limits limits(num_priorities, max_running_jobs_.back());
92 
93   // Calculate the number of jobs reserved for each priority and higher.  Leave
94   // the number of jobs reserved for the lowest priority or higher as 0.
95   for (size_t i = 1; i < num_priorities; ++i) {
96     limits.reserved_slots[i] = max_running_jobs_[i] - max_running_jobs_[i - 1];
97   }
98 
99   return limits;
100 }
101 
SetLimits(const Limits & limits)102 void PrioritizedDispatcher::SetLimits(const Limits& limits) {
103   DCHECK_EQ(queue_.num_priorities(), limits.reserved_slots.size());
104   size_t total = 0;
105   for (size_t i = 0; i < limits.reserved_slots.size(); ++i) {
106     total += limits.reserved_slots[i];
107     max_running_jobs_[i] = total;
108   }
109   // Unreserved slots are available for all priorities.
110   DCHECK_LE(total, limits.total_jobs) << "sum(reserved_slots) <= total_jobs";
111   size_t spare = limits.total_jobs - total;
112   for (size_t i = limits.reserved_slots.size(); i > 0; --i) {
113     max_running_jobs_[i - 1] += spare;
114   }
115 
116   // Start pending jobs, if limits permit.
117   while (true) {
118     if (!MaybeDispatchNextJob())
119       break;
120   }
121 }
122 
SetLimitsToZero()123 void PrioritizedDispatcher::SetLimitsToZero() {
124   SetLimits(Limits(queue_.num_priorities(), 0));
125 }
126 
MaybeDispatchJob(const Handle & handle,Priority job_priority)127 bool PrioritizedDispatcher::MaybeDispatchJob(const Handle& handle,
128                                              Priority job_priority) {
129   DCHECK_LT(job_priority, num_priorities());
130   if (num_running_jobs_ >= max_running_jobs_[job_priority])
131     return false;
132   Job* job = handle.value();
133   queue_.Erase(handle);
134   ++num_running_jobs_;
135   job->Start();
136   return true;
137 }
138 
MaybeDispatchNextJob()139 bool PrioritizedDispatcher::MaybeDispatchNextJob() {
140   Handle handle = queue_.FirstMax();
141   if (handle.is_null()) {
142     DCHECK_EQ(0u, queue_.size());
143     return false;
144   }
145   return MaybeDispatchJob(handle, handle.priority());
146 }
147 
148 }  // namespace net
149