// xref: /aosp_15_r20/external/cronet/net/third_party/quiche/src/quiche/http2/core/priority_write_scheduler.h (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
// Copyright (c) 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4 
5 #ifndef QUICHE_HTTP2_CORE_PRIORITY_WRITE_SCHEDULER_H_
6 #define QUICHE_HTTP2_CORE_PRIORITY_WRITE_SCHEDULER_H_
7 
8 #include <algorithm>
9 #include <cstddef>
10 #include <cstdint>
11 #include <memory>
12 #include <optional>
13 #include <string>
14 #include <tuple>
15 #include <utility>
16 #include <vector>
17 
18 #include "absl/container/flat_hash_map.h"
19 #include "absl/strings/str_cat.h"
20 #include "absl/time/time.h"
21 #include "quiche/common/platform/api/quiche_bug_tracker.h"
22 #include "quiche/common/platform/api/quiche_export.h"
23 #include "quiche/common/platform/api/quiche_logging.h"
24 #include "quiche/common/quiche_circular_deque.h"
25 #include "quiche/spdy/core/spdy_protocol.h"
26 
27 namespace http2 {
28 
namespace test {
// Forward declaration of the test-only peer class. PriorityWriteScheduler
// declares it a friend so that it can reach the scheduler's private state.
template <typename StreamIdType>
class PriorityWriteSchedulerPeer;
}  // namespace test
33 
34 // SpdyPriority is an integer type, so this functor can be used both as
35 // PriorityTypeToInt and as IntToPriorityType.
36 struct QUICHE_EXPORT SpdyPriorityToSpdyPriority {
operatorSpdyPriorityToSpdyPriority37   spdy::SpdyPriority operator()(spdy::SpdyPriority priority) {
38     return priority;
39   }
40 };
41 
42 // PriorityWriteScheduler manages the order in which HTTP/2 or HTTP/3 streams
43 // are written. Each stream has a priority of type PriorityType. This includes
44 // an integer between 0 and 7, and optionally other information that is stored
45 // but otherwise ignored by this class.  Higher priority (lower integer value)
46 // streams are always given precedence over lower priority (higher value)
47 // streams, as long as the higher priority stream is not blocked.
48 //
49 // Each stream can be in one of two states: ready or not ready (for writing).
50 // Ready state is changed by calling the MarkStreamReady() and
51 // MarkStreamNotReady() methods. Only streams in the ready state can be returned
52 // by PopNextReadyStream(). When returned by that method, the stream's state
53 // changes to not ready.
54 //
55 template <typename StreamIdType, typename PriorityType = spdy::SpdyPriority,
56           typename PriorityTypeToInt = SpdyPriorityToSpdyPriority,
57           typename IntToPriorityType = SpdyPriorityToSpdyPriority>
58 class QUICHE_EXPORT PriorityWriteScheduler {
59  public:
60   static constexpr int kHighestPriority = 0;
61   static constexpr int kLowestPriority = 7;
62 
63   static_assert(spdy::kV3HighestPriority == kHighestPriority);
64   static_assert(spdy::kV3LowestPriority == kLowestPriority);
65 
66   // Registers new stream `stream_id` with the scheduler, assigning it the
67   // given priority.
68   //
69   // Preconditions: `stream_id` should be unregistered.
RegisterStream(StreamIdType stream_id,PriorityType priority)70   void RegisterStream(StreamIdType stream_id, PriorityType priority) {
71     auto stream_info = std::make_unique<StreamInfo>(
72         StreamInfo{std::move(priority), stream_id, false});
73     bool inserted =
74         stream_infos_.insert(std::make_pair(stream_id, std::move(stream_info)))
75             .second;
76     QUICHE_BUG_IF(spdy_bug_19_2, !inserted)
77         << "Stream " << stream_id << " already registered";
78   }
79 
80   // Unregisters the given stream from the scheduler, which will no longer keep
81   // state for it.
82   //
83   // Preconditions: `stream_id` should be registered.
UnregisterStream(StreamIdType stream_id)84   void UnregisterStream(StreamIdType stream_id) {
85     auto it = stream_infos_.find(stream_id);
86     if (it == stream_infos_.end()) {
87       QUICHE_BUG(spdy_bug_19_3) << "Stream " << stream_id << " not registered";
88       return;
89     }
90     const StreamInfo* const stream_info = it->second.get();
91     if (stream_info->ready) {
92       bool erased =
93           Erase(&priority_infos_[PriorityTypeToInt()(stream_info->priority)]
94                      .ready_list,
95                 stream_info);
96       QUICHE_DCHECK(erased);
97     }
98     stream_infos_.erase(it);
99   }
100 
101   // Returns true if the given stream is currently registered.
StreamRegistered(StreamIdType stream_id)102   bool StreamRegistered(StreamIdType stream_id) const {
103     return stream_infos_.find(stream_id) != stream_infos_.end();
104   }
105 
106   // Returns the priority of the specified stream.
107   //
108   // Preconditions: `stream_id` should be registered.
GetStreamPriority(StreamIdType stream_id)109   PriorityType GetStreamPriority(StreamIdType stream_id) const {
110     auto it = stream_infos_.find(stream_id);
111     if (it == stream_infos_.end()) {
112       QUICHE_DVLOG(1) << "Stream " << stream_id << " not registered";
113       return IntToPriorityType()(kLowestPriority);
114     }
115     return it->second->priority;
116   }
117 
118   // Updates the priority of the given stream.
119   //
120   // Preconditions: `stream_id` should be registered.
UpdateStreamPriority(StreamIdType stream_id,PriorityType priority)121   void UpdateStreamPriority(StreamIdType stream_id, PriorityType priority) {
122     auto it = stream_infos_.find(stream_id);
123     if (it == stream_infos_.end()) {
124       // TODO(mpw): add to stream_infos_ on demand--see b/15676312.
125       QUICHE_DVLOG(1) << "Stream " << stream_id << " not registered";
126       return;
127     }
128 
129     StreamInfo* const stream_info = it->second.get();
130     if (stream_info->priority == priority) {
131       return;
132     }
133 
134     // Only move `stream_info` to a different bucket if the integral priority
135     // value changes.
136     if (PriorityTypeToInt()(stream_info->priority) !=
137             PriorityTypeToInt()(priority) &&
138         stream_info->ready) {
139       bool erased =
140           Erase(&priority_infos_[PriorityTypeToInt()(stream_info->priority)]
141                      .ready_list,
142                 stream_info);
143       QUICHE_DCHECK(erased);
144       priority_infos_[PriorityTypeToInt()(priority)].ready_list.push_back(
145           stream_info);
146       ++num_ready_streams_;
147     }
148 
149     // But override `priority` for the stream regardless of the integral value,
150     // because it might contain additional information.
151     stream_info->priority = std::move(priority);
152   }
153 
154   // Records time of a read/write event for the given stream.
155   //
156   // Preconditions: `stream_id` should be registered.
RecordStreamEventTime(StreamIdType stream_id,absl::Time now)157   void RecordStreamEventTime(StreamIdType stream_id, absl::Time now) {
158     auto it = stream_infos_.find(stream_id);
159     if (it == stream_infos_.end()) {
160       QUICHE_BUG(spdy_bug_19_4) << "Stream " << stream_id << " not registered";
161       return;
162     }
163     PriorityInfo& priority_info =
164         priority_infos_[PriorityTypeToInt()(it->second->priority)];
165     priority_info.last_event_time =
166         std::max(priority_info.last_event_time, absl::make_optional(now));
167   }
168 
169   // Returns time of the last read/write event for a stream with higher priority
170   // than the priority of the given stream, or nullopt if there is no such
171   // event.
172   //
173   // Preconditions: `stream_id` should be registered.
GetLatestEventWithPriority(StreamIdType stream_id)174   std::optional<absl::Time> GetLatestEventWithPriority(
175       StreamIdType stream_id) const {
176     auto it = stream_infos_.find(stream_id);
177     if (it == stream_infos_.end()) {
178       QUICHE_BUG(spdy_bug_19_5) << "Stream " << stream_id << " not registered";
179       return std::nullopt;
180     }
181     std::optional<absl::Time> last_event_time;
182     const StreamInfo* const stream_info = it->second.get();
183     for (int p = kHighestPriority;
184          p < PriorityTypeToInt()(stream_info->priority); ++p) {
185       last_event_time =
186           std::max(last_event_time, priority_infos_[p].last_event_time);
187     }
188     return last_event_time;
189   }
190 
191   // If the scheduler has any ready streams, returns the next scheduled
192   // ready stream, in the process transitioning the stream from ready to not
193   // ready.
194   //
195   // Preconditions: `HasReadyStreams() == true`
PopNextReadyStream()196   StreamIdType PopNextReadyStream() {
197     return std::get<0>(PopNextReadyStreamAndPriority());
198   }
199 
200   // If the scheduler has any ready streams, returns the next scheduled
201   // ready stream and its priority, in the process transitioning the stream from
202   // ready to not ready.
203   //
204   // Preconditions: `HasReadyStreams() == true`
PopNextReadyStreamAndPriority()205   std::tuple<StreamIdType, PriorityType> PopNextReadyStreamAndPriority() {
206     for (int p = kHighestPriority; p <= kLowestPriority; ++p) {
207       ReadyList& ready_list = priority_infos_[p].ready_list;
208       if (!ready_list.empty()) {
209         StreamInfo* const info = ready_list.front();
210         ready_list.pop_front();
211         --num_ready_streams_;
212 
213         QUICHE_DCHECK(stream_infos_.find(info->stream_id) !=
214                       stream_infos_.end());
215         info->ready = false;
216         return std::make_tuple(info->stream_id, info->priority);
217       }
218     }
219     QUICHE_BUG(spdy_bug_19_6) << "No ready streams available";
220     return std::make_tuple(0, IntToPriorityType()(kLowestPriority));
221   }
222 
223   // Returns true if there's another stream ahead of the given stream in the
224   // scheduling queue.  This function can be called to see if the given stream
225   // should yield work to another stream.
226   //
227   // Preconditions: `stream_id` should be registered.
ShouldYield(StreamIdType stream_id)228   bool ShouldYield(StreamIdType stream_id) const {
229     auto it = stream_infos_.find(stream_id);
230     if (it == stream_infos_.end()) {
231       QUICHE_BUG(spdy_bug_19_7) << "Stream " << stream_id << " not registered";
232       return false;
233     }
234 
235     // If there's a higher priority stream, this stream should yield.
236     const StreamInfo* const stream_info = it->second.get();
237     for (int p = kHighestPriority;
238          p < PriorityTypeToInt()(stream_info->priority); ++p) {
239       if (!priority_infos_[p].ready_list.empty()) {
240         return true;
241       }
242     }
243 
244     // If this priority level is empty, or this stream is the next up, there's
245     // no need to yield.
246     const auto& ready_list =
247         priority_infos_[PriorityTypeToInt()(it->second->priority)].ready_list;
248     if (ready_list.empty() || ready_list.front()->stream_id == stream_id) {
249       return false;
250     }
251 
252     // There are other streams in this priority level which take precedence.
253     // Yield.
254     return true;
255   }
256 
257   // Marks the stream as ready to write. If the stream was already ready, does
258   // nothing. If add_to_front is true, the stream is scheduled ahead of other
259   // streams of the same priority/weight, otherwise it is scheduled behind them.
260   //
261   // Preconditions: `stream_id` should be registered.
MarkStreamReady(StreamIdType stream_id,bool add_to_front)262   void MarkStreamReady(StreamIdType stream_id, bool add_to_front) {
263     auto it = stream_infos_.find(stream_id);
264     if (it == stream_infos_.end()) {
265       QUICHE_BUG(spdy_bug_19_8) << "Stream " << stream_id << " not registered";
266       return;
267     }
268     StreamInfo* const stream_info = it->second.get();
269     if (stream_info->ready) {
270       return;
271     }
272     ReadyList& ready_list =
273         priority_infos_[PriorityTypeToInt()(stream_info->priority)].ready_list;
274     if (add_to_front) {
275       ready_list.push_front(stream_info);
276     } else {
277       ready_list.push_back(stream_info);
278     }
279     ++num_ready_streams_;
280     stream_info->ready = true;
281   }
282 
283   // Marks the stream as not ready to write. If the stream is not registered or
284   // not ready, does nothing.
285   //
286   // Preconditions: `stream_id` should be registered.
MarkStreamNotReady(StreamIdType stream_id)287   void MarkStreamNotReady(StreamIdType stream_id) {
288     auto it = stream_infos_.find(stream_id);
289     if (it == stream_infos_.end()) {
290       QUICHE_BUG(spdy_bug_19_9) << "Stream " << stream_id << " not registered";
291       return;
292     }
293     StreamInfo* const stream_info = it->second.get();
294     if (!stream_info->ready) {
295       return;
296     }
297     bool erased = Erase(
298         &priority_infos_[PriorityTypeToInt()(stream_info->priority)].ready_list,
299         stream_info);
300     QUICHE_DCHECK(erased);
301     stream_info->ready = false;
302   }
303 
304   // Returns true iff the scheduler has any ready streams.
HasReadyStreams()305   bool HasReadyStreams() const { return num_ready_streams_ > 0; }
306 
307   // Returns the number of streams currently marked ready.
NumReadyStreams()308   size_t NumReadyStreams() const { return num_ready_streams_; }
309 
310   // Returns the number of registered streams.
NumRegisteredStreams()311   size_t NumRegisteredStreams() const { return stream_infos_.size(); }
312 
313   // Returns summary of internal state, for logging/debugging.
DebugString()314   std::string DebugString() const {
315     return absl::StrCat(
316         "PriorityWriteScheduler {num_streams=", stream_infos_.size(),
317         " num_ready_streams=", NumReadyStreams(), "}");
318   }
319 
320   // Returns true if stream with `stream_id` is ready.
IsStreamReady(StreamIdType stream_id)321   bool IsStreamReady(StreamIdType stream_id) const {
322     auto it = stream_infos_.find(stream_id);
323     if (it == stream_infos_.end()) {
324       QUICHE_DLOG(INFO) << "Stream " << stream_id << " not registered";
325       return false;
326     }
327     return it->second->ready;
328   }
329 
330  private:
331   friend class test::PriorityWriteSchedulerPeer<StreamIdType>;
332 
333   // State kept for all registered streams.
334   // All ready streams have `ready == true` and should be present in
335   // `priority_infos_[priority].ready_list`.
336   struct QUICHE_EXPORT StreamInfo {
337     PriorityType priority;
338     StreamIdType stream_id;
339     bool ready;
340   };
341 
342   // O(1) size lookup, O(1) insert at front or back (amortized).
343   using ReadyList = quiche::QuicheCircularDeque<StreamInfo*>;
344 
345   // State kept for each priority level.
346   struct QUICHE_EXPORT PriorityInfo {
347     // IDs of streams that are ready to write.
348     ReadyList ready_list;
349     // Time of latest write event for stream of this priority.
350     std::optional<absl::Time> last_event_time;
351   };
352 
353   // Use std::unique_ptr, because absl::flat_hash_map does not have pointer
354   // stability, but ReadyList stores pointers to the StreamInfo objects.
355   using StreamInfoMap =
356       absl::flat_hash_map<StreamIdType, std::unique_ptr<StreamInfo>>;
357 
358   // Erases `info` from `ready_list`, returning true if found (and erased), or
359   // false otherwise. Decrements `num_ready_streams_` if an entry is erased.
Erase(ReadyList * ready_list,const StreamInfo * info)360   bool Erase(ReadyList* ready_list, const StreamInfo* info) {
361     auto it = std::remove(ready_list->begin(), ready_list->end(), info);
362     if (it == ready_list->end()) {
363       // `info` was not found.
364       return false;
365     }
366     ready_list->pop_back();
367     --num_ready_streams_;
368     return true;
369   }
370 
371   // Number of ready streams.
372   size_t num_ready_streams_ = 0;
373   // Per-priority state, including ready lists.
374   PriorityInfo priority_infos_[kLowestPriority + 1];
375   // StreamInfos for all registered streams.
376   StreamInfoMap stream_infos_;
377 };
378 
379 }  // namespace http2
380 
381 #endif  // QUICHE_HTTP2_CORE_PRIORITY_WRITE_SCHEDULER_H_
382