1 // Copyright 2023 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/web_transport/web_transport_priority_scheduler.h"
6 
7 #include <optional>
8 #include <utility>
9 
10 #include "absl/cleanup/cleanup.h"
11 #include "absl/status/status.h"
12 #include "absl/status/statusor.h"
13 #include "quiche/common/quiche_status_utils.h"
14 #include "quiche/web_transport/web_transport.h"
15 
16 namespace webtransport {
17 
Register(StreamId stream_id,const StreamPriority & priority)18 absl::Status PriorityScheduler::Register(StreamId stream_id,
19                                          const StreamPriority& priority) {
20   auto [it, success] = stream_to_group_map_.insert({stream_id, nullptr});
21   if (!success) {
22     return absl::AlreadyExistsError("Provided stream ID already registered");
23   }
24   // Avoid having any nullptr entries in the stream map if we error out further
25   // down below. This should not happen (all errors below are logical errors),
26   // but if that does happen, we will avoid crashing due to nullptr dereference.
27   auto cleanup_nullptr_map_entry =
28       absl::MakeCleanup([&] { stream_to_group_map_.erase(stream_id); });
29 
30   auto [scheduler_it, scheduler_created] =
31       per_group_schedulers_.try_emplace(priority.send_group_id);
32   if (scheduler_created) {
33     // First element in the associated group; register the group in question.
34     QUICHE_RETURN_IF_ERROR(active_groups_.Register(priority.send_group_id, {}));
35   }
36 
37   PerGroupScheduler& scheduler = scheduler_it->second;
38   QUICHE_RETURN_IF_ERROR(scheduler.Register(stream_id, priority.send_order));
39 
40   it->second = &*scheduler_it;
41   std::move(cleanup_nullptr_map_entry).Cancel();
42   return absl::OkStatus();
43 }
44 
Unregister(StreamId stream_id)45 absl::Status PriorityScheduler::Unregister(StreamId stream_id) {
46   auto it = stream_to_group_map_.find(stream_id);
47   if (it == stream_to_group_map_.end()) {
48     return absl::NotFoundError("Stream ID not registered");
49   }
50   SendGroupId group_id = it->second->first;
51   PerGroupScheduler* group_scheduler = &it->second->second;
52   stream_to_group_map_.erase(it);
53 
54   QUICHE_RETURN_IF_ERROR(group_scheduler->Unregister(stream_id));
55   // Clean up the group if there are no more streams associated with it.
56   if (!group_scheduler->HasRegistered()) {
57     per_group_schedulers_.erase(group_id);
58     QUICHE_RETURN_IF_ERROR(active_groups_.Unregister(group_id));
59   }
60   return absl::OkStatus();
61 }
62 
UpdateSendOrder(StreamId stream_id,SendOrder new_send_order)63 absl::Status PriorityScheduler::UpdateSendOrder(StreamId stream_id,
64                                                 SendOrder new_send_order) {
65   PerGroupScheduler* scheduler = SchedulerForStream(stream_id);
66   if (scheduler == nullptr) {
67     return absl::NotFoundError("Stream ID not registered");
68   }
69   return scheduler->UpdatePriority(stream_id, new_send_order);
70 }
71 
UpdateSendGroup(StreamId stream_id,SendGroupId new_send_group)72 absl::Status PriorityScheduler::UpdateSendGroup(StreamId stream_id,
73                                                 SendGroupId new_send_group) {
74   PerGroupScheduler* scheduler = SchedulerForStream(stream_id);
75   if (scheduler == nullptr) {
76     return absl::NotFoundError("Stream ID not registered");
77   }
78   bool is_scheduled = scheduler->IsScheduled(stream_id);
79   std::optional<SendOrder> send_order = scheduler->GetPriorityFor(stream_id);
80   if (!send_order.has_value()) {
81     return absl::InternalError(
82         "Stream registered at the top level scheduler, but not at the "
83         "per-group one");
84   }
85   QUICHE_RETURN_IF_ERROR(Unregister(stream_id));
86   QUICHE_RETURN_IF_ERROR(
87       Register(stream_id, StreamPriority{new_send_group, *send_order}));
88   if (is_scheduled) {
89     QUICHE_RETURN_IF_ERROR(Schedule(stream_id));
90   }
91   return absl::OkStatus();
92 }
93 
GetPriorityFor(StreamId stream_id) const94 std::optional<StreamPriority> PriorityScheduler::GetPriorityFor(
95     StreamId stream_id) const {
96   auto it = stream_to_group_map_.find(stream_id);
97   if (it == stream_to_group_map_.end()) {
98     return std::nullopt;
99   }
100   const auto& [group_id, group_scheduler] = *it->second;
101   std::optional<SendOrder> send_order =
102       group_scheduler.GetPriorityFor(stream_id);
103   if (!send_order.has_value()) {
104     return std::nullopt;
105   }
106   return StreamPriority{group_id, *send_order};
107 }
108 
ShouldYield(StreamId stream_id) const109 absl::StatusOr<bool> PriorityScheduler::ShouldYield(StreamId stream_id) const {
110   auto it = stream_to_group_map_.find(stream_id);
111   if (it == stream_to_group_map_.end()) {
112     return absl::NotFoundError("Stream ID not registered");
113   }
114   const auto& [group_id, group_scheduler] = *it->second;
115 
116   absl::StatusOr<bool> per_group_result = active_groups_.ShouldYield(group_id);
117   QUICHE_RETURN_IF_ERROR(per_group_result.status());
118   if (*per_group_result) {
119     return true;
120   }
121 
122   return group_scheduler.ShouldYield(stream_id);
123 }
124 
PopFront()125 absl::StatusOr<StreamId> PriorityScheduler::PopFront() {
126   absl::StatusOr<SendGroupId> group_id = active_groups_.PopFront();
127   QUICHE_RETURN_IF_ERROR(group_id.status());
128 
129   auto it = per_group_schedulers_.find(*group_id);
130   if (it == per_group_schedulers_.end()) {
131     return absl::InternalError(
132         "Scheduled a group with no per-group scheduler attached");
133   }
134   PerGroupScheduler& scheduler = it->second;
135   absl::StatusOr<StreamId> result = scheduler.PopFront();
136   if (!result.ok()) {
137     return absl::InternalError("Inactive group found in top-level schedule");
138   }
139 
140   // Reschedule the group if it has more active streams in it.
141   if (scheduler.HasScheduled()) {
142     QUICHE_RETURN_IF_ERROR(active_groups_.Schedule(*group_id));
143   }
144 
145   return result;
146 }
147 
Schedule(StreamId stream_id)148 absl::Status PriorityScheduler::Schedule(StreamId stream_id) {
149   auto it = stream_to_group_map_.find(stream_id);
150   if (it == stream_to_group_map_.end()) {
151     return absl::NotFoundError("Stream ID not registered");
152   }
153   auto& [group_id, group_scheduler] = *it->second;
154   QUICHE_RETURN_IF_ERROR(active_groups_.Schedule(group_id));
155   return group_scheduler.Schedule(stream_id);
156 }
157 
IsScheduled(StreamId stream_id) const158 bool PriorityScheduler::IsScheduled(StreamId stream_id) const {
159   const PerGroupScheduler* scheduler = SchedulerForStream(stream_id);
160   if (scheduler == nullptr) {
161     return false;
162   }
163   return scheduler->IsScheduled(stream_id);
164 }
165 
166 }  // namespace webtransport
167