// xref: /aosp_15_r20/external/webrtc/modules/pacing/task_queue_paced_sender.cc (revision d9f758449e529ab9291ac668be2861e7a55c2422)
/*
 *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10 
11 #include "modules/pacing/task_queue_paced_sender.h"
12 
13 #include <algorithm>
14 #include <utility>
15 
16 #include "absl/memory/memory.h"
17 #include "api/task_queue/pending_task_safety_flag.h"
18 #include "api/transport/network_types.h"
19 #include "rtc_base/checks.h"
20 #include "rtc_base/experiments/field_trial_parser.h"
21 #include "rtc_base/experiments/field_trial_units.h"
22 #include "rtc_base/system/unused.h"
23 #include "rtc_base/trace_event.h"
24 
25 namespace webrtc {
26 
namespace {

// Field-trial key looked up by the BurstyPacerFlags constructor.
constexpr const char* kBurstyPacerFieldTrial = "WebRTC-BurstyPacer";

// Field-trial key looked up by the SlackedPacerFlags constructor.
constexpr const char* kSlackedTaskQueuePacedSenderFieldTrial =
    "WebRTC-SlackedTaskQueuePacedSender";

}  // namespace
35 
36 const int TaskQueuePacedSender::kNoPacketHoldback = -1;
37 
BurstyPacerFlags(const FieldTrialsView & field_trials)38 TaskQueuePacedSender::BurstyPacerFlags::BurstyPacerFlags(
39     const FieldTrialsView& field_trials)
40     : burst("burst") {
41   ParseFieldTrial({&burst}, field_trials.Lookup(kBurstyPacerFieldTrial));
42 }
43 
SlackedPacerFlags(const FieldTrialsView & field_trials)44 TaskQueuePacedSender::SlackedPacerFlags::SlackedPacerFlags(
45     const FieldTrialsView& field_trials)
46     : allow_low_precision("Enabled"),
47       max_low_precision_expected_queue_time("max_queue_time"),
48       send_burst_interval("send_burst_interval") {
49   ParseFieldTrial({&allow_low_precision, &max_low_precision_expected_queue_time,
50                    &send_burst_interval},
51                   field_trials.Lookup(kSlackedTaskQueuePacedSenderFieldTrial));
52 }
53 
TaskQueuePacedSender(Clock * clock,PacingController::PacketSender * packet_sender,const FieldTrialsView & field_trials,TaskQueueFactory * task_queue_factory,TimeDelta max_hold_back_window,int max_hold_back_window_in_packets,absl::optional<TimeDelta> burst_interval)54 TaskQueuePacedSender::TaskQueuePacedSender(
55     Clock* clock,
56     PacingController::PacketSender* packet_sender,
57     const FieldTrialsView& field_trials,
58     TaskQueueFactory* task_queue_factory,
59     TimeDelta max_hold_back_window,
60     int max_hold_back_window_in_packets,
61     absl::optional<TimeDelta> burst_interval)
62     : clock_(clock),
63       bursty_pacer_flags_(field_trials),
64       slacked_pacer_flags_(field_trials),
65       max_hold_back_window_(slacked_pacer_flags_.allow_low_precision
66                                 ? PacingController::kMinSleepTime
67                                 : max_hold_back_window),
68       max_hold_back_window_in_packets_(slacked_pacer_flags_.allow_low_precision
69                                            ? 0
70                                            : max_hold_back_window_in_packets),
71       pacing_controller_(clock, packet_sender, field_trials),
72       next_process_time_(Timestamp::MinusInfinity()),
73       is_started_(false),
74       is_shutdown_(false),
75       packet_size_(/*alpha=*/0.95),
76       include_overhead_(false),
77       task_queue_(field_trials, "TaskQueuePacedSender", task_queue_factory) {
78   RTC_DCHECK_GE(max_hold_back_window_, PacingController::kMinSleepTime);
79   // There are multiple field trials that can affect burst. If multiple bursts
80   // are specified we pick the largest of the values.
81   absl::optional<TimeDelta> burst = bursty_pacer_flags_.burst.GetOptional();
82   if (slacked_pacer_flags_.allow_low_precision &&
83       slacked_pacer_flags_.send_burst_interval) {
84     TimeDelta slacked_burst = slacked_pacer_flags_.send_burst_interval.Value();
85     if (!burst.has_value() || burst.value() < slacked_burst) {
86       burst = slacked_burst;
87     }
88   }
89   // If not overriden by an experiment, the burst is specified by the
90   // `burst_interval` argument.
91   if (!burst.has_value()) {
92     burst = burst_interval;
93   }
94   if (burst.has_value()) {
95     pacing_controller_.SetSendBurstInterval(burst.value());
96   }
97 }
98 
~TaskQueuePacedSender()99 TaskQueuePacedSender::~TaskQueuePacedSender() {
100   // Post an immediate task to mark the queue as shutting down.
101   // The rtc::TaskQueue destructor will wait for pending tasks to
102   // complete before continuing.
103   task_queue_.RunOrPost([&]() {
104     RTC_DCHECK_RUN_ON(&task_queue_);
105     is_shutdown_ = true;
106   });
107 }
108 
EnsureStarted()109 void TaskQueuePacedSender::EnsureStarted() {
110   task_queue_.RunOrPost([this]() {
111     RTC_DCHECK_RUN_ON(&task_queue_);
112     is_started_ = true;
113     MaybeProcessPackets(Timestamp::MinusInfinity());
114   });
115 }
116 
CreateProbeClusters(std::vector<ProbeClusterConfig> probe_cluster_configs)117 void TaskQueuePacedSender::CreateProbeClusters(
118     std::vector<ProbeClusterConfig> probe_cluster_configs) {
119   task_queue_.RunOrPost(
120       [this, probe_cluster_configs = std::move(probe_cluster_configs)]() {
121         RTC_DCHECK_RUN_ON(&task_queue_);
122         pacing_controller_.CreateProbeClusters(probe_cluster_configs);
123         MaybeProcessPackets(Timestamp::MinusInfinity());
124       });
125 }
126 
Pause()127 void TaskQueuePacedSender::Pause() {
128   task_queue_.RunOrPost([this]() {
129     RTC_DCHECK_RUN_ON(&task_queue_);
130     pacing_controller_.Pause();
131   });
132 }
133 
Resume()134 void TaskQueuePacedSender::Resume() {
135   task_queue_.RunOrPost([this]() {
136     RTC_DCHECK_RUN_ON(&task_queue_);
137     pacing_controller_.Resume();
138     MaybeProcessPackets(Timestamp::MinusInfinity());
139   });
140 }
141 
SetCongested(bool congested)142 void TaskQueuePacedSender::SetCongested(bool congested) {
143   task_queue_.RunOrPost([this, congested]() {
144     RTC_DCHECK_RUN_ON(&task_queue_);
145     pacing_controller_.SetCongested(congested);
146     MaybeProcessPackets(Timestamp::MinusInfinity());
147   });
148 }
149 
SetPacingRates(DataRate pacing_rate,DataRate padding_rate)150 void TaskQueuePacedSender::SetPacingRates(DataRate pacing_rate,
151                                           DataRate padding_rate) {
152   task_queue_.RunOrPost([this, pacing_rate, padding_rate]() {
153     RTC_DCHECK_RUN_ON(&task_queue_);
154     pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
155     MaybeProcessPackets(Timestamp::MinusInfinity());
156   });
157 }
158 
EnqueuePackets(std::vector<std::unique_ptr<RtpPacketToSend>> packets)159 void TaskQueuePacedSender::EnqueuePackets(
160     std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
161   task_queue_.TaskQueueForPost()->PostTask(task_queue_.MaybeSafeTask(
162       safety_.flag(), [this, packets = std::move(packets)]() mutable {
163         RTC_DCHECK_RUN_ON(&task_queue_);
164         TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
165                      "TaskQueuePacedSender::EnqueuePackets");
166         for (auto& packet : packets) {
167           TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc"),
168                        "TaskQueuePacedSender::EnqueuePackets::Loop",
169                        "sequence_number", packet->SequenceNumber(),
170                        "rtp_timestamp", packet->Timestamp());
171 
172           size_t packet_size = packet->payload_size() + packet->padding_size();
173           if (include_overhead_) {
174             packet_size += packet->headers_size();
175           }
176           packet_size_.Apply(1, packet_size);
177           RTC_DCHECK_GE(packet->capture_time(), Timestamp::Zero());
178           pacing_controller_.EnqueuePacket(std::move(packet));
179         }
180         MaybeProcessPackets(Timestamp::MinusInfinity());
181       }));
182 }
183 
SetAccountForAudioPackets(bool account_for_audio)184 void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) {
185   task_queue_.RunOrPost([this, account_for_audio]() {
186     RTC_DCHECK_RUN_ON(&task_queue_);
187     pacing_controller_.SetAccountForAudioPackets(account_for_audio);
188     MaybeProcessPackets(Timestamp::MinusInfinity());
189   });
190 }
191 
SetIncludeOverhead()192 void TaskQueuePacedSender::SetIncludeOverhead() {
193   task_queue_.RunOrPost([this]() {
194     RTC_DCHECK_RUN_ON(&task_queue_);
195     include_overhead_ = true;
196     pacing_controller_.SetIncludeOverhead();
197     MaybeProcessPackets(Timestamp::MinusInfinity());
198   });
199 }
200 
SetTransportOverhead(DataSize overhead_per_packet)201 void TaskQueuePacedSender::SetTransportOverhead(DataSize overhead_per_packet) {
202   task_queue_.RunOrPost([this, overhead_per_packet]() {
203     RTC_DCHECK_RUN_ON(&task_queue_);
204     pacing_controller_.SetTransportOverhead(overhead_per_packet);
205     MaybeProcessPackets(Timestamp::MinusInfinity());
206   });
207 }
208 
SetQueueTimeLimit(TimeDelta limit)209 void TaskQueuePacedSender::SetQueueTimeLimit(TimeDelta limit) {
210   task_queue_.RunOrPost([this, limit]() {
211     RTC_DCHECK_RUN_ON(&task_queue_);
212     pacing_controller_.SetQueueTimeLimit(limit);
213     MaybeProcessPackets(Timestamp::MinusInfinity());
214   });
215 }
216 
ExpectedQueueTime() const217 TimeDelta TaskQueuePacedSender::ExpectedQueueTime() const {
218   return GetStats().expected_queue_time;
219 }
220 
QueueSizeData() const221 DataSize TaskQueuePacedSender::QueueSizeData() const {
222   return GetStats().queue_size;
223 }
224 
FirstSentPacketTime() const225 absl::optional<Timestamp> TaskQueuePacedSender::FirstSentPacketTime() const {
226   return GetStats().first_sent_packet_time;
227 }
228 
OldestPacketWaitTime() const229 TimeDelta TaskQueuePacedSender::OldestPacketWaitTime() const {
230   Timestamp oldest_packet = GetStats().oldest_packet_enqueue_time;
231   if (oldest_packet.IsInfinite()) {
232     return TimeDelta::Zero();
233   }
234 
235   // (webrtc:9716): The clock is not always monotonic.
236   Timestamp current = clock_->CurrentTime();
237   if (current < oldest_packet) {
238     return TimeDelta::Zero();
239   }
240 
241   return current - oldest_packet;
242 }
243 
OnStatsUpdated(const Stats & stats)244 void TaskQueuePacedSender::OnStatsUpdated(const Stats& stats) {
245   MutexLock lock(&stats_mutex_);
246   current_stats_ = stats;
247 }
248 
MaybeProcessPackets(Timestamp scheduled_process_time)249 void TaskQueuePacedSender::MaybeProcessPackets(
250     Timestamp scheduled_process_time) {
251   RTC_DCHECK_RUN_ON(&task_queue_);
252 
253   TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
254                "TaskQueuePacedSender::MaybeProcessPackets");
255 
256   if (is_shutdown_ || !is_started_) {
257     return;
258   }
259 
260   Timestamp next_send_time = pacing_controller_.NextSendTime();
261   RTC_DCHECK(next_send_time.IsFinite());
262   const Timestamp now = clock_->CurrentTime();
263   TimeDelta early_execute_margin =
264       pacing_controller_.IsProbing()
265           ? PacingController::kMaxEarlyProbeProcessing
266           : TimeDelta::Zero();
267 
268   // Process packets and update stats.
269   while (next_send_time <= now + early_execute_margin) {
270     pacing_controller_.ProcessPackets();
271     next_send_time = pacing_controller_.NextSendTime();
272     RTC_DCHECK(next_send_time.IsFinite());
273 
274     // Probing state could change. Get margin after process packets.
275     early_execute_margin = pacing_controller_.IsProbing()
276                                ? PacingController::kMaxEarlyProbeProcessing
277                                : TimeDelta::Zero();
278   }
279   UpdateStats();
280 
281   // Ignore retired scheduled task, otherwise reset `next_process_time_`.
282   if (scheduled_process_time.IsFinite()) {
283     if (scheduled_process_time != next_process_time_) {
284       return;
285     }
286     next_process_time_ = Timestamp::MinusInfinity();
287   }
288 
289   // Do not hold back in probing.
290   TimeDelta hold_back_window = TimeDelta::Zero();
291   if (!pacing_controller_.IsProbing()) {
292     hold_back_window = max_hold_back_window_;
293     DataRate pacing_rate = pacing_controller_.pacing_rate();
294     if (max_hold_back_window_in_packets_ != kNoPacketHoldback &&
295         !pacing_rate.IsZero() &&
296         packet_size_.filtered() != rtc::ExpFilter::kValueUndefined) {
297       TimeDelta avg_packet_send_time =
298           DataSize::Bytes(packet_size_.filtered()) / pacing_rate;
299       hold_back_window =
300           std::min(hold_back_window,
301                    avg_packet_send_time * max_hold_back_window_in_packets_);
302     }
303   }
304 
305   // Calculate next process time.
306   TimeDelta time_to_next_process =
307       std::max(hold_back_window, next_send_time - now - early_execute_margin);
308   next_send_time = now + time_to_next_process;
309 
310   // If no in flight task or in flight task is later than `next_send_time`,
311   // schedule a new one. Previous in flight task will be retired.
312   if (next_process_time_.IsMinusInfinity() ||
313       next_process_time_ > next_send_time) {
314     // Prefer low precision if allowed and not probing.
315     TaskQueueBase::DelayPrecision precision =
316         slacked_pacer_flags_.allow_low_precision &&
317                 !pacing_controller_.IsProbing()
318             ? TaskQueueBase::DelayPrecision::kLow
319             : TaskQueueBase::DelayPrecision::kHigh;
320     // Check for cases where we need high precision.
321     if (precision == TaskQueueBase::DelayPrecision::kLow) {
322       auto& packets_per_type =
323           pacing_controller_.SizeInPacketsPerRtpPacketMediaType();
324       bool audio_or_retransmission_packets_in_queue =
325           packets_per_type[static_cast<size_t>(RtpPacketMediaType::kAudio)] >
326               0 ||
327           packets_per_type[static_cast<size_t>(
328               RtpPacketMediaType::kRetransmission)] > 0;
329       bool queue_time_too_large =
330           slacked_pacer_flags_.max_low_precision_expected_queue_time &&
331           pacing_controller_.ExpectedQueueTime() >=
332               slacked_pacer_flags_.max_low_precision_expected_queue_time
333                   .Value();
334       if (audio_or_retransmission_packets_in_queue || queue_time_too_large) {
335         precision = TaskQueueBase::DelayPrecision::kHigh;
336       }
337     }
338 
339     task_queue_.TaskQueueForDelayedTasks()->PostDelayedTaskWithPrecision(
340         precision,
341         task_queue_.MaybeSafeTask(
342             safety_.flag(),
343             [this, next_send_time]() { MaybeProcessPackets(next_send_time); }),
344         time_to_next_process.RoundUpTo(TimeDelta::Millis(1)));
345     next_process_time_ = next_send_time;
346   }
347 }
348 
UpdateStats()349 void TaskQueuePacedSender::UpdateStats() {
350   Stats new_stats;
351   new_stats.expected_queue_time = pacing_controller_.ExpectedQueueTime();
352   new_stats.first_sent_packet_time = pacing_controller_.FirstSentPacketTime();
353   new_stats.oldest_packet_enqueue_time =
354       pacing_controller_.OldestPacketEnqueueTime();
355   new_stats.queue_size = pacing_controller_.QueueSizeData();
356   OnStatsUpdated(new_stats);
357 }
358 
GetStats() const359 TaskQueuePacedSender::Stats TaskQueuePacedSender::GetStats() const {
360   MutexLock lock(&stats_mutex_);
361   return current_stats_;
362 }
363 
364 }  // namespace webrtc
365