1 /* 2 * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #ifndef MODULES_PACING_TASK_QUEUE_PACED_SENDER_H_ 12 #define MODULES_PACING_TASK_QUEUE_PACED_SENDER_H_ 13 14 #include <stddef.h> 15 #include <stdint.h> 16 17 #include <memory> 18 #include <vector> 19 20 #include "absl/types/optional.h" 21 #include "api/field_trials_view.h" 22 #include "api/sequence_checker.h" 23 #include "api/task_queue/task_queue_factory.h" 24 #include "api/units/data_size.h" 25 #include "api/units/time_delta.h" 26 #include "api/units/timestamp.h" 27 #include "modules/pacing/pacing_controller.h" 28 #include "modules/pacing/rtp_packet_pacer.h" 29 #include "modules/rtp_rtcp/source/rtp_packet_to_send.h" 30 #include "modules/utility/maybe_worker_thread.h" 31 #include "rtc_base/experiments/field_trial_parser.h" 32 #include "rtc_base/numerics/exp_filter.h" 33 #include "rtc_base/thread_annotations.h" 34 35 namespace webrtc { 36 class Clock; 37 38 class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender { 39 public: 40 static const int kNoPacketHoldback; 41 42 // The pacer can be configured using `field_trials` or specified parameters. 43 // 44 // The `hold_back_window` parameter sets a lower bound on time to sleep if 45 // there is currently a pacer queue and packets can't immediately be 46 // processed. Increasing this reduces thread wakeups at the expense of higher 47 // latency. 48 // 49 // If the `burst_interval` parameter is set, the pacer is allowed to build up 50 // a packet "debt" that correspond to approximately the send rate during the 51 // specified interval. This greatly reduced wake ups by not pacing packets 52 // within the allowed burst budget. 53 TaskQueuePacedSender( 54 Clock* clock, 55 PacingController::PacketSender* packet_sender, 56 const FieldTrialsView& field_trials, 57 TaskQueueFactory* task_queue_factory, 58 TimeDelta max_hold_back_window, 59 int max_hold_back_window_in_packets, 60 absl::optional<TimeDelta> burst_interval = absl::nullopt); 61 62 ~TaskQueuePacedSender() override; 63 64 // Ensure that necessary delayed tasks are scheduled. 65 void EnsureStarted(); 66 67 // Methods implementing RtpPacketSender. 68 69 // Adds the packet to the queue and calls 70 // PacingController::PacketSender::SendPacket() when it's time to send. 71 void EnqueuePackets( 72 std::vector<std::unique_ptr<RtpPacketToSend>> packets) override; 73 74 // Methods implementing RtpPacketPacer. 75 76 void CreateProbeClusters( 77 std::vector<ProbeClusterConfig> probe_cluster_configs) override; 78 79 // Temporarily pause all sending. 80 void Pause() override; 81 82 // Resume sending packets. 83 void Resume() override; 84 85 void SetCongested(bool congested) override; 86 87 // Sets the pacing rates. Must be called once before packets can be sent. 88 void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) override; 89 90 // Currently audio traffic is not accounted for by pacer and passed through. 91 // With the introduction of audio BWE, audio traffic will be accounted for 92 // in the pacer budget calculation. The audio traffic will still be injected 93 // at high priority. 94 void SetAccountForAudioPackets(bool account_for_audio) override; 95 96 void SetIncludeOverhead() override; 97 void SetTransportOverhead(DataSize overhead_per_packet) override; 98 99 // Returns the time since the oldest queued packet was enqueued. 100 TimeDelta OldestPacketWaitTime() const override; 101 102 // Returns total size of all packets in the pacer queue. 103 DataSize QueueSizeData() const override; 104 105 // Returns the time when the first packet was sent; 106 absl::optional<Timestamp> FirstSentPacketTime() const override; 107 108 // Returns the number of milliseconds it will take to send the current 109 // packets in the queue, given the current size and bitrate, ignoring prio. 110 TimeDelta ExpectedQueueTime() const override; 111 112 // Set the max desired queuing delay, pacer will override the pacing rate 113 // specified by SetPacingRates() if needed to achieve this goal. 114 void SetQueueTimeLimit(TimeDelta limit) override; 115 116 protected: 117 // Exposed as protected for test. 118 struct Stats { StatsStats119 Stats() 120 : oldest_packet_enqueue_time(Timestamp::MinusInfinity()), 121 queue_size(DataSize::Zero()), 122 expected_queue_time(TimeDelta::Zero()) {} 123 Timestamp oldest_packet_enqueue_time; 124 DataSize queue_size; 125 TimeDelta expected_queue_time; 126 absl::optional<Timestamp> first_sent_packet_time; 127 }; 128 void OnStatsUpdated(const Stats& stats); 129 130 private: 131 // Check if it is time to send packets, or schedule a delayed task if not. 132 // Use Timestamp::MinusInfinity() to indicate that this call has _not_ 133 // been scheduled by the pacing controller. If this is the case, check if 134 // can execute immediately otherwise schedule a delay task that calls this 135 // method again with desired (finite) scheduled process time. 136 void MaybeProcessPackets(Timestamp scheduled_process_time); 137 138 void UpdateStats() RTC_RUN_ON(task_queue_); 139 Stats GetStats() const; 140 141 Clock* const clock_; 142 struct BurstyPacerFlags { 143 // Parses `kBurstyPacerFieldTrial`. Example: 144 // --force-fieldtrials=WebRTC-BurstyPacer/burst:20ms/ 145 explicit BurstyPacerFlags(const FieldTrialsView& field_trials); 146 // If set, the pacer is allowed to build up a packet "debt" that correspond 147 // to approximately the send rate during the specified interval. 148 FieldTrialOptional<TimeDelta> burst; 149 }; 150 const BurstyPacerFlags bursty_pacer_flags_; 151 struct SlackedPacerFlags { 152 // Parses `kSlackedTaskQueuePacedSenderFieldTrial`. Example: 153 // --force-fieldtrials=WebRTC-SlackedTaskQueuePacedSender/Enabled,max_queue_time:75ms/ 154 explicit SlackedPacerFlags(const FieldTrialsView& field_trials); 155 // When "Enabled", delayed tasks invoking MaybeProcessPackets() are 156 // scheduled using low precision instead of high precision, resulting in 157 // less idle wake ups and packets being sent in bursts if the `task_queue_` 158 // implementation supports slack. When probing, high precision is used 159 // regardless to ensure good bandwidth estimation. 160 FieldTrialFlag allow_low_precision; 161 // Controlled via the "max_queue_time" experiment argument. If set, uses 162 // high precision scheduling of MaybeProcessPackets() whenever the expected 163 // queue time is greater than or equal to this value. 164 FieldTrialOptional<TimeDelta> max_low_precision_expected_queue_time; 165 // Controlled via "send_burst_interval" experiment argument. If set, the 166 // pacer is allowed to build up a packet "debt" that correspond to 167 // approximately the send rate during the specified interval. 168 FieldTrialOptional<TimeDelta> send_burst_interval; 169 }; 170 const SlackedPacerFlags slacked_pacer_flags_; 171 // The holdback window prevents too frequent delayed MaybeProcessPackets() 172 // calls. These are only applicable if `allow_low_precision` is false. 173 const TimeDelta max_hold_back_window_; 174 const int max_hold_back_window_in_packets_; 175 176 PacingController pacing_controller_ RTC_GUARDED_BY(task_queue_); 177 178 // We want only one (valid) delayed process task in flight at a time. 179 // If the value of `next_process_time_` is finite, it is an id for a 180 // delayed task that will call MaybeProcessPackets() with that time 181 // as parameter. 182 // Timestamp::MinusInfinity() indicates no valid pending task. 183 Timestamp next_process_time_ RTC_GUARDED_BY(task_queue_); 184 185 // Indicates if this task queue is started. If not, don't allow 186 // posting delayed tasks yet. 187 bool is_started_ RTC_GUARDED_BY(task_queue_); 188 189 // Indicates if this task queue is shutting down. If so, don't allow 190 // posting any more delayed tasks as that can cause the task queue to 191 // never drain. 192 bool is_shutdown_ RTC_GUARDED_BY(task_queue_); 193 194 // Filtered size of enqueued packets, in bytes. 195 rtc::ExpFilter packet_size_ RTC_GUARDED_BY(task_queue_); 196 bool include_overhead_ RTC_GUARDED_BY(task_queue_); 197 198 // TODO(webrtc:14502): Remove stats_mutex_ when pacer runs on the worker 199 // thread. 200 mutable Mutex stats_mutex_; 201 Stats current_stats_ RTC_GUARDED_BY(stats_mutex_); 202 203 ScopedTaskSafety safety_; 204 MaybeWorkerThread task_queue_; 205 }; 206 } // namespace webrtc 207 #endif // MODULES_PACING_TASK_QUEUE_PACED_SENDER_H_ 208