1 // Copyright 2023 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "quiche/web_transport/web_transport_priority_scheduler.h"
6
7 #include <optional>
8 #include <utility>
9
10 #include "absl/cleanup/cleanup.h"
11 #include "absl/status/status.h"
12 #include "absl/status/statusor.h"
13 #include "quiche/common/quiche_status_utils.h"
14 #include "quiche/web_transport/web_transport.h"
15
16 namespace webtransport {
17
Register(StreamId stream_id,const StreamPriority & priority)18 absl::Status PriorityScheduler::Register(StreamId stream_id,
19 const StreamPriority& priority) {
20 auto [it, success] = stream_to_group_map_.insert({stream_id, nullptr});
21 if (!success) {
22 return absl::AlreadyExistsError("Provided stream ID already registered");
23 }
24 // Avoid having any nullptr entries in the stream map if we error out further
25 // down below. This should not happen (all errors below are logical errors),
26 // but if that does happen, we will avoid crashing due to nullptr dereference.
27 auto cleanup_nullptr_map_entry =
28 absl::MakeCleanup([&] { stream_to_group_map_.erase(stream_id); });
29
30 auto [scheduler_it, scheduler_created] =
31 per_group_schedulers_.try_emplace(priority.send_group_id);
32 if (scheduler_created) {
33 // First element in the associated group; register the group in question.
34 QUICHE_RETURN_IF_ERROR(active_groups_.Register(priority.send_group_id, {}));
35 }
36
37 PerGroupScheduler& scheduler = scheduler_it->second;
38 QUICHE_RETURN_IF_ERROR(scheduler.Register(stream_id, priority.send_order));
39
40 it->second = &*scheduler_it;
41 std::move(cleanup_nullptr_map_entry).Cancel();
42 return absl::OkStatus();
43 }
44
Unregister(StreamId stream_id)45 absl::Status PriorityScheduler::Unregister(StreamId stream_id) {
46 auto it = stream_to_group_map_.find(stream_id);
47 if (it == stream_to_group_map_.end()) {
48 return absl::NotFoundError("Stream ID not registered");
49 }
50 SendGroupId group_id = it->second->first;
51 PerGroupScheduler* group_scheduler = &it->second->second;
52 stream_to_group_map_.erase(it);
53
54 QUICHE_RETURN_IF_ERROR(group_scheduler->Unregister(stream_id));
55 // Clean up the group if there are no more streams associated with it.
56 if (!group_scheduler->HasRegistered()) {
57 per_group_schedulers_.erase(group_id);
58 QUICHE_RETURN_IF_ERROR(active_groups_.Unregister(group_id));
59 }
60 return absl::OkStatus();
61 }
62
UpdateSendOrder(StreamId stream_id,SendOrder new_send_order)63 absl::Status PriorityScheduler::UpdateSendOrder(StreamId stream_id,
64 SendOrder new_send_order) {
65 PerGroupScheduler* scheduler = SchedulerForStream(stream_id);
66 if (scheduler == nullptr) {
67 return absl::NotFoundError("Stream ID not registered");
68 }
69 return scheduler->UpdatePriority(stream_id, new_send_order);
70 }
71
UpdateSendGroup(StreamId stream_id,SendGroupId new_send_group)72 absl::Status PriorityScheduler::UpdateSendGroup(StreamId stream_id,
73 SendGroupId new_send_group) {
74 PerGroupScheduler* scheduler = SchedulerForStream(stream_id);
75 if (scheduler == nullptr) {
76 return absl::NotFoundError("Stream ID not registered");
77 }
78 bool is_scheduled = scheduler->IsScheduled(stream_id);
79 std::optional<SendOrder> send_order = scheduler->GetPriorityFor(stream_id);
80 if (!send_order.has_value()) {
81 return absl::InternalError(
82 "Stream registered at the top level scheduler, but not at the "
83 "per-group one");
84 }
85 QUICHE_RETURN_IF_ERROR(Unregister(stream_id));
86 QUICHE_RETURN_IF_ERROR(
87 Register(stream_id, StreamPriority{new_send_group, *send_order}));
88 if (is_scheduled) {
89 QUICHE_RETURN_IF_ERROR(Schedule(stream_id));
90 }
91 return absl::OkStatus();
92 }
93
GetPriorityFor(StreamId stream_id) const94 std::optional<StreamPriority> PriorityScheduler::GetPriorityFor(
95 StreamId stream_id) const {
96 auto it = stream_to_group_map_.find(stream_id);
97 if (it == stream_to_group_map_.end()) {
98 return std::nullopt;
99 }
100 const auto& [group_id, group_scheduler] = *it->second;
101 std::optional<SendOrder> send_order =
102 group_scheduler.GetPriorityFor(stream_id);
103 if (!send_order.has_value()) {
104 return std::nullopt;
105 }
106 return StreamPriority{group_id, *send_order};
107 }
108
ShouldYield(StreamId stream_id) const109 absl::StatusOr<bool> PriorityScheduler::ShouldYield(StreamId stream_id) const {
110 auto it = stream_to_group_map_.find(stream_id);
111 if (it == stream_to_group_map_.end()) {
112 return absl::NotFoundError("Stream ID not registered");
113 }
114 const auto& [group_id, group_scheduler] = *it->second;
115
116 absl::StatusOr<bool> per_group_result = active_groups_.ShouldYield(group_id);
117 QUICHE_RETURN_IF_ERROR(per_group_result.status());
118 if (*per_group_result) {
119 return true;
120 }
121
122 return group_scheduler.ShouldYield(stream_id);
123 }
124
PopFront()125 absl::StatusOr<StreamId> PriorityScheduler::PopFront() {
126 absl::StatusOr<SendGroupId> group_id = active_groups_.PopFront();
127 QUICHE_RETURN_IF_ERROR(group_id.status());
128
129 auto it = per_group_schedulers_.find(*group_id);
130 if (it == per_group_schedulers_.end()) {
131 return absl::InternalError(
132 "Scheduled a group with no per-group scheduler attached");
133 }
134 PerGroupScheduler& scheduler = it->second;
135 absl::StatusOr<StreamId> result = scheduler.PopFront();
136 if (!result.ok()) {
137 return absl::InternalError("Inactive group found in top-level schedule");
138 }
139
140 // Reschedule the group if it has more active streams in it.
141 if (scheduler.HasScheduled()) {
142 QUICHE_RETURN_IF_ERROR(active_groups_.Schedule(*group_id));
143 }
144
145 return result;
146 }
147
Schedule(StreamId stream_id)148 absl::Status PriorityScheduler::Schedule(StreamId stream_id) {
149 auto it = stream_to_group_map_.find(stream_id);
150 if (it == stream_to_group_map_.end()) {
151 return absl::NotFoundError("Stream ID not registered");
152 }
153 auto& [group_id, group_scheduler] = *it->second;
154 QUICHE_RETURN_IF_ERROR(active_groups_.Schedule(group_id));
155 return group_scheduler.Schedule(stream_id);
156 }
157
IsScheduled(StreamId stream_id) const158 bool PriorityScheduler::IsScheduled(StreamId stream_id) const {
159 const PerGroupScheduler* scheduler = SchedulerForStream(stream_id);
160 if (scheduler == nullptr) {
161 return false;
162 }
163 return scheduler->IsScheduled(stream_id);
164 }
165
166 } // namespace webtransport
167