xref: /aosp_15_r20/external/cronet/net/third_party/quiche/src/quiche/common/btree_scheduler.h (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2023 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef QUICHE_COMMON_BTREE_SCHEDULER_H_
6 #define QUICHE_COMMON_BTREE_SCHEDULER_H_
7 
#include <cstddef>
#include <limits>
#include <optional>
#include <tuple>
#include <utility>

#include "absl/base/attributes.h"
#include "absl/container/btree_map.h"
#include "absl/container/node_hash_map.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
#include "quiche/common/platform/api/quiche_export.h"
#include "quiche/common/platform/api/quiche_logging.h"
21 
22 namespace quiche {
23 
// BTreeScheduler is a data structure that allows streams (and potentially other
// entities) to be scheduled according to arbitrary priorities.  The scheduler
// is used as follows:
//  - A stream has to be registered with a priority before being scheduled.
//  - A stream can be unregistered, or can be re-prioritized.
//  - A stream can be scheduled; that adds it into the queue.
//  - PopFront() will return the stream with the highest priority.
//  - ShouldYield() will return whether there is a stream that would go before
//    the specified one.
//
// The prioritization works as follows:
//  - If two streams have different priorities, the higher priority stream goes
//    first.
//  - If two streams have the same priority, the one that got scheduled earlier
//    goes first. Internally, this is implemented by assigning a monotonically
//    decreasing sequence number to every newly scheduled stream.
//
// The Id type has to define operator==, be hashable via absl::Hash, and be
// printable via operator<<; the Priority type has to define operator<.
43 template <typename Id, typename Priority>
44 class QUICHE_NO_EXPORT BTreeScheduler {
45  public:
46   // Returns true if there are any streams registered.
HasRegistered()47   bool HasRegistered() const { return !streams_.empty(); }
48   // Returns true if there are any streams scheduled.
HasScheduled()49   bool HasScheduled() const { return !schedule_.empty(); }
50   // Returns the number of currently scheduled streams.
NumScheduled()51   size_t NumScheduled() const { return schedule_.size(); }
52 
53   // Counts the number of scheduled entries in the range [min, max].  If either
54   // min or max is omitted, negative or positive infinity is assumed.
55   size_t NumScheduledInPriorityRange(std::optional<Priority> min,
56                                      std::optional<Priority> max) const;
57 
58   // Returns true if there is a stream that would go before `id` in the
59   // schedule.
60   absl::StatusOr<bool> ShouldYield(Id id) const;
61 
62   // Returns the priority for `id`, or nullopt if stream is not registered.
GetPriorityFor(Id id)63   std::optional<Priority> GetPriorityFor(Id id) const {
64     auto it = streams_.find(id);
65     if (it == streams_.end()) {
66       return std::nullopt;
67     }
68     return it->second.priority;
69   }
70 
71   // Pops the highest priority stream.  Will fail if the schedule is empty.
72   absl::StatusOr<Id> PopFront();
73 
74   // Registers the specified stream with the supplied priority.  The stream must
75   // not be already registered.
76   absl::Status Register(Id stream_id, const Priority& priority);
77   // Unregisters a previously registered stream.
78   absl::Status Unregister(Id stream_id);
79   // Alters the priority of an already registered stream.
80   absl::Status UpdatePriority(Id stream_id, const Priority& new_priority);
81 
82   // Adds the `stream` into the schedule if it's not already there.
83   absl::Status Schedule(Id stream_id);
84   // Returns true if `stream` is in the schedule.
85   bool IsScheduled(Id stream_id) const;
86 
87  private:
88   // A record for a registered stream.
89   struct StreamEntry {
90     // The current priority of the stream.
91     ABSL_ATTRIBUTE_NO_UNIQUE_ADDRESS Priority priority;
92     // If present, the sequence number with which the stream is currently
93     // scheduled.  If absent, indicates that the stream is not scheduled.
94     std::optional<int> current_sequence_number;
95 
scheduledStreamEntry96     bool scheduled() const { return current_sequence_number.has_value(); }
97   };
98   // The full entry for the stream (includes the ID that's used as a hashmap
99   // key).
100   using FullStreamEntry = std::pair<const Id, StreamEntry>;
101 
102   // A key that is used to order entities within the schedule.
103   struct ScheduleKey {
104     // The main order key: the priority of the stream.
105     ABSL_ATTRIBUTE_NO_UNIQUE_ADDRESS Priority priority;
106     // The secondary order key: the sequence number.
107     int sequence_number;
108 
109     // Orders schedule keys in order of decreasing priority.
110     bool operator<(const ScheduleKey& other) const {
111       return std::make_tuple(priority, sequence_number) >
112              std::make_tuple(other.priority, other.sequence_number);
113     }
114 
115     // In order to find all entities with priority `p`, one can iterate between
116     // `lower_bound(MinForPriority(p))` and `upper_bound(MaxForPriority(p))`.
MinForPriorityScheduleKey117     static ScheduleKey MinForPriority(Priority priority) {
118       return ScheduleKey{priority, std::numeric_limits<int>::max()};
119     }
MaxForPriorityScheduleKey120     static ScheduleKey MaxForPriority(Priority priority) {
121       return ScheduleKey{priority, std::numeric_limits<int>::min()};
122     }
123   };
124   using FullScheduleEntry = std::pair<const ScheduleKey, FullStreamEntry*>;
125   using ScheduleIterator =
126       typename absl::btree_map<ScheduleKey, FullStreamEntry*>::const_iterator;
127 
128   // Convenience method to get the stream ID for a schedule entry.
StreamId(const FullScheduleEntry & entry)129   static Id StreamId(const FullScheduleEntry& entry) {
130     return entry.second->first;
131   }
132 
133   // Removes a stream from the schedule, and returns the old entry if it were
134   // present.
135   absl::StatusOr<FullScheduleEntry> DescheduleStream(const StreamEntry& entry);
136 
137   // The map of currently registered streams.
138   absl::node_hash_map<Id, StreamEntry> streams_;
139   // The stream schedule, ordered starting from the highest priority stream.
140   absl::btree_map<ScheduleKey, FullStreamEntry*> schedule_;
141 
142   // The counter that is used to ensure that streams with the same priority are
143   // handled in the FIFO order.  Decreases with every write.
144   int current_write_sequence_number_ = 0;
145 };
146 
147 template <typename Id, typename Priority>
NumScheduledInPriorityRange(std::optional<Priority> min,std::optional<Priority> max)148 size_t BTreeScheduler<Id, Priority>::NumScheduledInPriorityRange(
149     std::optional<Priority> min, std::optional<Priority> max) const {
150   if (min.has_value() && max.has_value()) {
151     QUICHE_DCHECK(*min <= *max);
152   }
153   // This is reversed, since the schedule is ordered in the descending priority
154   // order.
155   ScheduleIterator begin =
156       max.has_value() ? schedule_.lower_bound(ScheduleKey::MinForPriority(*max))
157                       : schedule_.begin();
158   ScheduleIterator end =
159       min.has_value() ? schedule_.upper_bound(ScheduleKey::MaxForPriority(*min))
160                       : schedule_.end();
161   return end - begin;
162 }
163 
164 template <typename Id, typename Priority>
Register(Id stream_id,const Priority & priority)165 absl::Status BTreeScheduler<Id, Priority>::Register(Id stream_id,
166                                                     const Priority& priority) {
167   auto [it, success] = streams_.insert({stream_id, StreamEntry{priority}});
168   if (!success) {
169     return absl::AlreadyExistsError("ID already registered");
170   }
171   return absl::OkStatus();
172 }
173 
174 template <typename Id, typename Priority>
175 auto BTreeScheduler<Id, Priority>::DescheduleStream(const StreamEntry& entry)
176     -> absl::StatusOr<FullScheduleEntry> {
177   QUICHE_DCHECK(entry.scheduled());
178   auto it = schedule_.find(
179       ScheduleKey{entry.priority, *entry.current_sequence_number});
180   if (it == schedule_.end()) {
181     return absl::InternalError(
182         "Calling DescheduleStream() on an entry that is not in the schedule at "
183         "the expected key.");
184   }
185   FullScheduleEntry result = *it;
186   schedule_.erase(it);
187   return result;
188 }
189 
190 template <typename Id, typename Priority>
Unregister(Id stream_id)191 absl::Status BTreeScheduler<Id, Priority>::Unregister(Id stream_id) {
192   auto it = streams_.find(stream_id);
193   if (it == streams_.end()) {
194     return absl::NotFoundError("Stream not registered");
195   }
196   const StreamEntry& stream = it->second;
197 
198   if (stream.scheduled()) {
199     if (!DescheduleStream(stream).ok()) {
200       QUICHE_BUG(BTreeSchedule_Unregister_NotInSchedule)
201           << "UnregisterStream() called on a stream ID " << stream_id
202           << ", which is marked ready, but is not in the schedule";
203     }
204   }
205 
206   streams_.erase(it);
207   return absl::OkStatus();
208 }
209 
210 template <typename Id, typename Priority>
UpdatePriority(Id stream_id,const Priority & new_priority)211 absl::Status BTreeScheduler<Id, Priority>::UpdatePriority(
212     Id stream_id, const Priority& new_priority) {
213   auto it = streams_.find(stream_id);
214   if (it == streams_.end()) {
215     return absl::NotFoundError("ID not registered");
216   }
217 
218   StreamEntry& stream = it->second;
219   std::optional<int> sequence_number;
220   if (stream.scheduled()) {
221     absl::StatusOr<FullScheduleEntry> old_entry = DescheduleStream(stream);
222     if (old_entry.ok()) {
223       sequence_number = old_entry->first.sequence_number;
224       QUICHE_DCHECK_EQ(old_entry->second, &*it);
225     } else {
226       QUICHE_BUG(BTreeScheduler_Update_Not_In_Schedule)
227           << "UpdatePriority() called on a stream ID " << stream_id
228           << ", which is marked ready, but is not in the schedule";
229     }
230   }
231 
232   stream.priority = new_priority;
233   if (sequence_number.has_value()) {
234     schedule_.insert({ScheduleKey{stream.priority, *sequence_number}, &*it});
235   }
236   return absl::OkStatus();
237 }
238 
239 template <typename Id, typename Priority>
ShouldYield(Id stream_id)240 absl::StatusOr<bool> BTreeScheduler<Id, Priority>::ShouldYield(
241     Id stream_id) const {
242   const auto stream_it = streams_.find(stream_id);
243   if (stream_it == streams_.end()) {
244     return absl::NotFoundError("ID not registered");
245   }
246   const StreamEntry& stream = stream_it->second;
247 
248   if (schedule_.empty()) {
249     return false;
250   }
251   const FullScheduleEntry& next = *schedule_.begin();
252   if (StreamId(next) == stream_id) {
253     return false;
254   }
255   return next.first.priority >= stream.priority;
256 }
257 
258 template <typename Id, typename Priority>
PopFront()259 absl::StatusOr<Id> BTreeScheduler<Id, Priority>::PopFront() {
260   if (schedule_.empty()) {
261     return absl::NotFoundError("No streams scheduled");
262   }
263   auto schedule_it = schedule_.begin();
264   QUICHE_DCHECK(schedule_it->second->second.scheduled());
265   schedule_it->second->second.current_sequence_number = std::nullopt;
266 
267   Id result = StreamId(*schedule_it);
268   schedule_.erase(schedule_it);
269   return result;
270 }
271 
272 template <typename Id, typename Priority>
Schedule(Id stream_id)273 absl::Status BTreeScheduler<Id, Priority>::Schedule(Id stream_id) {
274   const auto stream_it = streams_.find(stream_id);
275   if (stream_it == streams_.end()) {
276     return absl::NotFoundError("ID not registered");
277   }
278   if (stream_it->second.scheduled()) {
279     return absl::OkStatus();
280   }
281   auto [schedule_it, success] =
282       schedule_.insert({ScheduleKey{stream_it->second.priority,
283                                     --current_write_sequence_number_},
284                         &*stream_it});
285   QUICHE_BUG_IF(WebTransportWriteBlockedList_AddStream_conflict, !success)
286       << "Conflicting key in scheduler for stream " << stream_id;
287   stream_it->second.current_sequence_number =
288       schedule_it->first.sequence_number;
289   return absl::OkStatus();
290 }
291 
292 template <typename Id, typename Priority>
IsScheduled(Id stream_id)293 bool BTreeScheduler<Id, Priority>::IsScheduled(Id stream_id) const {
294   const auto stream_it = streams_.find(stream_id);
295   if (stream_it == streams_.end()) {
296     return false;
297   }
298   return stream_it->second.scheduled();
299 }
300 
301 }  // namespace quiche
302 
303 #endif  // QUICHE_COMMON_BTREE_SCHEDULER_H_
304