xref: /aosp_15_r20/external/webrtc/modules/pacing/task_queue_paced_sender_unittest.cc (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1 /*
2  *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "modules/pacing/task_queue_paced_sender.h"
12 
13 #include <algorithm>
14 #include <atomic>
15 #include <list>
16 #include <memory>
17 #include <string>
18 #include <utility>
19 #include <vector>
20 
21 #include "absl/functional/any_invocable.h"
22 #include "api/task_queue/task_queue_base.h"
23 #include "api/transport/network_types.h"
24 #include "api/units/data_rate.h"
25 #include "modules/pacing/packet_router.h"
26 #include "test/gmock.h"
27 #include "test/gtest.h"
28 #include "test/scoped_key_value_config.h"
29 #include "test/time_controller/simulated_time_controller.h"
30 
31 using ::testing::_;
32 using ::testing::AtLeast;
33 using ::testing::Return;
34 using ::testing::SaveArg;
35 
36 namespace webrtc {
37 namespace {
// SSRC assigned to audio packets built by the test helpers.
constexpr uint32_t kAudioSsrc = 12345;
// SSRC assigned to video packets built by the test helpers.
constexpr uint32_t kVideoSsrc = 234565;
// SSRC shared by retransmission and padding packets built by the test helpers.
constexpr uint32_t kVideoRtxSsrc = 34567;
// SSRC assigned to forward-error-correction packets built by the test helpers.
constexpr uint32_t kFlexFecSsrc = 45678;
// Payload size, in bytes, given to every packet built by the test helpers.
constexpr size_t kDefaultPacketSize = 1234;
43 
44 class MockPacketRouter : public PacketRouter {
45  public:
46   MOCK_METHOD(void,
47               SendPacket,
48               (std::unique_ptr<RtpPacketToSend> packet,
49                const PacedPacketInfo& cluster_info),
50               (override));
51   MOCK_METHOD(std::vector<std::unique_ptr<RtpPacketToSend>>,
52               FetchFec,
53               (),
54               (override));
55   MOCK_METHOD(std::vector<std::unique_ptr<RtpPacketToSend>>,
56               GeneratePadding,
57               (DataSize target_size),
58               (override));
59 };
60 
GeneratePadding(DataSize target_size)61 std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
62     DataSize target_size) {
63   // 224 bytes is the max padding size for plain padding packets generated by
64   // RTPSender::GeneratePadding().
65   const DataSize kMaxPaddingPacketSize = DataSize::Bytes(224);
66   DataSize padding_generated = DataSize::Zero();
67   std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets;
68   while (padding_generated < target_size) {
69     DataSize packet_size =
70         std::min(target_size - padding_generated, kMaxPaddingPacketSize);
71     padding_generated += packet_size;
72     auto padding_packet =
73         std::make_unique<RtpPacketToSend>(/*extensions=*/nullptr);
74     padding_packet->set_packet_type(RtpPacketMediaType::kPadding);
75     padding_packet->SetPadding(packet_size.bytes());
76     padding_packets.push_back(std::move(padding_packet));
77   }
78   return padding_packets;
79 }
80 
81 class TaskQueueWithFakePrecisionFactory : public TaskQueueFactory {
82  public:
TaskQueueWithFakePrecisionFactory(TaskQueueFactory * task_queue_factory)83   explicit TaskQueueWithFakePrecisionFactory(
84       TaskQueueFactory* task_queue_factory)
85       : task_queue_factory_(task_queue_factory) {}
86 
CreateTaskQueue(absl::string_view name,Priority priority) const87   std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
88       absl::string_view name,
89       Priority priority) const override {
90     return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
91         new TaskQueueWithFakePrecision(
92             const_cast<TaskQueueWithFakePrecisionFactory*>(this),
93             task_queue_factory_));
94   }
95 
delayed_low_precision_count() const96   int delayed_low_precision_count() const {
97     return delayed_low_precision_count_;
98   }
delayed_high_precision_count() const99   int delayed_high_precision_count() const {
100     return delayed_high_precision_count_;
101   }
102 
103  private:
104   friend class TaskQueueWithFakePrecision;
105 
106   class TaskQueueWithFakePrecision : public TaskQueueBase {
107    public:
TaskQueueWithFakePrecision(TaskQueueWithFakePrecisionFactory * parent_factory,TaskQueueFactory * task_queue_factory)108     TaskQueueWithFakePrecision(
109         TaskQueueWithFakePrecisionFactory* parent_factory,
110         TaskQueueFactory* task_queue_factory)
111         : parent_factory_(parent_factory),
112           task_queue_(task_queue_factory->CreateTaskQueue(
113               "TaskQueueWithFakePrecision",
114               TaskQueueFactory::Priority::NORMAL)) {}
~TaskQueueWithFakePrecision()115     ~TaskQueueWithFakePrecision() override {}
116 
Delete()117     void Delete() override {
118       // `task_queue_->Delete()` is implicitly called in the destructor due to
119       // TaskQueueDeleter.
120       delete this;
121     }
PostTask(absl::AnyInvocable<void ()&&> task)122     void PostTask(absl::AnyInvocable<void() &&> task) override {
123       task_queue_->PostTask(WrapTask(std::move(task)));
124     }
PostDelayedTask(absl::AnyInvocable<void ()&&> task,TimeDelta delay)125     void PostDelayedTask(absl::AnyInvocable<void() &&> task,
126                          TimeDelta delay) override {
127       ++parent_factory_->delayed_low_precision_count_;
128       task_queue_->PostDelayedTask(WrapTask(std::move(task)), delay);
129     }
PostDelayedHighPrecisionTask(absl::AnyInvocable<void ()&&> task,TimeDelta delay)130     void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
131                                       TimeDelta delay) override {
132       ++parent_factory_->delayed_high_precision_count_;
133       task_queue_->PostDelayedHighPrecisionTask(WrapTask(std::move(task)),
134                                                 delay);
135     }
136 
137    private:
WrapTask(absl::AnyInvocable<void ()&&> task)138     absl::AnyInvocable<void() &&> WrapTask(absl::AnyInvocable<void() &&> task) {
139       return [this, task = std::move(task)]() mutable {
140         CurrentTaskQueueSetter set_current(this);
141         std::move(task)();
142       };
143     }
144 
145     TaskQueueWithFakePrecisionFactory* parent_factory_;
146     std::unique_ptr<TaskQueueBase, TaskQueueDeleter> task_queue_;
147   };
148 
149   TaskQueueFactory* task_queue_factory_;
150   std::atomic<int> delayed_low_precision_count_ = 0u;
151   std::atomic<int> delayed_high_precision_count_ = 0u;
152 };
153 
154 }  // namespace
155 
156 namespace test {
157 
BuildRtpPacket(RtpPacketMediaType type)158 std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketMediaType type) {
159   auto packet = std::make_unique<RtpPacketToSend>(nullptr);
160   packet->set_packet_type(type);
161   switch (type) {
162     case RtpPacketMediaType::kAudio:
163       packet->SetSsrc(kAudioSsrc);
164       break;
165     case RtpPacketMediaType::kVideo:
166       packet->SetSsrc(kVideoSsrc);
167       break;
168     case RtpPacketMediaType::kRetransmission:
169     case RtpPacketMediaType::kPadding:
170       packet->SetSsrc(kVideoRtxSsrc);
171       break;
172     case RtpPacketMediaType::kForwardErrorCorrection:
173       packet->SetSsrc(kFlexFecSsrc);
174       break;
175   }
176 
177   packet->SetPayloadSize(kDefaultPacketSize);
178   return packet;
179 }
180 
GeneratePackets(RtpPacketMediaType type,size_t num_packets)181 std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePackets(
182     RtpPacketMediaType type,
183     size_t num_packets) {
184   std::vector<std::unique_ptr<RtpPacketToSend>> packets;
185   for (size_t i = 0; i < num_packets; ++i) {
186     packets.push_back(BuildRtpPacket(type));
187   }
188   return packets;
189 }
190 
// Field trial string that UsingWorkerThread() looks for in a test parameter.
constexpr char kSendPacketOnWorkerThreadFieldTrial[] =
    "WebRTC-SendPacketsOnWorkerThread/Enabled/";

