1 /*
2 * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "modules/pacing/task_queue_paced_sender.h"
12
13 #include <algorithm>
14 #include <utility>
15
16 #include "absl/memory/memory.h"
17 #include "api/task_queue/pending_task_safety_flag.h"
18 #include "api/transport/network_types.h"
19 #include "rtc_base/checks.h"
20 #include "rtc_base/experiments/field_trial_parser.h"
21 #include "rtc_base/experiments/field_trial_units.h"
22 #include "rtc_base/system/unused.h"
23 #include "rtc_base/trace_event.h"
24
25 namespace webrtc {
26
27 namespace {
28
29 constexpr const char* kBurstyPacerFieldTrial = "WebRTC-BurstyPacer";
30
31 constexpr const char* kSlackedTaskQueuePacedSenderFieldTrial =
32 "WebRTC-SlackedTaskQueuePacedSender";
33
34 } // namespace
35
36 const int TaskQueuePacedSender::kNoPacketHoldback = -1;
37
BurstyPacerFlags(const FieldTrialsView & field_trials)38 TaskQueuePacedSender::BurstyPacerFlags::BurstyPacerFlags(
39 const FieldTrialsView& field_trials)
40 : burst("burst") {
41 ParseFieldTrial({&burst}, field_trials.Lookup(kBurstyPacerFieldTrial));
42 }
43
SlackedPacerFlags(const FieldTrialsView & field_trials)44 TaskQueuePacedSender::SlackedPacerFlags::SlackedPacerFlags(
45 const FieldTrialsView& field_trials)
46 : allow_low_precision("Enabled"),
47 max_low_precision_expected_queue_time("max_queue_time"),
48 send_burst_interval("send_burst_interval") {
49 ParseFieldTrial({&allow_low_precision, &max_low_precision_expected_queue_time,
50 &send_burst_interval},
51 field_trials.Lookup(kSlackedTaskQueuePacedSenderFieldTrial));
52 }
53
TaskQueuePacedSender(Clock * clock,PacingController::PacketSender * packet_sender,const FieldTrialsView & field_trials,TaskQueueFactory * task_queue_factory,TimeDelta max_hold_back_window,int max_hold_back_window_in_packets,absl::optional<TimeDelta> burst_interval)54 TaskQueuePacedSender::TaskQueuePacedSender(
55 Clock* clock,
56 PacingController::PacketSender* packet_sender,
57 const FieldTrialsView& field_trials,
58 TaskQueueFactory* task_queue_factory,
59 TimeDelta max_hold_back_window,
60 int max_hold_back_window_in_packets,
61 absl::optional<TimeDelta> burst_interval)
62 : clock_(clock),
63 bursty_pacer_flags_(field_trials),
64 slacked_pacer_flags_(field_trials),
65 max_hold_back_window_(slacked_pacer_flags_.allow_low_precision
66 ? PacingController::kMinSleepTime
67 : max_hold_back_window),
68 max_hold_back_window_in_packets_(slacked_pacer_flags_.allow_low_precision
69 ? 0
70 : max_hold_back_window_in_packets),
71 pacing_controller_(clock, packet_sender, field_trials),
72 next_process_time_(Timestamp::MinusInfinity()),
73 is_started_(false),
74 is_shutdown_(false),
75 packet_size_(/*alpha=*/0.95),
76 include_overhead_(false),
77 task_queue_(field_trials, "TaskQueuePacedSender", task_queue_factory) {
78 RTC_DCHECK_GE(max_hold_back_window_, PacingController::kMinSleepTime);
79 // There are multiple field trials that can affect burst. If multiple bursts
80 // are specified we pick the largest of the values.
81 absl::optional<TimeDelta> burst = bursty_pacer_flags_.burst.GetOptional();
82 if (slacked_pacer_flags_.allow_low_precision &&
83 slacked_pacer_flags_.send_burst_interval) {
84 TimeDelta slacked_burst = slacked_pacer_flags_.send_burst_interval.Value();
85 if (!burst.has_value() || burst.value() < slacked_burst) {
86 burst = slacked_burst;
87 }
88 }
89 // If not overriden by an experiment, the burst is specified by the
90 // `burst_interval` argument.
91 if (!burst.has_value()) {
92 burst = burst_interval;
93 }
94 if (burst.has_value()) {
95 pacing_controller_.SetSendBurstInterval(burst.value());
96 }
97 }
98
~TaskQueuePacedSender()99 TaskQueuePacedSender::~TaskQueuePacedSender() {
100 // Post an immediate task to mark the queue as shutting down.
101 // The rtc::TaskQueue destructor will wait for pending tasks to
102 // complete before continuing.
103 task_queue_.RunOrPost([&]() {
104 RTC_DCHECK_RUN_ON(&task_queue_);
105 is_shutdown_ = true;
106 });
107 }
108
EnsureStarted()109 void TaskQueuePacedSender::EnsureStarted() {
110 task_queue_.RunOrPost([this]() {
111 RTC_DCHECK_RUN_ON(&task_queue_);
112 is_started_ = true;
113 MaybeProcessPackets(Timestamp::MinusInfinity());
114 });
115 }
116
CreateProbeClusters(std::vector<ProbeClusterConfig> probe_cluster_configs)117 void TaskQueuePacedSender::CreateProbeClusters(
118 std::vector<ProbeClusterConfig> probe_cluster_configs) {
119 task_queue_.RunOrPost(
120 [this, probe_cluster_configs = std::move(probe_cluster_configs)]() {
121 RTC_DCHECK_RUN_ON(&task_queue_);
122 pacing_controller_.CreateProbeClusters(probe_cluster_configs);
123 MaybeProcessPackets(Timestamp::MinusInfinity());
124 });
125 }
126
Pause()127 void TaskQueuePacedSender::Pause() {
128 task_queue_.RunOrPost([this]() {
129 RTC_DCHECK_RUN_ON(&task_queue_);
130 pacing_controller_.Pause();
131 });
132 }
133
Resume()134 void TaskQueuePacedSender::Resume() {
135 task_queue_.RunOrPost([this]() {
136 RTC_DCHECK_RUN_ON(&task_queue_);
137 pacing_controller_.Resume();
138 MaybeProcessPackets(Timestamp::MinusInfinity());
139 });
140 }
141
SetCongested(bool congested)142 void TaskQueuePacedSender::SetCongested(bool congested) {
143 task_queue_.RunOrPost([this, congested]() {
144 RTC_DCHECK_RUN_ON(&task_queue_);
145 pacing_controller_.SetCongested(congested);
146 MaybeProcessPackets(Timestamp::MinusInfinity());
147 });
148 }
149
SetPacingRates(DataRate pacing_rate,DataRate padding_rate)150 void TaskQueuePacedSender::SetPacingRates(DataRate pacing_rate,
151 DataRate padding_rate) {
152 task_queue_.RunOrPost([this, pacing_rate, padding_rate]() {
153 RTC_DCHECK_RUN_ON(&task_queue_);
154 pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
155 MaybeProcessPackets(Timestamp::MinusInfinity());
156 });
157 }
158
EnqueuePackets(std::vector<std::unique_ptr<RtpPacketToSend>> packets)159 void TaskQueuePacedSender::EnqueuePackets(
160 std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
161 task_queue_.TaskQueueForPost()->PostTask(task_queue_.MaybeSafeTask(
162 safety_.flag(), [this, packets = std::move(packets)]() mutable {
163 RTC_DCHECK_RUN_ON(&task_queue_);
164 TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
165 "TaskQueuePacedSender::EnqueuePackets");
166 for (auto& packet : packets) {
167 TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc"),
168 "TaskQueuePacedSender::EnqueuePackets::Loop",
169 "sequence_number", packet->SequenceNumber(),
170 "rtp_timestamp", packet->Timestamp());
171
172 size_t packet_size = packet->payload_size() + packet->padding_size();
173 if (include_overhead_) {
174 packet_size += packet->headers_size();
175 }
176 packet_size_.Apply(1, packet_size);
177 RTC_DCHECK_GE(packet->capture_time(), Timestamp::Zero());
178 pacing_controller_.EnqueuePacket(std::move(packet));
179 }
180 MaybeProcessPackets(Timestamp::MinusInfinity());
181 }));
182 }
183
SetAccountForAudioPackets(bool account_for_audio)184 void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) {
185 task_queue_.RunOrPost([this, account_for_audio]() {
186 RTC_DCHECK_RUN_ON(&task_queue_);
187 pacing_controller_.SetAccountForAudioPackets(account_for_audio);
188 MaybeProcessPackets(Timestamp::MinusInfinity());
189 });
190 }
191
SetIncludeOverhead()192 void TaskQueuePacedSender::SetIncludeOverhead() {
193 task_queue_.RunOrPost([this]() {
194 RTC_DCHECK_RUN_ON(&task_queue_);
195 include_overhead_ = true;
196 pacing_controller_.SetIncludeOverhead();
197 MaybeProcessPackets(Timestamp::MinusInfinity());
198 });
199 }
200
SetTransportOverhead(DataSize overhead_per_packet)201 void TaskQueuePacedSender::SetTransportOverhead(DataSize overhead_per_packet) {
202 task_queue_.RunOrPost([this, overhead_per_packet]() {
203 RTC_DCHECK_RUN_ON(&task_queue_);
204 pacing_controller_.SetTransportOverhead(overhead_per_packet);
205 MaybeProcessPackets(Timestamp::MinusInfinity());
206 });
207 }
208
SetQueueTimeLimit(TimeDelta limit)209 void TaskQueuePacedSender::SetQueueTimeLimit(TimeDelta limit) {
210 task_queue_.RunOrPost([this, limit]() {
211 RTC_DCHECK_RUN_ON(&task_queue_);
212 pacing_controller_.SetQueueTimeLimit(limit);
213 MaybeProcessPackets(Timestamp::MinusInfinity());
214 });
215 }
216
ExpectedQueueTime() const217 TimeDelta TaskQueuePacedSender::ExpectedQueueTime() const {
218 return GetStats().expected_queue_time;
219 }
220
QueueSizeData() const221 DataSize TaskQueuePacedSender::QueueSizeData() const {
222 return GetStats().queue_size;
223 }
224
FirstSentPacketTime() const225 absl::optional<Timestamp> TaskQueuePacedSender::FirstSentPacketTime() const {
226 return GetStats().first_sent_packet_time;
227 }
228
OldestPacketWaitTime() const229 TimeDelta TaskQueuePacedSender::OldestPacketWaitTime() const {
230 Timestamp oldest_packet = GetStats().oldest_packet_enqueue_time;
231 if (oldest_packet.IsInfinite()) {
232 return TimeDelta::Zero();
233 }
234
235 // (webrtc:9716): The clock is not always monotonic.
236 Timestamp current = clock_->CurrentTime();
237 if (current < oldest_packet) {
238 return TimeDelta::Zero();
239 }
240
241 return current - oldest_packet;
242 }
243
OnStatsUpdated(const Stats & stats)244 void TaskQueuePacedSender::OnStatsUpdated(const Stats& stats) {
245 MutexLock lock(&stats_mutex_);
246 current_stats_ = stats;
247 }
248
MaybeProcessPackets(Timestamp scheduled_process_time)249 void TaskQueuePacedSender::MaybeProcessPackets(
250 Timestamp scheduled_process_time) {
251 RTC_DCHECK_RUN_ON(&task_queue_);
252
253 TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
254 "TaskQueuePacedSender::MaybeProcessPackets");
255
256 if (is_shutdown_ || !is_started_) {
257 return;
258 }
259
260 Timestamp next_send_time = pacing_controller_.NextSendTime();
261 RTC_DCHECK(next_send_time.IsFinite());
262 const Timestamp now = clock_->CurrentTime();
263 TimeDelta early_execute_margin =
264 pacing_controller_.IsProbing()
265 ? PacingController::kMaxEarlyProbeProcessing
266 : TimeDelta::Zero();
267
268 // Process packets and update stats.
269 while (next_send_time <= now + early_execute_margin) {
270 pacing_controller_.ProcessPackets();
271 next_send_time = pacing_controller_.NextSendTime();
272 RTC_DCHECK(next_send_time.IsFinite());
273
274 // Probing state could change. Get margin after process packets.
275 early_execute_margin = pacing_controller_.IsProbing()
276 ? PacingController::kMaxEarlyProbeProcessing
277 : TimeDelta::Zero();
278 }
279 UpdateStats();
280
281 // Ignore retired scheduled task, otherwise reset `next_process_time_`.
282 if (scheduled_process_time.IsFinite()) {
283 if (scheduled_process_time != next_process_time_) {
284 return;
285 }
286 next_process_time_ = Timestamp::MinusInfinity();
287 }
288
289 // Do not hold back in probing.
290 TimeDelta hold_back_window = TimeDelta::Zero();
291 if (!pacing_controller_.IsProbing()) {
292 hold_back_window = max_hold_back_window_;
293 DataRate pacing_rate = pacing_controller_.pacing_rate();
294 if (max_hold_back_window_in_packets_ != kNoPacketHoldback &&
295 !pacing_rate.IsZero() &&
296 packet_size_.filtered() != rtc::ExpFilter::kValueUndefined) {
297 TimeDelta avg_packet_send_time =
298 DataSize::Bytes(packet_size_.filtered()) / pacing_rate;
299 hold_back_window =
300 std::min(hold_back_window,
301 avg_packet_send_time * max_hold_back_window_in_packets_);
302 }
303 }
304
305 // Calculate next process time.
306 TimeDelta time_to_next_process =
307 std::max(hold_back_window, next_send_time - now - early_execute_margin);
308 next_send_time = now + time_to_next_process;
309
310 // If no in flight task or in flight task is later than `next_send_time`,
311 // schedule a new one. Previous in flight task will be retired.
312 if (next_process_time_.IsMinusInfinity() ||
313 next_process_time_ > next_send_time) {
314 // Prefer low precision if allowed and not probing.
315 TaskQueueBase::DelayPrecision precision =
316 slacked_pacer_flags_.allow_low_precision &&
317 !pacing_controller_.IsProbing()
318 ? TaskQueueBase::DelayPrecision::kLow
319 : TaskQueueBase::DelayPrecision::kHigh;
320 // Check for cases where we need high precision.
321 if (precision == TaskQueueBase::DelayPrecision::kLow) {
322 auto& packets_per_type =
323 pacing_controller_.SizeInPacketsPerRtpPacketMediaType();
324 bool audio_or_retransmission_packets_in_queue =
325 packets_per_type[static_cast<size_t>(RtpPacketMediaType::kAudio)] >
326 0 ||
327 packets_per_type[static_cast<size_t>(
328 RtpPacketMediaType::kRetransmission)] > 0;
329 bool queue_time_too_large =
330 slacked_pacer_flags_.max_low_precision_expected_queue_time &&
331 pacing_controller_.ExpectedQueueTime() >=
332 slacked_pacer_flags_.max_low_precision_expected_queue_time
333 .Value();
334 if (audio_or_retransmission_packets_in_queue || queue_time_too_large) {
335 precision = TaskQueueBase::DelayPrecision::kHigh;
336 }
337 }
338
339 task_queue_.TaskQueueForDelayedTasks()->PostDelayedTaskWithPrecision(
340 precision,
341 task_queue_.MaybeSafeTask(
342 safety_.flag(),
343 [this, next_send_time]() { MaybeProcessPackets(next_send_time); }),
344 time_to_next_process.RoundUpTo(TimeDelta::Millis(1)));
345 next_process_time_ = next_send_time;
346 }
347 }
348
UpdateStats()349 void TaskQueuePacedSender::UpdateStats() {
350 Stats new_stats;
351 new_stats.expected_queue_time = pacing_controller_.ExpectedQueueTime();
352 new_stats.first_sent_packet_time = pacing_controller_.FirstSentPacketTime();
353 new_stats.oldest_packet_enqueue_time =
354 pacing_controller_.OldestPacketEnqueueTime();
355 new_stats.queue_size = pacing_controller_.QueueSizeData();
356 OnStatsUpdated(new_stats);
357 }
358
GetStats() const359 TaskQueuePacedSender::Stats TaskQueuePacedSender::GetStats() const {
360 MutexLock lock(&stats_mutex_);
361 return current_stats_;
362 }
363
364 } // namespace webrtc
365