1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/base/prioritized_dispatcher.h"
6
7 #include <ostream>
8
9 #include "base/check_op.h"
10
11 namespace net {
12
Limits(Priority num_priorities,size_t total_jobs)13 PrioritizedDispatcher::Limits::Limits(Priority num_priorities,
14 size_t total_jobs)
15 : total_jobs(total_jobs), reserved_slots(num_priorities) {}
16
17 PrioritizedDispatcher::Limits::Limits(const Limits& other) = default;
18
19 PrioritizedDispatcher::Limits::~Limits() = default;
20
PrioritizedDispatcher(const Limits & limits)21 PrioritizedDispatcher::PrioritizedDispatcher(const Limits& limits)
22 : queue_(limits.reserved_slots.size()),
23 max_running_jobs_(limits.reserved_slots.size()) {
24 SetLimits(limits);
25 }
26
27 PrioritizedDispatcher::~PrioritizedDispatcher() = default;
28
Add(Job * job,Priority priority)29 PrioritizedDispatcher::Handle PrioritizedDispatcher::Add(
30 Job* job, Priority priority) {
31 DCHECK(job);
32 DCHECK_LT(priority, num_priorities());
33 if (num_running_jobs_ < max_running_jobs_[priority]) {
34 ++num_running_jobs_;
35 job->Start();
36 return Handle();
37 }
38 return queue_.Insert(job, priority);
39 }
40
AddAtHead(Job * job,Priority priority)41 PrioritizedDispatcher::Handle PrioritizedDispatcher::AddAtHead(
42 Job* job, Priority priority) {
43 DCHECK(job);
44 DCHECK_LT(priority, num_priorities());
45 if (num_running_jobs_ < max_running_jobs_[priority]) {
46 ++num_running_jobs_;
47 job->Start();
48 return Handle();
49 }
50 return queue_.InsertAtFront(job, priority);
51 }
52
Cancel(const Handle & handle)53 void PrioritizedDispatcher::Cancel(const Handle& handle) {
54 queue_.Erase(handle);
55 }
56
EvictOldestLowest()57 PrioritizedDispatcher::Job* PrioritizedDispatcher::EvictOldestLowest() {
58 Handle handle = queue_.FirstMin();
59 if (handle.is_null())
60 return nullptr;
61 Job* job = handle.value();
62 Cancel(handle);
63 return job;
64 }
65
ChangePriority(const Handle & handle,Priority priority)66 PrioritizedDispatcher::Handle PrioritizedDispatcher::ChangePriority(
67 const Handle& handle, Priority priority) {
68 DCHECK(!handle.is_null());
69 DCHECK_LT(priority, num_priorities());
70 DCHECK_GE(num_running_jobs_, max_running_jobs_[handle.priority()]) <<
71 "Job should not be in queue when limits permit it to start.";
72
73 if (handle.priority() == priority)
74 return handle;
75
76 if (MaybeDispatchJob(handle, priority))
77 return Handle();
78 Job* job = handle.value();
79 queue_.Erase(handle);
80 return queue_.Insert(job, priority);
81 }
82
OnJobFinished()83 void PrioritizedDispatcher::OnJobFinished() {
84 DCHECK_GT(num_running_jobs_, 0u);
85 --num_running_jobs_;
86 MaybeDispatchNextJob();
87 }
88
GetLimits() const89 PrioritizedDispatcher::Limits PrioritizedDispatcher::GetLimits() const {
90 size_t num_priorities = max_running_jobs_.size();
91 Limits limits(num_priorities, max_running_jobs_.back());
92
93 // Calculate the number of jobs reserved for each priority and higher. Leave
94 // the number of jobs reserved for the lowest priority or higher as 0.
95 for (size_t i = 1; i < num_priorities; ++i) {
96 limits.reserved_slots[i] = max_running_jobs_[i] - max_running_jobs_[i - 1];
97 }
98
99 return limits;
100 }
101
SetLimits(const Limits & limits)102 void PrioritizedDispatcher::SetLimits(const Limits& limits) {
103 DCHECK_EQ(queue_.num_priorities(), limits.reserved_slots.size());
104 size_t total = 0;
105 for (size_t i = 0; i < limits.reserved_slots.size(); ++i) {
106 total += limits.reserved_slots[i];
107 max_running_jobs_[i] = total;
108 }
109 // Unreserved slots are available for all priorities.
110 DCHECK_LE(total, limits.total_jobs) << "sum(reserved_slots) <= total_jobs";
111 size_t spare = limits.total_jobs - total;
112 for (size_t i = limits.reserved_slots.size(); i > 0; --i) {
113 max_running_jobs_[i - 1] += spare;
114 }
115
116 // Start pending jobs, if limits permit.
117 while (true) {
118 if (!MaybeDispatchNextJob())
119 break;
120 }
121 }
122
SetLimitsToZero()123 void PrioritizedDispatcher::SetLimitsToZero() {
124 SetLimits(Limits(queue_.num_priorities(), 0));
125 }
126
MaybeDispatchJob(const Handle & handle,Priority job_priority)127 bool PrioritizedDispatcher::MaybeDispatchJob(const Handle& handle,
128 Priority job_priority) {
129 DCHECK_LT(job_priority, num_priorities());
130 if (num_running_jobs_ >= max_running_jobs_[job_priority])
131 return false;
132 Job* job = handle.value();
133 queue_.Erase(handle);
134 ++num_running_jobs_;
135 job->Start();
136 return true;
137 }
138
MaybeDispatchNextJob()139 bool PrioritizedDispatcher::MaybeDispatchNextJob() {
140 Handle handle = queue_.FirstMax();
141 if (handle.is_null()) {
142 DCHECK_EQ(0u, queue_.size());
143 return false;
144 }
145 return MaybeDispatchJob(handle, handle.priority());
146 }
147
148 } // namespace net
149