// Field trial strings the test suite is instantiated with: first the empty
// string (no trials), then the worker-thread trial.
std::vector<std::string> ParameterizedFieldTrials() {
  std::vector<std::string> variants;
  variants.emplace_back();
  variants.emplace_back(kSendPacketOnWorkerThreadFieldTrial);
  return variants;
}
197 
UsingWorkerThread(absl::string_view field_trials)198 bool UsingWorkerThread(absl::string_view field_trials) {
199   return field_trials.find(kSendPacketOnWorkerThreadFieldTrial) !=
200          std::string::npos;
201 }
202 
203 class TaskQueuePacedSenderTest
204     : public ::testing::TestWithParam<std::string /*field_trials*/> {};
205 
206 INSTANTIATE_TEST_SUITE_P(TaskQueuePacedSenderTest,
207                          TaskQueuePacedSenderTest,
208                          testing::ValuesIn(ParameterizedFieldTrials()),
__anon6bcb728c0302(const testing::TestParamInfo<std::string>& info) 209                          [](const testing::TestParamInfo<std::string>& info) {
210                            return UsingWorkerThread(info.param) ? "UsingWt"
211                                                                 : "OwnedTQ";
212                          });
213 
// Enqueues a batch of video packets at a pacing rate that corresponds to one
// second of sending and checks that all of them go out, spread over roughly
// that second.
TEST_P(TaskQueuePacedSenderTest, PacesPackets) {
  GlobalSimulatedTimeController sim_time(Timestamp::Millis(1234));
  MockPacketRouter router;
  ScopedKeyValueConfig field_trials(GetParam());
  TaskQueuePacedSender sender(sim_time.GetClock(), &router, field_trials,
                              sim_time.GetTaskQueueFactory(),
                              PacingController::kMinSleepTime,
                              TaskQueuePacedSender::kNoPacketHoldback);

  // The pacing rate is picked so that the whole batch covers one second.
  static constexpr size_t kPacketsToSend = 42;
  SequenceChecker sequence_checker;
  const DataRate pacing_rate =
      DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend);
  sender.SetPacingRates(pacing_rate, DataRate::Zero());
  sender.EnsureStarted();
  sender.EnqueuePackets(
      GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend));

  // Count the packets as they are sent and note when the last one leaves.
  size_t packets_sent = 0;
  Timestamp end_time = Timestamp::PlusInfinity();
  EXPECT_CALL(router, SendPacket)
      .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> /*packet*/,
                          const PacedPacketInfo& /*cluster_info*/) {
        ++packets_sent;
        if (packets_sent == kPacketsToSend) {
          end_time = sim_time.GetClock()->CurrentTime();
        }
        // The sending sequence must match the one selected by the field trial.
        EXPECT_EQ(sequence_checker.IsCurrent(), UsingWorkerThread(GetParam()));
      });

  const Timestamp start_time = sim_time.GetClock()->CurrentTime();

  // Sending should take close to 1s; slightly less is expected since initial
  // probing is a bit quicker.
  sim_time.AdvanceTime(TimeDelta::Seconds(1));
  EXPECT_EQ(packets_sent, kPacketsToSend);
  ASSERT_TRUE(end_time.IsFinite());
  const TimeDelta elapsed = end_time - start_time;
  EXPECT_NEAR(elapsed.ms<double>(), 1000.0, 50.0);
}
255 
256 // Same test as above, but with 0.5s of burst applied.
TEST_P(TaskQueuePacedSenderTest,PacesPacketsWithBurst)257 TEST_P(TaskQueuePacedSenderTest, PacesPacketsWithBurst) {
258   GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
259   MockPacketRouter packet_router;
260   ScopedKeyValueConfig trials(GetParam());
261   TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
262                              time_controller.GetTaskQueueFactory(),
263                              PacingController::kMinSleepTime,
264                              TaskQueuePacedSender::kNoPacketHoldback,
265                              // Half a second of bursting.
266                              TimeDelta::Seconds(0.5));
267 
268   // Insert a number of packets, covering one second.
269   static constexpr size_t kPacketsToSend = 42;
270   SequenceChecker sequence_checker;
271   pacer.SetPacingRates(
272       DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend),
273       DataRate::Zero());
274   pacer.EnsureStarted();
275   pacer.EnqueuePackets(
276       GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend));
277 
278   // Expect all of them to be sent.
279   size_t packets_sent = 0;
280   Timestamp end_time = Timestamp::PlusInfinity();
281   EXPECT_CALL(packet_router, SendPacket)
282       .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
283                           const PacedPacketInfo& cluster_info) {
284         ++packets_sent;
285         if (packets_sent == kPacketsToSend) {
286           end_time = time_controller.GetClock()->CurrentTime();
287         }
288         EXPECT_EQ(sequence_checker.IsCurrent(), UsingWorkerThread(GetParam()));
289       });
290 
291   const Timestamp start_time = time_controller.GetClock()->CurrentTime();
292 
293   // Packets should be sent over a period of close to 1s. Expect a little
294   // lower than this since initial probing is a bit quicker.
295   time_controller.AdvanceTime(TimeDelta::Seconds(1));
296   EXPECT_EQ(packets_sent, kPacketsToSend);
297   ASSERT_TRUE(end_time.IsFinite());
298   // Because of half a second of burst, what would normally have been paced over
299   // ~1 second now takes ~0.5 seconds.
300   EXPECT_NEAR((end_time - start_time).ms<double>(), 500.0, 50.0);
301 }
302 
// Doubles the pacing rate after the second of three packets has been sent and
// checks that the third packet follows after half the previous interval.
TEST_P(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
  GlobalSimulatedTimeController sim_time(Timestamp::Millis(1234));
  MockPacketRouter router;
  ScopedKeyValueConfig field_trials(GetParam());
  TaskQueuePacedSender sender(sim_time.GetClock(), &router, field_trials,
                              sim_time.GetTaskQueueFactory(),
                              PacingController::kMinSleepTime,
                              TaskQueuePacedSender::kNoPacketHoldback);

  // A rate of five default-sized packets per second, i.e. 200ms apart.
  const size_t kPacketsPerSecond = 5;
  const DataRate kPacingRate =
      DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsPerSecond);
  sender.SetPacingRates(kPacingRate, DataRate::Zero());
  sender.EnsureStarted();

  // Send some initial packets to be rid of any probes.
  EXPECT_CALL(router, SendPacket).Times(kPacketsPerSecond);
  sender.EnqueuePackets(
      GeneratePackets(RtpPacketMediaType::kVideo, kPacketsPerSecond));
  sim_time.AdvanceTime(TimeDelta::Seconds(1));

  // Send times of the three packets under test; minus infinity means "not
  // sent yet".
  Timestamp first_sent = Timestamp::MinusInfinity();
  Timestamp second_sent = Timestamp::MinusInfinity();
  Timestamp third_sent = Timestamp::MinusInfinity();

  EXPECT_CALL(router, SendPacket)
      .Times(3)
      .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> /*packet*/,
                          const PacedPacketInfo& /*cluster_info*/) {
        if (first_sent.IsInfinite()) {
          first_sent = sim_time.GetClock()->CurrentTime();
        } else if (second_sent.IsInfinite()) {
          second_sent = sim_time.GetClock()->CurrentTime();
          // Double the rate from a posted task, to avoid invoking
          // SetPacingRates in the context of sending a packet.
          sim_time.GetMainThread()->PostTask([&] {
            sender.SetPacingRates(2 * kPacingRate, DataRate::Zero());
          });
        } else {
          third_sent = sim_time.GetClock()->CurrentTime();
        }
      });

  sender.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 3));
  sim_time.AdvanceTime(TimeDelta::Millis(500));
  ASSERT_TRUE(third_sent.IsFinite());
  // 200ms at the original rate, then 100ms once the rate has doubled.
  EXPECT_NEAR((second_sent - first_sent).ms<double>(), 200.0, 1.0);
  EXPECT_NEAR((third_sent - second_sent).ms<double>(), 100.0, 1.0);
}
356 
// Checks that an audio packet is sent right away even though queued video
// packets are still waiting for their pacing slot.
TEST_P(TaskQueuePacedSenderTest, SendsAudioImmediately) {
  GlobalSimulatedTimeController sim_time(Timestamp::Millis(1234));
  MockPacketRouter router;
  ScopedKeyValueConfig field_trials(GetParam());
  TaskQueuePacedSender sender(sim_time.GetClock(), &router, field_trials,
                              sim_time.GetTaskQueueFactory(),
                              PacingController::kMinSleepTime,
                              TaskQueuePacedSender::kNoPacketHoldback);

  const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125);
  const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
  // Time it takes to pace out one default-sized packet at the rate above.
  const TimeDelta kPacketPacingTime = kPacketSize / kPacingDataRate;

  sender.SetPacingRates(kPacingDataRate, DataRate::Zero());
  sender.EnsureStarted();

  // Of the initial video packets, only one should be sent.
  EXPECT_CALL(router, SendPacket).Times(1);
  sender.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
  sim_time.AdvanceTime(TimeDelta::Zero());
  ::testing::Mock::VerifyAndClearExpectations(&router);

  // Move forward, but stop before the next video packet is due.
  sim_time.AdvanceTime(kPacketPacingTime / 2);

  // An audio packet inserted now should be sent immediately.
  EXPECT_CALL(router, SendPacket).Times(1);
  sender.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kAudio, 1));
  sim_time.AdvanceTime(TimeDelta::Zero());
  ::testing::Mock::VerifyAndClearExpectations(&router);
}
388 
// With a 5ms coalescing window, checks that nothing is sent until the window
// has elapsed and that the packets that became due in the meantime are then
// sent together.
TEST_P(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) {
  const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
  GlobalSimulatedTimeController sim_time(Timestamp::Millis(1234));
  MockPacketRouter router;
  ScopedKeyValueConfig field_trials(GetParam());
  TaskQueuePacedSender sender(sim_time.GetClock(), &router, field_trials,
                              sim_time.GetTaskQueueFactory(),
                              kCoalescingWindow,
                              TaskQueuePacedSender::kNoPacketHoldback);

  // Set rates so one packet adds one ms of buffer level.
  const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
  const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
  const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;

  sender.SetPacingRates(kPacingDataRate, DataRate::Zero());
  sender.EnsureStarted();

  // Of the 10 packets added, the first should be sent immediately since the
  // buffers are clear.
  EXPECT_CALL(router, SendPacket).Times(1);
  sender.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
  sim_time.AdvanceTime(TimeDelta::Zero());
  ::testing::Mock::VerifyAndClearExpectations(&router);

  // Up to 1ms before the coalescing window ends, no packets should be sent.
  EXPECT_CALL(router, SendPacket).Times(0);
  sim_time.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1));

  // Once the coalescing window ends, all packets that should have been sent up
  // until now will be sent.
  EXPECT_CALL(router, SendPacket).Times(5);
  sim_time.AdvanceTime(TimeDelta::Millis(1));
  ::testing::Mock::VerifyAndClearExpectations(&router);
}
425 
// Checks that, while a probe cluster is active, packets keep being sent inside
// the coalescing window instead of being held back until it ends.
TEST_P(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) {
  const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
  GlobalSimulatedTimeController sim_time(Timestamp::Millis(1234));
  MockPacketRouter router;
  ScopedKeyValueConfig field_trials(GetParam());
  TaskQueuePacedSender sender(sim_time.GetClock(), &router, field_trials,
                              sim_time.GetTaskQueueFactory(),
                              kCoalescingWindow,
                              TaskQueuePacedSender::kNoPacketHoldback);

  // Set rates so one packet adds one ms of buffer level.
  const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
  const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
  const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;

  sender.SetPacingRates(kPacingDataRate, DataRate::Zero());
  sender.EnsureStarted();

  // Of the 10 packets added, the first should be sent immediately since the
  // buffers are clear. This will also trigger the probe to start.
  EXPECT_CALL(router, SendPacket).Times(AtLeast(1));
  sender.CreateProbeClusters(
      {{.at_time = sim_time.GetClock()->CurrentTime(),
        .target_data_rate = kPacingDataRate * 2,
        .target_duration = TimeDelta::Millis(15),
        .target_probe_count = 5,
        .id = 17}});
  sender.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
  sim_time.AdvanceTime(TimeDelta::Zero());
  ::testing::Mock::VerifyAndClearExpectations(&router);

  // Up to 1ms before the coalescing window ends, packets should already be
  // flying.
  EXPECT_CALL(router, SendPacket).Times(AtLeast(1));
  sim_time.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1));
}
462 
TEST_P(TaskQueuePacedSenderTest,SchedulesProbeAtSentTime)463 TEST_P(TaskQueuePacedSenderTest, SchedulesProbeAtSentTime) {
464   ScopedKeyValueConfig trials(
465       GetParam() + "WebRTC-Bwe-ProbingBehavior/min_probe_delta:1ms/");
466   GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
467   MockPacketRouter packet_router;
468   TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
469                              time_controller.GetTaskQueueFactory(),
470                              PacingController::kMinSleepTime,
471                              TaskQueuePacedSender::kNoPacketHoldback);
472 
473   // Set rates so one packet adds 4ms of buffer level.
474   const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
475   const TimeDelta kPacketPacingTime = TimeDelta::Millis(4);
476   const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
477   pacer.SetPacingRates(kPacingDataRate, /*padding_rate=*/DataRate::Zero());
478   pacer.EnsureStarted();
479   EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() {
480     return std::vector<std::unique_ptr<RtpPacketToSend>>();
481   });
482   EXPECT_CALL(packet_router, GeneratePadding(_))
483       .WillRepeatedly(
484           [](DataSize target_size) { return GeneratePadding(target_size); });
485 
486   // Enqueue two packets, only the first is sent immediately and the next
487   // will be scheduled for sending in 4ms.
488   pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 2));
489   const int kNotAProbe = PacedPacketInfo::kNotAProbe;
490   EXPECT_CALL(packet_router,
491               SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
492                                              kNotAProbe)));
493   // Advance to less than 3ms before next packet send time.
494   time_controller.AdvanceTime(TimeDelta::Micros(1001));
495 
496   // Trigger a probe at 2x the current pacing rate and insert the number of
497   // packets the probe needs.
498   const DataRate kProbeRate = 2 * kPacingDataRate;
499   const int kProbeClusterId = 1;
500   pacer.CreateProbeClusters(
501       {{.at_time = time_controller.GetClock()->CurrentTime(),
502         .target_data_rate = kProbeRate,
503         .target_duration = TimeDelta::Millis(15),
504         .target_probe_count = 4,
505         .id = kProbeClusterId}});
506 
507   // Expected size for each probe in a cluster is twice the expected bits sent
508   // during min_probe_delta.
509   // Expect one additional call since probe always starts with a small (1 byte)
510   // padding packet that's not counted into the probe rate here.
511   const TimeDelta kProbeTimeDelta = TimeDelta::Millis(2);
512   const DataSize kProbeSize = kProbeRate * kProbeTimeDelta;
513   const size_t kNumPacketsInProbe =
514       (kProbeSize + kPacketSize - DataSize::Bytes(1)) / kPacketSize;
515   EXPECT_CALL(packet_router,
516               SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
517                                              kProbeClusterId)))
518       .Times(kNumPacketsInProbe + 1);
519 
520   pacer.EnqueuePackets(
521       GeneratePackets(RtpPacketMediaType::kVideo, kNumPacketsInProbe));
522   time_controller.AdvanceTime(TimeDelta::Zero());
523 
524   // The pacer should have scheduled the next probe to be sent in
525   // kProbeTimeDelta. That there was existing scheduled call less than
526   // PacingController::kMinSleepTime before this should not matter.
527   EXPECT_CALL(packet_router,
528               SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
529                                              kProbeClusterId)))
530       .Times(AtLeast(1));
531   time_controller.AdvanceTime(TimeDelta::Millis(2));
532 }
533 
TEST_P(TaskQueuePacedSenderTest,NoMinSleepTimeWhenProbing)534 TEST_P(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) {
535   // Set min_probe_delta to be less than kMinSleepTime (1ms).
536   const TimeDelta kMinProbeDelta = TimeDelta::Micros(200);
537   ScopedKeyValueConfig trials(
538       GetParam() + "WebRTC-Bwe-ProbingBehavior/min_probe_delta:200us/");
539   GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
540   MockPacketRouter packet_router;
541   TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
542                              time_controller.GetTaskQueueFactory(),
543                              PacingController::kMinSleepTime,
544                              TaskQueuePacedSender::kNoPacketHoldback);
545 
546   // Set rates so one packet adds 4ms of buffer level.
547   const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
548   const TimeDelta kPacketPacingTime = TimeDelta::Millis(4);
549   const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
550   pacer.SetPacingRates(kPacingDataRate, /*padding_rate=*/DataRate::Zero());
551   pacer.EnsureStarted();
552   EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() {
553     return std::vector<std::unique_ptr<RtpPacketToSend>>();
554   });
555   EXPECT_CALL(packet_router, GeneratePadding)
556       .WillRepeatedly(
557           [](DataSize target_size) { return GeneratePadding(target_size); });
558 
559   // Set a high probe rate.
560   const int kProbeClusterId = 1;
561   DataRate kProbingRate = kPacingDataRate * 10;
562 
563   pacer.CreateProbeClusters(
564       {{.at_time = time_controller.GetClock()->CurrentTime(),
565         .target_data_rate = kProbingRate,
566         .target_duration = TimeDelta::Millis(15),
567         .target_probe_count = 5,
568         .id = kProbeClusterId}});
569 
570   // Advance time less than PacingController::kMinSleepTime, probing packets
571   // for the first millisecond should be sent immediately. Min delta between
572   // probes is 200us, meaning 4 times per ms we will get least one call to
573   // SendPacket().
574   DataSize data_sent = DataSize::Zero();
575   EXPECT_CALL(packet_router,
576               SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
577                                              kProbeClusterId)))
578       .Times(AtLeast(4))
579       .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
580                           const PacedPacketInfo&) {
581         data_sent +=
582             DataSize::Bytes(packet->payload_size() + packet->padding_size());
583       });
584 
585   // Add one packet to kickstart probing, the rest will be padding packets.
586   pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1));
587   time_controller.AdvanceTime(kMinProbeDelta);
588 
589   // Verify the amount of probing data sent.
590   // Probe always starts with a small (1 byte) padding packet that's not
591   // counted into the probe rate here.
592   const DataSize kMinProbeSize = kMinProbeDelta * kProbingRate;
593   EXPECT_EQ(data_sent, DataSize::Bytes(1) + kPacketSize + 4 * kMinProbeSize);
594 }
595 
// With a packet-count based holdback that is shorter than the fixed
// coalescing window, checks that queued packets are held back for the
// packet-based window and released once it has passed.
TEST_P(TaskQueuePacedSenderTest, PacketBasedCoalescing) {
  const TimeDelta kFixedCoalescingWindow = TimeDelta::Millis(10);
  const int kPacketBasedHoldback = 5;

  GlobalSimulatedTimeController sim_time(Timestamp::Millis(1234));
  MockPacketRouter router;
  ScopedKeyValueConfig field_trials(GetParam());
  TaskQueuePacedSender sender(sim_time.GetClock(), &router, field_trials,
                              sim_time.GetTaskQueueFactory(),
                              kFixedCoalescingWindow, kPacketBasedHoldback);

  // Set rates so one packet adds one ms of buffer level.
  const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
  const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
  const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
  const TimeDelta kExpectedHoldbackWindow =
      kPacketPacingTime * kPacketBasedHoldback;
  // `kFixedCoalescingWindow` sets the upper bound for the window.
  ASSERT_GE(kFixedCoalescingWindow, kExpectedHoldbackWindow);

  sender.SetPacingRates(kPacingDataRate, DataRate::Zero());
  // No FEC is available in this test.
  EXPECT_CALL(router, FetchFec).WillRepeatedly([] {
    return std::vector<std::unique_ptr<RtpPacketToSend>>();
  });
  sender.EnsureStarted();

  // Add some packets and wait till all have been sent, so that the pacer
  // has a valid estimate of packet size.
  const int kNumWarmupPackets = 40;
  EXPECT_CALL(router, SendPacket).Times(kNumWarmupPackets);
  sender.EnqueuePackets(
      GeneratePackets(RtpPacketMediaType::kVideo, kNumWarmupPackets));
  // Wait until all packets have been sent, with a 2x margin.
  sim_time.AdvanceTime(kPacketPacingTime * (kNumWarmupPackets * 2));

  // Enqueue packets. Expect only the first one to be sent immediately.
  EXPECT_CALL(router, SendPacket).Times(1);
  sender.EnqueuePackets(
      GeneratePackets(RtpPacketMediaType::kVideo, kPacketBasedHoldback));
  sim_time.AdvanceTime(TimeDelta::Zero());

  // Up to 1ms before the coalescing window ends, nothing more is sent.
  EXPECT_CALL(router, SendPacket).Times(0);
  sim_time.AdvanceTime(kExpectedHoldbackWindow - TimeDelta::Millis(1));

  // Past the end of the coalescing window the remaining packets are sent.
  EXPECT_CALL(router, SendPacket).Times(kPacketBasedHoldback - 1);
  sim_time.AdvanceTime(TimeDelta::Millis(1));
}
645 
// With a fixed coalescing window that is shorter than the packet-count based
// holdback, checks that packets start flowing once the fixed window has
// passed.
TEST_P(TaskQueuePacedSenderTest, FixedHoldBackHasPriorityOverPackets) {
  const TimeDelta kFixedCoalescingWindow = TimeDelta::Millis(2);
  const int kPacketBasedHoldback = 5;

  GlobalSimulatedTimeController sim_time(Timestamp::Millis(1234));
  MockPacketRouter router;
  ScopedKeyValueConfig field_trials(GetParam());
  TaskQueuePacedSender sender(sim_time.GetClock(), &router, field_trials,
                              sim_time.GetTaskQueueFactory(),
                              kFixedCoalescingWindow, kPacketBasedHoldback);

  // Set rates so one packet adds one ms of buffer level.
  const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
  const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
  const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
  const TimeDelta kExpectedPacketHoldbackWindow =
      kPacketPacingTime * kPacketBasedHoldback;
  // `kFixedCoalescingWindow` sets the upper bound for the window.
  ASSERT_LT(kFixedCoalescingWindow, kExpectedPacketHoldbackWindow);

  sender.SetPacingRates(kPacingDataRate, DataRate::Zero());
  // No FEC is available in this test.
  EXPECT_CALL(router, FetchFec).WillRepeatedly([] {
    return std::vector<std::unique_ptr<RtpPacketToSend>>();
  });
  sender.EnsureStarted();

  // Add some packets and wait till all have been sent, so that the pacer
  // has a valid estimate of packet size.
  const int kNumWarmupPackets = 40;
  EXPECT_CALL(router, SendPacket).Times(kNumWarmupPackets);
  sender.EnqueuePackets(
      GeneratePackets(RtpPacketMediaType::kVideo, kNumWarmupPackets));
  // Wait until all packets have been sent, with a 2x margin.
  sim_time.AdvanceTime(kPacketPacingTime * (kNumWarmupPackets * 2));

  // Enqueue packets. Expect only the first one to be sent immediately.
  EXPECT_CALL(router, SendPacket).Times(1);
  sender.EnqueuePackets(
      GeneratePackets(RtpPacketMediaType::kVideo, kPacketBasedHoldback));
  sim_time.AdvanceTime(TimeDelta::Zero());

  // Advance time to the fixed coalescing window; it should take precedence, so
  // at least some of the packets should be sent.
  EXPECT_CALL(router, SendPacket).Times(AtLeast(1));
  sim_time.AdvanceTime(kFixedCoalescingWindow);
}
692 
// Verifies that all queued media packets, plus one padding packet, are sent
// when a probe cluster is active at the time a large batch of packets is
// enqueued and probing is allowed to finish during the send loop.
TEST_P(TaskQueuePacedSenderTest, ProbingStopDuringSendLoop) {
  // Set a low `min_probe_delta` to let probing finish during send loop.
  ScopedKeyValueConfig trials(
      GetParam() + "WebRTC-Bwe-ProbingBehavior/min_probe_delta:100us/");

  GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
  MockPacketRouter packet_router;
  TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
                             time_controller.GetTaskQueueFactory(),
                             PacingController::kMinSleepTime,
                             TaskQueuePacedSender::kNoPacketHoldback);

  // Set rates so that two packets add 1 ms of buffer level.
  const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
  const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
  const DataRate kPacingDataRate = 2 * kPacketSize / kPacketPacingTime;

  pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
  pacer.EnsureStarted();

  // No FEC is produced; padding requests are forwarded to the
  // `GeneratePadding()` helper.
  EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() {
    return std::vector<std::unique_ptr<RtpPacketToSend>>();
  });
  EXPECT_CALL(packet_router, GeneratePadding(_))
      .WillRepeatedly(
          [](DataSize target_size) { return GeneratePadding(target_size); });

  // Set probe rate. It is equal to the pacing rate in this test.
  const int kProbeClusterId = 1;
  const DataRate kProbingRate = kPacingDataRate;

  pacer.CreateProbeClusters(
      {{.at_time = time_controller.GetClock()->CurrentTime(),
        .target_data_rate = kProbingRate,
        .target_duration = TimeDelta::Millis(15),
        .target_probe_count = 4,
        .id = kProbeClusterId}});

  // Time needed to send all packets at the slower of the pacing and probing
  // rates.
  const int kPacketsToSend = 100;
  const TimeDelta kPacketsPacedTime =
      std::max(kPacketsToSend * kPacketSize / kPacingDataRate,
               kPacketsToSend * kPacketSize / kProbingRate);

  // Expect all packets and one padding packet sent.
  EXPECT_CALL(packet_router, SendPacket).Times(kPacketsToSend + 1);
  pacer.EnqueuePackets(
      GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend));
  time_controller.AdvanceTime(kPacketsPacedTime + TimeDelta::Millis(1));
}
742 
TEST_P(TaskQueuePacedSenderTest,Stats)743 TEST_P(TaskQueuePacedSenderTest, Stats) {
744   static constexpr Timestamp kStartTime = Timestamp::Millis(1234);
745   GlobalSimulatedTimeController time_controller(kStartTime);
746   MockPacketRouter packet_router;
747   ScopedKeyValueConfig trials(GetParam());
748   TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
749                              time_controller.GetTaskQueueFactory(),
750                              PacingController::kMinSleepTime,
751                              TaskQueuePacedSender::kNoPacketHoldback);
752 
753   // Simulate ~2mbps video stream, covering one second.
754   static constexpr size_t kPacketsToSend = 200;
755   static constexpr DataRate kPacingRate =
756       DataRate::BytesPerSec(kDefaultPacketSize * kPacketsToSend);
757   pacer.SetPacingRates(kPacingRate, DataRate::Zero());
758   pacer.EnsureStarted();
759 
760   // Allowed `QueueSizeData` and `ExpectedQueueTime` deviation.
761   static constexpr size_t kAllowedPacketsDeviation = 1;
762   static constexpr DataSize kAllowedQueueSizeDeviation =
763       DataSize::Bytes(kDefaultPacketSize * kAllowedPacketsDeviation);
764   static constexpr TimeDelta kAllowedQueueTimeDeviation =
765       kAllowedQueueSizeDeviation / kPacingRate;
766 
767   DataSize expected_queue_size = DataSize::MinusInfinity();
768   TimeDelta expected_queue_time = TimeDelta::MinusInfinity();
769 
770   EXPECT_CALL(packet_router, SendPacket).Times(kPacketsToSend);
771 
772   // Stats before insert any packets.
773   EXPECT_TRUE(pacer.OldestPacketWaitTime().IsZero());
774   EXPECT_FALSE(pacer.FirstSentPacketTime().has_value());
775   EXPECT_TRUE(pacer.QueueSizeData().IsZero());
776   EXPECT_TRUE(pacer.ExpectedQueueTime().IsZero());
777 
778   pacer.EnqueuePackets(
779       GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend));
780 
781   // Advance to 200ms.
782   time_controller.AdvanceTime(TimeDelta::Millis(200));
783   EXPECT_EQ(pacer.OldestPacketWaitTime(), TimeDelta::Millis(200));
784   EXPECT_EQ(pacer.FirstSentPacketTime(), kStartTime);
785 
786   expected_queue_size = kPacingRate * TimeDelta::Millis(800);
787   expected_queue_time = expected_queue_size / kPacingRate;
788   EXPECT_NEAR(pacer.QueueSizeData().bytes(), expected_queue_size.bytes(),
789               kAllowedQueueSizeDeviation.bytes());
790   EXPECT_NEAR(pacer.ExpectedQueueTime().ms(), expected_queue_time.ms(),
791               kAllowedQueueTimeDeviation.ms());
792 
793   // Advance to 500ms.
794   time_controller.AdvanceTime(TimeDelta::Millis(300));
795   EXPECT_EQ(pacer.OldestPacketWaitTime(), TimeDelta::Millis(500));
796   EXPECT_EQ(pacer.FirstSentPacketTime(), kStartTime);
797 
798   expected_queue_size = kPacingRate * TimeDelta::Millis(500);
799   expected_queue_time = expected_queue_size / kPacingRate;
800   EXPECT_NEAR(pacer.QueueSizeData().bytes(), expected_queue_size.bytes(),
801               kAllowedQueueSizeDeviation.bytes());
802   EXPECT_NEAR(pacer.ExpectedQueueTime().ms(), expected_queue_time.ms(),
803               kAllowedQueueTimeDeviation.ms());
804 
805   // Advance to 1000ms+, expect all packets to be sent.
806   time_controller.AdvanceTime(TimeDelta::Millis(500) +
807                               kAllowedQueueTimeDeviation);
808   EXPECT_TRUE(pacer.OldestPacketWaitTime().IsZero());
809   EXPECT_EQ(pacer.FirstSentPacketTime(), kStartTime);
810   EXPECT_TRUE(pacer.QueueSizeData().IsZero());
811   EXPECT_TRUE(pacer.ExpectedQueueTime().IsZero());
812 }
813 
814 // TODO(webrtc:14502): Rewrite these tests if the functionality is needed if
815 // pacing is done on the worker thread.
TEST(TaskQueuePacedSenderTest,HighPrecisionPacingWhenSlackIsDisabled)816 TEST(TaskQueuePacedSenderTest, HighPrecisionPacingWhenSlackIsDisabled) {
817   ScopedKeyValueConfig trials("WebRTC-SlackedTaskQueuePacedSender/Disabled/");
818 
819   GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
820   TaskQueueWithFakePrecisionFactory task_queue_factory(
821       time_controller.GetTaskQueueFactory());
822 
823   MockPacketRouter packet_router;
824   TaskQueuePacedSender pacer(
825       time_controller.GetClock(), &packet_router, trials, &task_queue_factory,
826       PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback);
827 
828   // Send enough packets (covering one second) that pacing is triggered, i.e.
829   // delayed tasks being scheduled.
830   static constexpr size_t kPacketsToSend = 42;
831   static constexpr DataRate kPacingRate =
832       DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend);
833   pacer.SetPacingRates(kPacingRate, DataRate::Zero());
834   pacer.EnsureStarted();
835   pacer.EnqueuePackets(
836       GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend));
837   // Expect all of them to be sent.
838   size_t packets_sent = 0;
839   EXPECT_CALL(packet_router, SendPacket)
840       .WillRepeatedly(
841           [&](std::unique_ptr<RtpPacketToSend> packet,
842               const PacedPacketInfo& cluster_info) { ++packets_sent; });
843   time_controller.AdvanceTime(TimeDelta::Seconds(1));
844   EXPECT_EQ(packets_sent, kPacketsToSend);
845 
846   // Expect pacing to make use of high precision.
847   EXPECT_EQ(task_queue_factory.delayed_low_precision_count(), 0);
848   EXPECT_GT(task_queue_factory.delayed_high_precision_count(), 0);
849 
850   // Create probe cluster which is also high precision.
851   pacer.CreateProbeClusters(
852       {{.at_time = time_controller.GetClock()->CurrentTime(),
853         .target_data_rate = kPacingRate,
854         .target_duration = TimeDelta::Millis(15),
855         .target_probe_count = 4,
856         .id = 123}});
857   pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1));
858   time_controller.AdvanceTime(TimeDelta::Seconds(1));
859   EXPECT_EQ(task_queue_factory.delayed_low_precision_count(), 0);
860   EXPECT_GT(task_queue_factory.delayed_high_precision_count(), 0);
861 }
862 
863 // TODO(webrtc:14502): Rewrite these tests if the functionality is needed if
864 // pacing is done on the worker thread.
TEST(TaskQueuePacedSenderTest,LowPrecisionPacingWhenSlackIsEnabled)865 TEST(TaskQueuePacedSenderTest, LowPrecisionPacingWhenSlackIsEnabled) {
866   ScopedKeyValueConfig trials("WebRTC-SlackedTaskQueuePacedSender/Enabled/");
867 
868   GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
869   TaskQueueWithFakePrecisionFactory task_queue_factory(
870       time_controller.GetTaskQueueFactory());
871 
872   MockPacketRouter packet_router;
873   TaskQueuePacedSender pacer(
874       time_controller.GetClock(), &packet_router, trials, &task_queue_factory,
875       PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback);
876 
877   // Send enough packets (covering one second) that pacing is triggered, i.e.
878   // delayed tasks being scheduled.
879   static constexpr size_t kPacketsToSend = 42;
880   static constexpr DataRate kPacingRate =
881       DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend);
882   pacer.SetPacingRates(kPacingRate, DataRate::Zero());
883   pacer.EnsureStarted();
884   pacer.EnqueuePackets(
885       GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend));
886   // Expect all of them to be sent.
887   size_t packets_sent = 0;
888   EXPECT_CALL(packet_router, SendPacket)
889       .WillRepeatedly(
890           [&](std::unique_ptr<RtpPacketToSend> packet,
891               const PacedPacketInfo& cluster_info) { ++packets_sent; });
892   time_controller.AdvanceTime(TimeDelta::Seconds(1));
893   EXPECT_EQ(packets_sent, kPacketsToSend);
894 
895   // Expect pacing to make use of low precision.
896   EXPECT_GT(task_queue_factory.delayed_low_precision_count(), 0);
897   EXPECT_EQ(task_queue_factory.delayed_high_precision_count(), 0);
898 
899   // Create probe cluster, which uses high precision despite regular pacing
900   // being low precision.
901   pacer.CreateProbeClusters(
902       {{.at_time = time_controller.GetClock()->CurrentTime(),
903         .target_data_rate = kPacingRate,
904         .target_duration = TimeDelta::Millis(15),
905         .target_probe_count = 4,
906         .id = 123}});
907   pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1));
908   time_controller.AdvanceTime(TimeDelta::Seconds(1));
909   EXPECT_GT(task_queue_factory.delayed_high_precision_count(), 0);
910 }
911 
912 }  // namespace test
913 }  // namespace webrtc
914