1 // Copyright 2023 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #ifndef QUICHE_COMMON_BTREE_SCHEDULER_H_
6 #define QUICHE_COMMON_BTREE_SCHEDULER_H_
7
8 #include <cstddef>
9 #include <limits>
10 #include <optional>
11 #include <utility>
12
13 #include "absl/base/attributes.h"
14 #include "absl/container/btree_map.h"
15 #include "absl/container/node_hash_map.h"
16 #include "absl/status/status.h"
17 #include "absl/status/statusor.h"
18 #include "quiche/common/platform/api/quiche_bug_tracker.h"
19 #include "quiche/common/platform/api/quiche_export.h"
20 #include "quiche/common/platform/api/quiche_logging.h"
21
22 namespace quiche {
23
24 // BTreeScheduler is a data structure that allows streams (and potentially other
25 // entities) to be scheduled according to the arbitrary priorities. The API for
26 // using the scheduler can be used as follows:
27 // - A stream has to be registered with a priority before being scheduled.
28 // - A stream can be unregistered, or can be re-prioritized.
29 // - A stream can be scheduled; that adds it into the queue.
30 // - PopFront() will return the stream with highest priority.
31 // - ShouldYield() will return if there is a stream with higher priority than
32 // the specified one.
33 //
34 // The prioritization works as following:
35 // - If two streams have different priorities, the higher priority stream goes
36 // first.
37 // - If two streams have the same priority, the one that got scheduled earlier
38 // goes first. Internally, this is implemented by assigning a monotonically
39 // decreasing sequence number to every newly scheduled stream.
40 //
41 // The Id type has to define operator==, be hashable via absl::Hash, and
42 // printable via operator<<; the Priority type has to define operator<.
43 template <typename Id, typename Priority>
44 class QUICHE_NO_EXPORT BTreeScheduler {
45 public:
46 // Returns true if there are any streams registered.
HasRegistered()47 bool HasRegistered() const { return !streams_.empty(); }
48 // Returns true if there are any streams scheduled.
HasScheduled()49 bool HasScheduled() const { return !schedule_.empty(); }
50 // Returns the number of currently scheduled streams.
NumScheduled()51 size_t NumScheduled() const { return schedule_.size(); }
52
53 // Counts the number of scheduled entries in the range [min, max]. If either
54 // min or max is omitted, negative or positive infinity is assumed.
55 size_t NumScheduledInPriorityRange(std::optional<Priority> min,
56 std::optional<Priority> max) const;
57
58 // Returns true if there is a stream that would go before `id` in the
59 // schedule.
60 absl::StatusOr<bool> ShouldYield(Id id) const;
61
62 // Returns the priority for `id`, or nullopt if stream is not registered.
GetPriorityFor(Id id)63 std::optional<Priority> GetPriorityFor(Id id) const {
64 auto it = streams_.find(id);
65 if (it == streams_.end()) {
66 return std::nullopt;
67 }
68 return it->second.priority;
69 }
70
71 // Pops the highest priority stream. Will fail if the schedule is empty.
72 absl::StatusOr<Id> PopFront();
73
74 // Registers the specified stream with the supplied priority. The stream must
75 // not be already registered.
76 absl::Status Register(Id stream_id, const Priority& priority);
77 // Unregisters a previously registered stream.
78 absl::Status Unregister(Id stream_id);
79 // Alters the priority of an already registered stream.
80 absl::Status UpdatePriority(Id stream_id, const Priority& new_priority);
81
82 // Adds the `stream` into the schedule if it's not already there.
83 absl::Status Schedule(Id stream_id);
84 // Returns true if `stream` is in the schedule.
85 bool IsScheduled(Id stream_id) const;
86
87 private:
88 // A record for a registered stream.
89 struct StreamEntry {
90 // The current priority of the stream.
91 ABSL_ATTRIBUTE_NO_UNIQUE_ADDRESS Priority priority;
92 // If present, the sequence number with which the stream is currently
93 // scheduled. If absent, indicates that the stream is not scheduled.
94 std::optional<int> current_sequence_number;
95
scheduledStreamEntry96 bool scheduled() const { return current_sequence_number.has_value(); }
97 };
98 // The full entry for the stream (includes the ID that's used as a hashmap
99 // key).
100 using FullStreamEntry = std::pair<const Id, StreamEntry>;
101
102 // A key that is used to order entities within the schedule.
103 struct ScheduleKey {
104 // The main order key: the priority of the stream.
105 ABSL_ATTRIBUTE_NO_UNIQUE_ADDRESS Priority priority;
106 // The secondary order key: the sequence number.
107 int sequence_number;
108
109 // Orders schedule keys in order of decreasing priority.
110 bool operator<(const ScheduleKey& other) const {
111 return std::make_tuple(priority, sequence_number) >
112 std::make_tuple(other.priority, other.sequence_number);
113 }
114
115 // In order to find all entities with priority `p`, one can iterate between
116 // `lower_bound(MinForPriority(p))` and `upper_bound(MaxForPriority(p))`.
MinForPriorityScheduleKey117 static ScheduleKey MinForPriority(Priority priority) {
118 return ScheduleKey{priority, std::numeric_limits<int>::max()};
119 }
MaxForPriorityScheduleKey120 static ScheduleKey MaxForPriority(Priority priority) {
121 return ScheduleKey{priority, std::numeric_limits<int>::min()};
122 }
123 };
124 using FullScheduleEntry = std::pair<const ScheduleKey, FullStreamEntry*>;
125 using ScheduleIterator =
126 typename absl::btree_map<ScheduleKey, FullStreamEntry*>::const_iterator;
127
128 // Convenience method to get the stream ID for a schedule entry.
StreamId(const FullScheduleEntry & entry)129 static Id StreamId(const FullScheduleEntry& entry) {
130 return entry.second->first;
131 }
132
133 // Removes a stream from the schedule, and returns the old entry if it were
134 // present.
135 absl::StatusOr<FullScheduleEntry> DescheduleStream(const StreamEntry& entry);
136
137 // The map of currently registered streams.
138 absl::node_hash_map<Id, StreamEntry> streams_;
139 // The stream schedule, ordered starting from the highest priority stream.
140 absl::btree_map<ScheduleKey, FullStreamEntry*> schedule_;
141
142 // The counter that is used to ensure that streams with the same priority are
143 // handled in the FIFO order. Decreases with every write.
144 int current_write_sequence_number_ = 0;
145 };
146
147 template <typename Id, typename Priority>
NumScheduledInPriorityRange(std::optional<Priority> min,std::optional<Priority> max)148 size_t BTreeScheduler<Id, Priority>::NumScheduledInPriorityRange(
149 std::optional<Priority> min, std::optional<Priority> max) const {
150 if (min.has_value() && max.has_value()) {
151 QUICHE_DCHECK(*min <= *max);
152 }
153 // This is reversed, since the schedule is ordered in the descending priority
154 // order.
155 ScheduleIterator begin =
156 max.has_value() ? schedule_.lower_bound(ScheduleKey::MinForPriority(*max))
157 : schedule_.begin();
158 ScheduleIterator end =
159 min.has_value() ? schedule_.upper_bound(ScheduleKey::MaxForPriority(*min))
160 : schedule_.end();
161 return end - begin;
162 }
163
164 template <typename Id, typename Priority>
Register(Id stream_id,const Priority & priority)165 absl::Status BTreeScheduler<Id, Priority>::Register(Id stream_id,
166 const Priority& priority) {
167 auto [it, success] = streams_.insert({stream_id, StreamEntry{priority}});
168 if (!success) {
169 return absl::AlreadyExistsError("ID already registered");
170 }
171 return absl::OkStatus();
172 }
173
174 template <typename Id, typename Priority>
175 auto BTreeScheduler<Id, Priority>::DescheduleStream(const StreamEntry& entry)
176 -> absl::StatusOr<FullScheduleEntry> {
177 QUICHE_DCHECK(entry.scheduled());
178 auto it = schedule_.find(
179 ScheduleKey{entry.priority, *entry.current_sequence_number});
180 if (it == schedule_.end()) {
181 return absl::InternalError(
182 "Calling DescheduleStream() on an entry that is not in the schedule at "
183 "the expected key.");
184 }
185 FullScheduleEntry result = *it;
186 schedule_.erase(it);
187 return result;
188 }
189
190 template <typename Id, typename Priority>
Unregister(Id stream_id)191 absl::Status BTreeScheduler<Id, Priority>::Unregister(Id stream_id) {
192 auto it = streams_.find(stream_id);
193 if (it == streams_.end()) {
194 return absl::NotFoundError("Stream not registered");
195 }
196 const StreamEntry& stream = it->second;
197
198 if (stream.scheduled()) {
199 if (!DescheduleStream(stream).ok()) {
200 QUICHE_BUG(BTreeSchedule_Unregister_NotInSchedule)
201 << "UnregisterStream() called on a stream ID " << stream_id
202 << ", which is marked ready, but is not in the schedule";
203 }
204 }
205
206 streams_.erase(it);
207 return absl::OkStatus();
208 }
209
210 template <typename Id, typename Priority>
UpdatePriority(Id stream_id,const Priority & new_priority)211 absl::Status BTreeScheduler<Id, Priority>::UpdatePriority(
212 Id stream_id, const Priority& new_priority) {
213 auto it = streams_.find(stream_id);
214 if (it == streams_.end()) {
215 return absl::NotFoundError("ID not registered");
216 }
217
218 StreamEntry& stream = it->second;
219 std::optional<int> sequence_number;
220 if (stream.scheduled()) {
221 absl::StatusOr<FullScheduleEntry> old_entry = DescheduleStream(stream);
222 if (old_entry.ok()) {
223 sequence_number = old_entry->first.sequence_number;
224 QUICHE_DCHECK_EQ(old_entry->second, &*it);
225 } else {
226 QUICHE_BUG(BTreeScheduler_Update_Not_In_Schedule)
227 << "UpdatePriority() called on a stream ID " << stream_id
228 << ", which is marked ready, but is not in the schedule";
229 }
230 }
231
232 stream.priority = new_priority;
233 if (sequence_number.has_value()) {
234 schedule_.insert({ScheduleKey{stream.priority, *sequence_number}, &*it});
235 }
236 return absl::OkStatus();
237 }
238
239 template <typename Id, typename Priority>
ShouldYield(Id stream_id)240 absl::StatusOr<bool> BTreeScheduler<Id, Priority>::ShouldYield(
241 Id stream_id) const {
242 const auto stream_it = streams_.find(stream_id);
243 if (stream_it == streams_.end()) {
244 return absl::NotFoundError("ID not registered");
245 }
246 const StreamEntry& stream = stream_it->second;
247
248 if (schedule_.empty()) {
249 return false;
250 }
251 const FullScheduleEntry& next = *schedule_.begin();
252 if (StreamId(next) == stream_id) {
253 return false;
254 }
255 return next.first.priority >= stream.priority;
256 }
257
258 template <typename Id, typename Priority>
PopFront()259 absl::StatusOr<Id> BTreeScheduler<Id, Priority>::PopFront() {
260 if (schedule_.empty()) {
261 return absl::NotFoundError("No streams scheduled");
262 }
263 auto schedule_it = schedule_.begin();
264 QUICHE_DCHECK(schedule_it->second->second.scheduled());
265 schedule_it->second->second.current_sequence_number = std::nullopt;
266
267 Id result = StreamId(*schedule_it);
268 schedule_.erase(schedule_it);
269 return result;
270 }
271
272 template <typename Id, typename Priority>
Schedule(Id stream_id)273 absl::Status BTreeScheduler<Id, Priority>::Schedule(Id stream_id) {
274 const auto stream_it = streams_.find(stream_id);
275 if (stream_it == streams_.end()) {
276 return absl::NotFoundError("ID not registered");
277 }
278 if (stream_it->second.scheduled()) {
279 return absl::OkStatus();
280 }
281 auto [schedule_it, success] =
282 schedule_.insert({ScheduleKey{stream_it->second.priority,
283 --current_write_sequence_number_},
284 &*stream_it});
285 QUICHE_BUG_IF(WebTransportWriteBlockedList_AddStream_conflict, !success)
286 << "Conflicting key in scheduler for stream " << stream_id;
287 stream_it->second.current_sequence_number =
288 schedule_it->first.sequence_number;
289 return absl::OkStatus();
290 }
291
292 template <typename Id, typename Priority>
IsScheduled(Id stream_id)293 bool BTreeScheduler<Id, Priority>::IsScheduled(Id stream_id) const {
294 const auto stream_it = streams_.find(stream_id);
295 if (stream_it == streams_.end()) {
296 return false;
297 }
298 return stream_it->second.scheduled();
299 }
300
301 } // namespace quiche
302
303 #endif // QUICHE_COMMON_BTREE_SCHEDULER_H_
304