1 // Copyright (c) 2015 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef QUICHE_HTTP2_CORE_PRIORITY_WRITE_SCHEDULER_H_ 6 #define QUICHE_HTTP2_CORE_PRIORITY_WRITE_SCHEDULER_H_ 7 8 #include <algorithm> 9 #include <cstddef> 10 #include <cstdint> 11 #include <memory> 12 #include <optional> 13 #include <string> 14 #include <tuple> 15 #include <utility> 16 #include <vector> 17 18 #include "absl/container/flat_hash_map.h" 19 #include "absl/strings/str_cat.h" 20 #include "absl/time/time.h" 21 #include "quiche/common/platform/api/quiche_bug_tracker.h" 22 #include "quiche/common/platform/api/quiche_export.h" 23 #include "quiche/common/platform/api/quiche_logging.h" 24 #include "quiche/common/quiche_circular_deque.h" 25 #include "quiche/spdy/core/spdy_protocol.h" 26 27 namespace http2 { 28 29 namespace test { 30 template <typename StreamIdType> 31 class PriorityWriteSchedulerPeer; 32 } 33 34 // SpdyPriority is an integer type, so this functor can be used both as 35 // PriorityTypeToInt and as IntToPriorityType. 36 struct QUICHE_EXPORT SpdyPriorityToSpdyPriority { operatorSpdyPriorityToSpdyPriority37 spdy::SpdyPriority operator()(spdy::SpdyPriority priority) { 38 return priority; 39 } 40 }; 41 42 // PriorityWriteScheduler manages the order in which HTTP/2 or HTTP/3 streams 43 // are written. Each stream has a priority of type PriorityType. This includes 44 // an integer between 0 and 7, and optionally other information that is stored 45 // but otherwise ignored by this class. Higher priority (lower integer value) 46 // streams are always given precedence over lower priority (higher value) 47 // streams, as long as the higher priority stream is not blocked. 48 // 49 // Each stream can be in one of two states: ready or not ready (for writing). 50 // Ready state is changed by calling the MarkStreamReady() and 51 // MarkStreamNotReady() methods. Only streams in the ready state can be returned 52 // by PopNextReadyStream(). When returned by that method, the stream's state 53 // changes to not ready. 54 // 55 template <typename StreamIdType, typename PriorityType = spdy::SpdyPriority, 56 typename PriorityTypeToInt = SpdyPriorityToSpdyPriority, 57 typename IntToPriorityType = SpdyPriorityToSpdyPriority> 58 class QUICHE_EXPORT PriorityWriteScheduler { 59 public: 60 static constexpr int kHighestPriority = 0; 61 static constexpr int kLowestPriority = 7; 62 63 static_assert(spdy::kV3HighestPriority == kHighestPriority); 64 static_assert(spdy::kV3LowestPriority == kLowestPriority); 65 66 // Registers new stream `stream_id` with the scheduler, assigning it the 67 // given priority. 68 // 69 // Preconditions: `stream_id` should be unregistered. RegisterStream(StreamIdType stream_id,PriorityType priority)70 void RegisterStream(StreamIdType stream_id, PriorityType priority) { 71 auto stream_info = std::make_unique<StreamInfo>( 72 StreamInfo{std::move(priority), stream_id, false}); 73 bool inserted = 74 stream_infos_.insert(std::make_pair(stream_id, std::move(stream_info))) 75 .second; 76 QUICHE_BUG_IF(spdy_bug_19_2, !inserted) 77 << "Stream " << stream_id << " already registered"; 78 } 79 80 // Unregisters the given stream from the scheduler, which will no longer keep 81 // state for it. 82 // 83 // Preconditions: `stream_id` should be registered. UnregisterStream(StreamIdType stream_id)84 void UnregisterStream(StreamIdType stream_id) { 85 auto it = stream_infos_.find(stream_id); 86 if (it == stream_infos_.end()) { 87 QUICHE_BUG(spdy_bug_19_3) << "Stream " << stream_id << " not registered"; 88 return; 89 } 90 const StreamInfo* const stream_info = it->second.get(); 91 if (stream_info->ready) { 92 bool erased = 93 Erase(&priority_infos_[PriorityTypeToInt()(stream_info->priority)] 94 .ready_list, 95 stream_info); 96 QUICHE_DCHECK(erased); 97 } 98 stream_infos_.erase(it); 99 } 100 101 // Returns true if the given stream is currently registered. StreamRegistered(StreamIdType stream_id)102 bool StreamRegistered(StreamIdType stream_id) const { 103 return stream_infos_.find(stream_id) != stream_infos_.end(); 104 } 105 106 // Returns the priority of the specified stream. 107 // 108 // Preconditions: `stream_id` should be registered. GetStreamPriority(StreamIdType stream_id)109 PriorityType GetStreamPriority(StreamIdType stream_id) const { 110 auto it = stream_infos_.find(stream_id); 111 if (it == stream_infos_.end()) { 112 QUICHE_DVLOG(1) << "Stream " << stream_id << " not registered"; 113 return IntToPriorityType()(kLowestPriority); 114 } 115 return it->second->priority; 116 } 117 118 // Updates the priority of the given stream. 119 // 120 // Preconditions: `stream_id` should be registered. UpdateStreamPriority(StreamIdType stream_id,PriorityType priority)121 void UpdateStreamPriority(StreamIdType stream_id, PriorityType priority) { 122 auto it = stream_infos_.find(stream_id); 123 if (it == stream_infos_.end()) { 124 // TODO(mpw): add to stream_infos_ on demand--see b/15676312. 125 QUICHE_DVLOG(1) << "Stream " << stream_id << " not registered"; 126 return; 127 } 128 129 StreamInfo* const stream_info = it->second.get(); 130 if (stream_info->priority == priority) { 131 return; 132 } 133 134 // Only move `stream_info` to a different bucket if the integral priority 135 // value changes. 136 if (PriorityTypeToInt()(stream_info->priority) != 137 PriorityTypeToInt()(priority) && 138 stream_info->ready) { 139 bool erased = 140 Erase(&priority_infos_[PriorityTypeToInt()(stream_info->priority)] 141 .ready_list, 142 stream_info); 143 QUICHE_DCHECK(erased); 144 priority_infos_[PriorityTypeToInt()(priority)].ready_list.push_back( 145 stream_info); 146 ++num_ready_streams_; 147 } 148 149 // But override `priority` for the stream regardless of the integral value, 150 // because it might contain additional information. 151 stream_info->priority = std::move(priority); 152 } 153 154 // Records time of a read/write event for the given stream. 155 // 156 // Preconditions: `stream_id` should be registered. RecordStreamEventTime(StreamIdType stream_id,absl::Time now)157 void RecordStreamEventTime(StreamIdType stream_id, absl::Time now) { 158 auto it = stream_infos_.find(stream_id); 159 if (it == stream_infos_.end()) { 160 QUICHE_BUG(spdy_bug_19_4) << "Stream " << stream_id << " not registered"; 161 return; 162 } 163 PriorityInfo& priority_info = 164 priority_infos_[PriorityTypeToInt()(it->second->priority)]; 165 priority_info.last_event_time = 166 std::max(priority_info.last_event_time, absl::make_optional(now)); 167 } 168 169 // Returns time of the last read/write event for a stream with higher priority 170 // than the priority of the given stream, or nullopt if there is no such 171 // event. 172 // 173 // Preconditions: `stream_id` should be registered. GetLatestEventWithPriority(StreamIdType stream_id)174 std::optional<absl::Time> GetLatestEventWithPriority( 175 StreamIdType stream_id) const { 176 auto it = stream_infos_.find(stream_id); 177 if (it == stream_infos_.end()) { 178 QUICHE_BUG(spdy_bug_19_5) << "Stream " << stream_id << " not registered"; 179 return std::nullopt; 180 } 181 std::optional<absl::Time> last_event_time; 182 const StreamInfo* const stream_info = it->second.get(); 183 for (int p = kHighestPriority; 184 p < PriorityTypeToInt()(stream_info->priority); ++p) { 185 last_event_time = 186 std::max(last_event_time, priority_infos_[p].last_event_time); 187 } 188 return last_event_time; 189 } 190 191 // If the scheduler has any ready streams, returns the next scheduled 192 // ready stream, in the process transitioning the stream from ready to not 193 // ready. 194 // 195 // Preconditions: `HasReadyStreams() == true` PopNextReadyStream()196 StreamIdType PopNextReadyStream() { 197 return std::get<0>(PopNextReadyStreamAndPriority()); 198 } 199 200 // If the scheduler has any ready streams, returns the next scheduled 201 // ready stream and its priority, in the process transitioning the stream from 202 // ready to not ready. 203 // 204 // Preconditions: `HasReadyStreams() == true` PopNextReadyStreamAndPriority()205 std::tuple<StreamIdType, PriorityType> PopNextReadyStreamAndPriority() { 206 for (int p = kHighestPriority; p <= kLowestPriority; ++p) { 207 ReadyList& ready_list = priority_infos_[p].ready_list; 208 if (!ready_list.empty()) { 209 StreamInfo* const info = ready_list.front(); 210 ready_list.pop_front(); 211 --num_ready_streams_; 212 213 QUICHE_DCHECK(stream_infos_.find(info->stream_id) != 214 stream_infos_.end()); 215 info->ready = false; 216 return std::make_tuple(info->stream_id, info->priority); 217 } 218 } 219 QUICHE_BUG(spdy_bug_19_6) << "No ready streams available"; 220 return std::make_tuple(0, IntToPriorityType()(kLowestPriority)); 221 } 222 223 // Returns true if there's another stream ahead of the given stream in the 224 // scheduling queue. This function can be called to see if the given stream 225 // should yield work to another stream. 226 // 227 // Preconditions: `stream_id` should be registered. ShouldYield(StreamIdType stream_id)228 bool ShouldYield(StreamIdType stream_id) const { 229 auto it = stream_infos_.find(stream_id); 230 if (it == stream_infos_.end()) { 231 QUICHE_BUG(spdy_bug_19_7) << "Stream " << stream_id << " not registered"; 232 return false; 233 } 234 235 // If there's a higher priority stream, this stream should yield. 236 const StreamInfo* const stream_info = it->second.get(); 237 for (int p = kHighestPriority; 238 p < PriorityTypeToInt()(stream_info->priority); ++p) { 239 if (!priority_infos_[p].ready_list.empty()) { 240 return true; 241 } 242 } 243 244 // If this priority level is empty, or this stream is the next up, there's 245 // no need to yield. 246 const auto& ready_list = 247 priority_infos_[PriorityTypeToInt()(it->second->priority)].ready_list; 248 if (ready_list.empty() || ready_list.front()->stream_id == stream_id) { 249 return false; 250 } 251 252 // There are other streams in this priority level which take precedence. 253 // Yield. 254 return true; 255 } 256 257 // Marks the stream as ready to write. If the stream was already ready, does 258 // nothing. If add_to_front is true, the stream is scheduled ahead of other 259 // streams of the same priority/weight, otherwise it is scheduled behind them. 260 // 261 // Preconditions: `stream_id` should be registered. MarkStreamReady(StreamIdType stream_id,bool add_to_front)262 void MarkStreamReady(StreamIdType stream_id, bool add_to_front) { 263 auto it = stream_infos_.find(stream_id); 264 if (it == stream_infos_.end()) { 265 QUICHE_BUG(spdy_bug_19_8) << "Stream " << stream_id << " not registered"; 266 return; 267 } 268 StreamInfo* const stream_info = it->second.get(); 269 if (stream_info->ready) { 270 return; 271 } 272 ReadyList& ready_list = 273 priority_infos_[PriorityTypeToInt()(stream_info->priority)].ready_list; 274 if (add_to_front) { 275 ready_list.push_front(stream_info); 276 } else { 277 ready_list.push_back(stream_info); 278 } 279 ++num_ready_streams_; 280 stream_info->ready = true; 281 } 282 283 // Marks the stream as not ready to write. If the stream is not registered or 284 // not ready, does nothing. 285 // 286 // Preconditions: `stream_id` should be registered. MarkStreamNotReady(StreamIdType stream_id)287 void MarkStreamNotReady(StreamIdType stream_id) { 288 auto it = stream_infos_.find(stream_id); 289 if (it == stream_infos_.end()) { 290 QUICHE_BUG(spdy_bug_19_9) << "Stream " << stream_id << " not registered"; 291 return; 292 } 293 StreamInfo* const stream_info = it->second.get(); 294 if (!stream_info->ready) { 295 return; 296 } 297 bool erased = Erase( 298 &priority_infos_[PriorityTypeToInt()(stream_info->priority)].ready_list, 299 stream_info); 300 QUICHE_DCHECK(erased); 301 stream_info->ready = false; 302 } 303 304 // Returns true iff the scheduler has any ready streams. HasReadyStreams()305 bool HasReadyStreams() const { return num_ready_streams_ > 0; } 306 307 // Returns the number of streams currently marked ready. NumReadyStreams()308 size_t NumReadyStreams() const { return num_ready_streams_; } 309 310 // Returns the number of registered streams. NumRegisteredStreams()311 size_t NumRegisteredStreams() const { return stream_infos_.size(); } 312 313 // Returns summary of internal state, for logging/debugging. DebugString()314 std::string DebugString() const { 315 return absl::StrCat( 316 "PriorityWriteScheduler {num_streams=", stream_infos_.size(), 317 " num_ready_streams=", NumReadyStreams(), "}"); 318 } 319 320 // Returns true if stream with `stream_id` is ready. IsStreamReady(StreamIdType stream_id)321 bool IsStreamReady(StreamIdType stream_id) const { 322 auto it = stream_infos_.find(stream_id); 323 if (it == stream_infos_.end()) { 324 QUICHE_DLOG(INFO) << "Stream " << stream_id << " not registered"; 325 return false; 326 } 327 return it->second->ready; 328 } 329 330 private: 331 friend class test::PriorityWriteSchedulerPeer<StreamIdType>; 332 333 // State kept for all registered streams. 334 // All ready streams have `ready == true` and should be present in 335 // `priority_infos_[priority].ready_list`. 336 struct QUICHE_EXPORT StreamInfo { 337 PriorityType priority; 338 StreamIdType stream_id; 339 bool ready; 340 }; 341 342 // O(1) size lookup, O(1) insert at front or back (amortized). 343 using ReadyList = quiche::QuicheCircularDeque<StreamInfo*>; 344 345 // State kept for each priority level. 346 struct QUICHE_EXPORT PriorityInfo { 347 // IDs of streams that are ready to write. 348 ReadyList ready_list; 349 // Time of latest write event for stream of this priority. 350 std::optional<absl::Time> last_event_time; 351 }; 352 353 // Use std::unique_ptr, because absl::flat_hash_map does not have pointer 354 // stability, but ReadyList stores pointers to the StreamInfo objects. 355 using StreamInfoMap = 356 absl::flat_hash_map<StreamIdType, std::unique_ptr<StreamInfo>>; 357 358 // Erases `info` from `ready_list`, returning true if found (and erased), or 359 // false otherwise. Decrements `num_ready_streams_` if an entry is erased. Erase(ReadyList * ready_list,const StreamInfo * info)360 bool Erase(ReadyList* ready_list, const StreamInfo* info) { 361 auto it = std::remove(ready_list->begin(), ready_list->end(), info); 362 if (it == ready_list->end()) { 363 // `info` was not found. 364 return false; 365 } 366 ready_list->pop_back(); 367 --num_ready_streams_; 368 return true; 369 } 370 371 // Number of ready streams. 372 size_t num_ready_streams_ = 0; 373 // Per-priority state, including ready lists. 374 PriorityInfo priority_infos_[kLowestPriority + 1]; 375 // StreamInfos for all registered streams. 376 StreamInfoMap stream_infos_; 377 }; 378 379 } // namespace http2 380 381 #endif // QUICHE_HTTP2_CORE_PRIORITY_WRITE_SCHEDULER_H_ 382