1 /*
2 * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "modules/pacing/task_queue_paced_sender.h"
12
13 #include <algorithm>
14 #include <atomic>
15 #include <list>
16 #include <memory>
17 #include <string>
18 #include <utility>
19 #include <vector>
20
21 #include "absl/functional/any_invocable.h"
22 #include "api/task_queue/task_queue_base.h"
23 #include "api/transport/network_types.h"
24 #include "api/units/data_rate.h"
25 #include "modules/pacing/packet_router.h"
26 #include "test/gmock.h"
27 #include "test/gtest.h"
28 #include "test/scoped_key_value_config.h"
29 #include "test/time_controller/simulated_time_controller.h"
30
31 using ::testing::_;
32 using ::testing::AtLeast;
33 using ::testing::Return;
34 using ::testing::SaveArg;
35
36 namespace webrtc {
37 namespace {
38 constexpr uint32_t kAudioSsrc = 12345;
39 constexpr uint32_t kVideoSsrc = 234565;
40 constexpr uint32_t kVideoRtxSsrc = 34567;
41 constexpr uint32_t kFlexFecSsrc = 45678;
42 constexpr size_t kDefaultPacketSize = 1234;
43
44 class MockPacketRouter : public PacketRouter {
45 public:
46 MOCK_METHOD(void,
47 SendPacket,
48 (std::unique_ptr<RtpPacketToSend> packet,
49 const PacedPacketInfo& cluster_info),
50 (override));
51 MOCK_METHOD(std::vector<std::unique_ptr<RtpPacketToSend>>,
52 FetchFec,
53 (),
54 (override));
55 MOCK_METHOD(std::vector<std::unique_ptr<RtpPacketToSend>>,
56 GeneratePadding,
57 (DataSize target_size),
58 (override));
59 };
60
GeneratePadding(DataSize target_size)61 std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
62 DataSize target_size) {
63 // 224 bytes is the max padding size for plain padding packets generated by
64 // RTPSender::GeneratePadding().
65 const DataSize kMaxPaddingPacketSize = DataSize::Bytes(224);
66 DataSize padding_generated = DataSize::Zero();
67 std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets;
68 while (padding_generated < target_size) {
69 DataSize packet_size =
70 std::min(target_size - padding_generated, kMaxPaddingPacketSize);
71 padding_generated += packet_size;
72 auto padding_packet =
73 std::make_unique<RtpPacketToSend>(/*extensions=*/nullptr);
74 padding_packet->set_packet_type(RtpPacketMediaType::kPadding);
75 padding_packet->SetPadding(packet_size.bytes());
76 padding_packets.push_back(std::move(padding_packet));
77 }
78 return padding_packets;
79 }
80
81 class TaskQueueWithFakePrecisionFactory : public TaskQueueFactory {
82 public:
TaskQueueWithFakePrecisionFactory(TaskQueueFactory * task_queue_factory)83 explicit TaskQueueWithFakePrecisionFactory(
84 TaskQueueFactory* task_queue_factory)
85 : task_queue_factory_(task_queue_factory) {}
86
CreateTaskQueue(absl::string_view name,Priority priority) const87 std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
88 absl::string_view name,
89 Priority priority) const override {
90 return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
91 new TaskQueueWithFakePrecision(
92 const_cast<TaskQueueWithFakePrecisionFactory*>(this),
93 task_queue_factory_));
94 }
95
delayed_low_precision_count() const96 int delayed_low_precision_count() const {
97 return delayed_low_precision_count_;
98 }
delayed_high_precision_count() const99 int delayed_high_precision_count() const {
100 return delayed_high_precision_count_;
101 }
102
103 private:
104 friend class TaskQueueWithFakePrecision;
105
106 class TaskQueueWithFakePrecision : public TaskQueueBase {
107 public:
TaskQueueWithFakePrecision(TaskQueueWithFakePrecisionFactory * parent_factory,TaskQueueFactory * task_queue_factory)108 TaskQueueWithFakePrecision(
109 TaskQueueWithFakePrecisionFactory* parent_factory,
110 TaskQueueFactory* task_queue_factory)
111 : parent_factory_(parent_factory),
112 task_queue_(task_queue_factory->CreateTaskQueue(
113 "TaskQueueWithFakePrecision",
114 TaskQueueFactory::Priority::NORMAL)) {}
~TaskQueueWithFakePrecision()115 ~TaskQueueWithFakePrecision() override {}
116
Delete()117 void Delete() override {
118 // `task_queue_->Delete()` is implicitly called in the destructor due to
119 // TaskQueueDeleter.
120 delete this;
121 }
PostTask(absl::AnyInvocable<void ()&&> task)122 void PostTask(absl::AnyInvocable<void() &&> task) override {
123 task_queue_->PostTask(WrapTask(std::move(task)));
124 }
PostDelayedTask(absl::AnyInvocable<void ()&&> task,TimeDelta delay)125 void PostDelayedTask(absl::AnyInvocable<void() &&> task,
126 TimeDelta delay) override {
127 ++parent_factory_->delayed_low_precision_count_;
128 task_queue_->PostDelayedTask(WrapTask(std::move(task)), delay);
129 }
PostDelayedHighPrecisionTask(absl::AnyInvocable<void ()&&> task,TimeDelta delay)130 void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
131 TimeDelta delay) override {
132 ++parent_factory_->delayed_high_precision_count_;
133 task_queue_->PostDelayedHighPrecisionTask(WrapTask(std::move(task)),
134 delay);
135 }
136
137 private:
WrapTask(absl::AnyInvocable<void ()&&> task)138 absl::AnyInvocable<void() &&> WrapTask(absl::AnyInvocable<void() &&> task) {
139 return [this, task = std::move(task)]() mutable {
140 CurrentTaskQueueSetter set_current(this);
141 std::move(task)();
142 };
143 }
144
145 TaskQueueWithFakePrecisionFactory* parent_factory_;
146 std::unique_ptr<TaskQueueBase, TaskQueueDeleter> task_queue_;
147 };
148
149 TaskQueueFactory* task_queue_factory_;
150 std::atomic<int> delayed_low_precision_count_ = 0u;
151 std::atomic<int> delayed_high_precision_count_ = 0u;
152 };
153
154 } // namespace
155
156 namespace test {
157
BuildRtpPacket(RtpPacketMediaType type)158 std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketMediaType type) {
159 auto packet = std::make_unique<RtpPacketToSend>(nullptr);
160 packet->set_packet_type(type);
161 switch (type) {
162 case RtpPacketMediaType::kAudio:
163 packet->SetSsrc(kAudioSsrc);
164 break;
165 case RtpPacketMediaType::kVideo:
166 packet->SetSsrc(kVideoSsrc);
167 break;
168 case RtpPacketMediaType::kRetransmission:
169 case RtpPacketMediaType::kPadding:
170 packet->SetSsrc(kVideoRtxSsrc);
171 break;
172 case RtpPacketMediaType::kForwardErrorCorrection:
173 packet->SetSsrc(kFlexFecSsrc);
174 break;
175 }
176
177 packet->SetPayloadSize(kDefaultPacketSize);
178 return packet;
179 }
180
GeneratePackets(RtpPacketMediaType type,size_t num_packets)181 std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePackets(
182 RtpPacketMediaType type,
183 size_t num_packets) {
184 std::vector<std::unique_ptr<RtpPacketToSend>> packets;
185 for (size_t i = 0; i < num_packets; ++i) {
186 packets.push_back(BuildRtpPacket(type));
187 }
188 return packets;
189 }
190
191 constexpr char kSendPacketOnWorkerThreadFieldTrial[] =
192 "WebRTC-SendPacketsOnWorkerThread/Enabled/";
193
ParameterizedFieldTrials()194 std::vector<std::string> ParameterizedFieldTrials() {
195 return {{""}, {kSendPacketOnWorkerThreadFieldTrial}};
196 }
197
UsingWorkerThread(absl::string_view field_trials)198 bool UsingWorkerThread(absl::string_view field_trials) {
199 return field_trials.find(kSendPacketOnWorkerThreadFieldTrial) !=
200 std::string::npos;
201 }
202
203 class TaskQueuePacedSenderTest
204 : public ::testing::TestWithParam<std::string /*field_trials*/> {};
205
206 INSTANTIATE_TEST_SUITE_P(TaskQueuePacedSenderTest,
207 TaskQueuePacedSenderTest,
208 testing::ValuesIn(ParameterizedFieldTrials()),
__anon6bcb728c0302(const testing::TestParamInfo<std::string>& info) 209 [](const testing::TestParamInfo<std::string>& info) {
210 return UsingWorkerThread(info.param) ? "UsingWt"
211 : "OwnedTQ";
212 });
213
TEST_P(TaskQueuePacedSenderTest,PacesPackets)214 TEST_P(TaskQueuePacedSenderTest, PacesPackets) {
215 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
216 MockPacketRouter packet_router;
217 ScopedKeyValueConfig trials(GetParam());
218 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
219 time_controller.GetTaskQueueFactory(),
220 PacingController::kMinSleepTime,
221 TaskQueuePacedSender::kNoPacketHoldback);
222
223 // Insert a number of packets, covering one second.
224 static constexpr size_t kPacketsToSend = 42;
225 SequenceChecker sequence_checker;
226 pacer.SetPacingRates(
227 DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend),
228 DataRate::Zero());
229 pacer.EnsureStarted();
230 pacer.EnqueuePackets(
231 GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend));
232
233 // Expect all of them to be sent.
234 size_t packets_sent = 0;
235 Timestamp end_time = Timestamp::PlusInfinity();
236 EXPECT_CALL(packet_router, SendPacket)
237 .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
238 const PacedPacketInfo& cluster_info) {
239 ++packets_sent;
240 if (packets_sent == kPacketsToSend) {
241 end_time = time_controller.GetClock()->CurrentTime();
242 }
243 EXPECT_EQ(sequence_checker.IsCurrent(), UsingWorkerThread(GetParam()));
244 });
245
246 const Timestamp start_time = time_controller.GetClock()->CurrentTime();
247
248 // Packets should be sent over a period of close to 1s. Expect a little
249 // lower than this since initial probing is a bit quicker.
250 time_controller.AdvanceTime(TimeDelta::Seconds(1));
251 EXPECT_EQ(packets_sent, kPacketsToSend);
252 ASSERT_TRUE(end_time.IsFinite());
253 EXPECT_NEAR((end_time - start_time).ms<double>(), 1000.0, 50.0);
254 }
255
256 // Same test as above, but with 0.5s of burst applied.
TEST_P(TaskQueuePacedSenderTest,PacesPacketsWithBurst)257 TEST_P(TaskQueuePacedSenderTest, PacesPacketsWithBurst) {
258 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
259 MockPacketRouter packet_router;
260 ScopedKeyValueConfig trials(GetParam());
261 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
262 time_controller.GetTaskQueueFactory(),
263 PacingController::kMinSleepTime,
264 TaskQueuePacedSender::kNoPacketHoldback,
265 // Half a second of bursting.
266 TimeDelta::Seconds(0.5));
267
268 // Insert a number of packets, covering one second.
269 static constexpr size_t kPacketsToSend = 42;
270 SequenceChecker sequence_checker;
271 pacer.SetPacingRates(
272 DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend),
273 DataRate::Zero());
274 pacer.EnsureStarted();
275 pacer.EnqueuePackets(
276 GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend));
277
278 // Expect all of them to be sent.
279 size_t packets_sent = 0;
280 Timestamp end_time = Timestamp::PlusInfinity();
281 EXPECT_CALL(packet_router, SendPacket)
282 .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
283 const PacedPacketInfo& cluster_info) {
284 ++packets_sent;
285 if (packets_sent == kPacketsToSend) {
286 end_time = time_controller.GetClock()->CurrentTime();
287 }
288 EXPECT_EQ(sequence_checker.IsCurrent(), UsingWorkerThread(GetParam()));
289 });
290
291 const Timestamp start_time = time_controller.GetClock()->CurrentTime();
292
293 // Packets should be sent over a period of close to 1s. Expect a little
294 // lower than this since initial probing is a bit quicker.
295 time_controller.AdvanceTime(TimeDelta::Seconds(1));
296 EXPECT_EQ(packets_sent, kPacketsToSend);
297 ASSERT_TRUE(end_time.IsFinite());
298 // Because of half a second of burst, what would normally have been paced over
299 // ~1 second now takes ~0.5 seconds.
300 EXPECT_NEAR((end_time - start_time).ms<double>(), 500.0, 50.0);
301 }
302
TEST_P(TaskQueuePacedSenderTest,ReschedulesProcessOnRateChange)303 TEST_P(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
304 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
305 MockPacketRouter packet_router;
306 ScopedKeyValueConfig trials(GetParam());
307 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
308 time_controller.GetTaskQueueFactory(),
309 PacingController::kMinSleepTime,
310 TaskQueuePacedSender::kNoPacketHoldback);
311
312 // Insert a number of packets to be sent 200ms apart.
313 const size_t kPacketsPerSecond = 5;
314 const DataRate kPacingRate =
315 DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsPerSecond);
316 pacer.SetPacingRates(kPacingRate, DataRate::Zero());
317 pacer.EnsureStarted();
318
319 // Send some initial packets to be rid of any probes.
320 EXPECT_CALL(packet_router, SendPacket).Times(kPacketsPerSecond);
321 pacer.EnqueuePackets(
322 GeneratePackets(RtpPacketMediaType::kVideo, kPacketsPerSecond));
323 time_controller.AdvanceTime(TimeDelta::Seconds(1));
324
325 // Insert three packets, and record send time of each of them.
326 // After the second packet is sent, double the send rate so we can
327 // check the third packets is sent after half the wait time.
328 Timestamp first_packet_time = Timestamp::MinusInfinity();
329 Timestamp second_packet_time = Timestamp::MinusInfinity();
330 Timestamp third_packet_time = Timestamp::MinusInfinity();
331
332 EXPECT_CALL(packet_router, SendPacket)
333 .Times(3)
334 .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
335 const PacedPacketInfo& cluster_info) {
336 if (first_packet_time.IsInfinite()) {
337 first_packet_time = time_controller.GetClock()->CurrentTime();
338 } else if (second_packet_time.IsInfinite()) {
339 second_packet_time = time_controller.GetClock()->CurrentTime();
340 // Avoid invoke SetPacingRate in the context of sending a packet.
341 time_controller.GetMainThread()->PostTask(
342 [&] { pacer.SetPacingRates(2 * kPacingRate, DataRate::Zero()); });
343 } else {
344 third_packet_time = time_controller.GetClock()->CurrentTime();
345 }
346 });
347
348 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 3));
349 time_controller.AdvanceTime(TimeDelta::Millis(500));
350 ASSERT_TRUE(third_packet_time.IsFinite());
351 EXPECT_NEAR((second_packet_time - first_packet_time).ms<double>(), 200.0,
352 1.0);
353 EXPECT_NEAR((third_packet_time - second_packet_time).ms<double>(), 100.0,
354 1.0);
355 }
356
TEST_P(TaskQueuePacedSenderTest,SendsAudioImmediately)357 TEST_P(TaskQueuePacedSenderTest, SendsAudioImmediately) {
358 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
359 MockPacketRouter packet_router;
360 ScopedKeyValueConfig trials(GetParam());
361 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
362 time_controller.GetTaskQueueFactory(),
363 PacingController::kMinSleepTime,
364 TaskQueuePacedSender::kNoPacketHoldback);
365
366 const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125);
367 const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
368 const TimeDelta kPacketPacingTime = kPacketSize / kPacingDataRate;
369
370 pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
371 pacer.EnsureStarted();
372
373 // Add some initial video packets, only one should be sent.
374 EXPECT_CALL(packet_router, SendPacket);
375 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
376 time_controller.AdvanceTime(TimeDelta::Zero());
377 ::testing::Mock::VerifyAndClearExpectations(&packet_router);
378
379 // Advance time, but still before next packet should be sent.
380 time_controller.AdvanceTime(kPacketPacingTime / 2);
381
382 // Insert an audio packet, it should be sent immediately.
383 EXPECT_CALL(packet_router, SendPacket);
384 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kAudio, 1));
385 time_controller.AdvanceTime(TimeDelta::Zero());
386 ::testing::Mock::VerifyAndClearExpectations(&packet_router);
387 }
388
TEST_P(TaskQueuePacedSenderTest,SleepsDuringCoalscingWindow)389 TEST_P(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) {
390 const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
391 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
392 MockPacketRouter packet_router;
393 ScopedKeyValueConfig trials(GetParam());
394 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
395 time_controller.GetTaskQueueFactory(),
396 kCoalescingWindow,
397 TaskQueuePacedSender::kNoPacketHoldback);
398
399 // Set rates so one packet adds one ms of buffer level.
400 const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
401 const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
402 const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
403
404 pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
405 pacer.EnsureStarted();
406
407 // Add 10 packets. The first should be sent immediately since the buffers
408 // are clear.
409 EXPECT_CALL(packet_router, SendPacket);
410 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
411 time_controller.AdvanceTime(TimeDelta::Zero());
412 ::testing::Mock::VerifyAndClearExpectations(&packet_router);
413
414 // Advance time to 1ms before the coalescing window ends. No packets should
415 // be sent.
416 EXPECT_CALL(packet_router, SendPacket).Times(0);
417 time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1));
418
419 // Advance time to where coalescing window ends. All packets that should
420 // have been sent up til now will be sent.
421 EXPECT_CALL(packet_router, SendPacket).Times(5);
422 time_controller.AdvanceTime(TimeDelta::Millis(1));
423 ::testing::Mock::VerifyAndClearExpectations(&packet_router);
424 }
425
TEST_P(TaskQueuePacedSenderTest,ProbingOverridesCoalescingWindow)426 TEST_P(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) {
427 const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
428 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
429 MockPacketRouter packet_router;
430 ScopedKeyValueConfig trials(GetParam());
431 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
432 time_controller.GetTaskQueueFactory(),
433 kCoalescingWindow,
434 TaskQueuePacedSender::kNoPacketHoldback);
435
436 // Set rates so one packet adds one ms of buffer level.
437 const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
438 const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
439 const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
440
441 pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
442 pacer.EnsureStarted();
443
444 // Add 10 packets. The first should be sent immediately since the buffers
445 // are clear. This will also trigger the probe to start.
446 EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1));
447 pacer.CreateProbeClusters(
448 {{.at_time = time_controller.GetClock()->CurrentTime(),
449 .target_data_rate = kPacingDataRate * 2,
450 .target_duration = TimeDelta::Millis(15),
451 .target_probe_count = 5,
452 .id = 17}});
453 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
454 time_controller.AdvanceTime(TimeDelta::Zero());
455 ::testing::Mock::VerifyAndClearExpectations(&packet_router);
456
457 // Advance time to 1ms before the coalescing window ends. Packets should be
458 // flying.
459 EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1));
460 time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1));
461 }
462
TEST_P(TaskQueuePacedSenderTest,SchedulesProbeAtSentTime)463 TEST_P(TaskQueuePacedSenderTest, SchedulesProbeAtSentTime) {
464 ScopedKeyValueConfig trials(
465 GetParam() + "WebRTC-Bwe-ProbingBehavior/min_probe_delta:1ms/");
466 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
467 MockPacketRouter packet_router;
468 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
469 time_controller.GetTaskQueueFactory(),
470 PacingController::kMinSleepTime,
471 TaskQueuePacedSender::kNoPacketHoldback);
472
473 // Set rates so one packet adds 4ms of buffer level.
474 const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
475 const TimeDelta kPacketPacingTime = TimeDelta::Millis(4);
476 const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
477 pacer.SetPacingRates(kPacingDataRate, /*padding_rate=*/DataRate::Zero());
478 pacer.EnsureStarted();
479 EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() {
480 return std::vector<std::unique_ptr<RtpPacketToSend>>();
481 });
482 EXPECT_CALL(packet_router, GeneratePadding(_))
483 .WillRepeatedly(
484 [](DataSize target_size) { return GeneratePadding(target_size); });
485
486 // Enqueue two packets, only the first is sent immediately and the next
487 // will be scheduled for sending in 4ms.
488 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 2));
489 const int kNotAProbe = PacedPacketInfo::kNotAProbe;
490 EXPECT_CALL(packet_router,
491 SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
492 kNotAProbe)));
493 // Advance to less than 3ms before next packet send time.
494 time_controller.AdvanceTime(TimeDelta::Micros(1001));
495
496 // Trigger a probe at 2x the current pacing rate and insert the number of
497 // packets the probe needs.
498 const DataRate kProbeRate = 2 * kPacingDataRate;
499 const int kProbeClusterId = 1;
500 pacer.CreateProbeClusters(
501 {{.at_time = time_controller.GetClock()->CurrentTime(),
502 .target_data_rate = kProbeRate,
503 .target_duration = TimeDelta::Millis(15),
504 .target_probe_count = 4,
505 .id = kProbeClusterId}});
506
507 // Expected size for each probe in a cluster is twice the expected bits sent
508 // during min_probe_delta.
509 // Expect one additional call since probe always starts with a small (1 byte)
510 // padding packet that's not counted into the probe rate here.
511 const TimeDelta kProbeTimeDelta = TimeDelta::Millis(2);
512 const DataSize kProbeSize = kProbeRate * kProbeTimeDelta;
513 const size_t kNumPacketsInProbe =
514 (kProbeSize + kPacketSize - DataSize::Bytes(1)) / kPacketSize;
515 EXPECT_CALL(packet_router,
516 SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
517 kProbeClusterId)))
518 .Times(kNumPacketsInProbe + 1);
519
520 pacer.EnqueuePackets(
521 GeneratePackets(RtpPacketMediaType::kVideo, kNumPacketsInProbe));
522 time_controller.AdvanceTime(TimeDelta::Zero());
523
524 // The pacer should have scheduled the next probe to be sent in
525 // kProbeTimeDelta. That there was existing scheduled call less than
526 // PacingController::kMinSleepTime before this should not matter.
527 EXPECT_CALL(packet_router,
528 SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
529 kProbeClusterId)))
530 .Times(AtLeast(1));
531 time_controller.AdvanceTime(TimeDelta::Millis(2));
532 }
533
TEST_P(TaskQueuePacedSenderTest,NoMinSleepTimeWhenProbing)534 TEST_P(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) {
535 // Set min_probe_delta to be less than kMinSleepTime (1ms).
536 const TimeDelta kMinProbeDelta = TimeDelta::Micros(200);
537 ScopedKeyValueConfig trials(
538 GetParam() + "WebRTC-Bwe-ProbingBehavior/min_probe_delta:200us/");
539 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
540 MockPacketRouter packet_router;
541 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
542 time_controller.GetTaskQueueFactory(),
543 PacingController::kMinSleepTime,
544 TaskQueuePacedSender::kNoPacketHoldback);
545
546 // Set rates so one packet adds 4ms of buffer level.
547 const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
548 const TimeDelta kPacketPacingTime = TimeDelta::Millis(4);
549 const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
550 pacer.SetPacingRates(kPacingDataRate, /*padding_rate=*/DataRate::Zero());
551 pacer.EnsureStarted();
552 EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() {
553 return std::vector<std::unique_ptr<RtpPacketToSend>>();
554 });
555 EXPECT_CALL(packet_router, GeneratePadding)
556 .WillRepeatedly(
557 [](DataSize target_size) { return GeneratePadding(target_size); });
558
559 // Set a high probe rate.
560 const int kProbeClusterId = 1;
561 DataRate kProbingRate = kPacingDataRate * 10;
562
563 pacer.CreateProbeClusters(
564 {{.at_time = time_controller.GetClock()->CurrentTime(),
565 .target_data_rate = kProbingRate,
566 .target_duration = TimeDelta::Millis(15),
567 .target_probe_count = 5,
568 .id = kProbeClusterId}});
569
570 // Advance time less than PacingController::kMinSleepTime, probing packets
571 // for the first millisecond should be sent immediately. Min delta between
572 // probes is 200us, meaning 4 times per ms we will get least one call to
573 // SendPacket().
574 DataSize data_sent = DataSize::Zero();
575 EXPECT_CALL(packet_router,
576 SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
577 kProbeClusterId)))
578 .Times(AtLeast(4))
579 .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
580 const PacedPacketInfo&) {
581 data_sent +=
582 DataSize::Bytes(packet->payload_size() + packet->padding_size());
583 });
584
585 // Add one packet to kickstart probing, the rest will be padding packets.
586 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1));
587 time_controller.AdvanceTime(kMinProbeDelta);
588
589 // Verify the amount of probing data sent.
590 // Probe always starts with a small (1 byte) padding packet that's not
591 // counted into the probe rate here.
592 const DataSize kMinProbeSize = kMinProbeDelta * kProbingRate;
593 EXPECT_EQ(data_sent, DataSize::Bytes(1) + kPacketSize + 4 * kMinProbeSize);
594 }
595
TEST_P(TaskQueuePacedSenderTest,PacketBasedCoalescing)596 TEST_P(TaskQueuePacedSenderTest, PacketBasedCoalescing) {
597 const TimeDelta kFixedCoalescingWindow = TimeDelta::Millis(10);
598 const int kPacketBasedHoldback = 5;
599
600 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
601 MockPacketRouter packet_router;
602 ScopedKeyValueConfig trials(GetParam());
603 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
604 time_controller.GetTaskQueueFactory(),
605 kFixedCoalescingWindow, kPacketBasedHoldback);
606
607 // Set rates so one packet adds one ms of buffer level.
608 const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
609 const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
610 const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
611 const TimeDelta kExpectedHoldbackWindow =
612 kPacketPacingTime * kPacketBasedHoldback;
613 // `kFixedCoalescingWindow` sets the upper bound for the window.
614 ASSERT_GE(kFixedCoalescingWindow, kExpectedHoldbackWindow);
615
616 pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
617 EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() {
618 return std::vector<std::unique_ptr<RtpPacketToSend>>();
619 });
620 pacer.EnsureStarted();
621
622 // Add some packets and wait till all have been sent, so that the pacer
623 // has a valid estimate of packet size.
624 const int kNumWarmupPackets = 40;
625 EXPECT_CALL(packet_router, SendPacket).Times(kNumWarmupPackets);
626 pacer.EnqueuePackets(
627 GeneratePackets(RtpPacketMediaType::kVideo, kNumWarmupPackets));
628 // Wait until all packes have been sent, with a 2x margin.
629 time_controller.AdvanceTime(kPacketPacingTime * (kNumWarmupPackets * 2));
630
631 // Enqueue packets. Expect only the first one to be sent immediately.
632 EXPECT_CALL(packet_router, SendPacket).Times(1);
633 pacer.EnqueuePackets(
634 GeneratePackets(RtpPacketMediaType::kVideo, kPacketBasedHoldback));
635 time_controller.AdvanceTime(TimeDelta::Zero());
636
637 // Advance time to 1ms before the coalescing window ends.
638 EXPECT_CALL(packet_router, SendPacket).Times(0);
639 time_controller.AdvanceTime(kExpectedHoldbackWindow - TimeDelta::Millis(1));
640
641 // Advance past where the coalescing window should end.
642 EXPECT_CALL(packet_router, SendPacket).Times(kPacketBasedHoldback - 1);
643 time_controller.AdvanceTime(TimeDelta::Millis(1));
644 }
645
TEST_P(TaskQueuePacedSenderTest,FixedHoldBackHasPriorityOverPackets)646 TEST_P(TaskQueuePacedSenderTest, FixedHoldBackHasPriorityOverPackets) {
647 const TimeDelta kFixedCoalescingWindow = TimeDelta::Millis(2);
648 const int kPacketBasedHoldback = 5;
649
650 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
651 MockPacketRouter packet_router;
652 ScopedKeyValueConfig trials(GetParam());
653 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
654 time_controller.GetTaskQueueFactory(),
655 kFixedCoalescingWindow, kPacketBasedHoldback);
656
657 // Set rates so one packet adds one ms of buffer level.
658 const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
659 const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
660 const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
661 const TimeDelta kExpectedPacketHoldbackWindow =
662 kPacketPacingTime * kPacketBasedHoldback;
663 // |kFixedCoalescingWindow| sets the upper bound for the window.
664 ASSERT_LT(kFixedCoalescingWindow, kExpectedPacketHoldbackWindow);
665
666 pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
667 EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() {
668 return std::vector<std::unique_ptr<RtpPacketToSend>>();
669 });
670 pacer.EnsureStarted();
671
672 // Add some packets and wait till all have been sent, so that the pacer
673 // has a valid estimate of packet size.
674 const int kNumWarmupPackets = 40;
675 EXPECT_CALL(packet_router, SendPacket).Times(kNumWarmupPackets);
676 pacer.EnqueuePackets(
677 GeneratePackets(RtpPacketMediaType::kVideo, kNumWarmupPackets));
678 // Wait until all packes have been sent, with a 2x margin.
679 time_controller.AdvanceTime(kPacketPacingTime * (kNumWarmupPackets * 2));
680
681 // Enqueue packets. Expect onlt the first one to be sent immediately.
682 EXPECT_CALL(packet_router, SendPacket).Times(1);
683 pacer.EnqueuePackets(
684 GeneratePackets(RtpPacketMediaType::kVideo, kPacketBasedHoldback));
685 time_controller.AdvanceTime(TimeDelta::Zero());
686
687 // Advance time to the fixed coalescing window, that should take presedence so
688 // at least some of the packets should be sent.
689 EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1));
690 time_controller.AdvanceTime(kFixedCoalescingWindow);
691 }
692
TEST_P(TaskQueuePacedSenderTest,ProbingStopDuringSendLoop)693 TEST_P(TaskQueuePacedSenderTest, ProbingStopDuringSendLoop) {
694 // Set a low `min_probe_delta` to let probing finish during send loop.
695 ScopedKeyValueConfig trials(
696 GetParam() + "WebRTC-Bwe-ProbingBehavior/min_probe_delta:100us/");
697
698 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
699 MockPacketRouter packet_router;
700 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
701 time_controller.GetTaskQueueFactory(),
702 PacingController::kMinSleepTime,
703 TaskQueuePacedSender::kNoPacketHoldback);
704
705 // Set rates so 2 packets adds 1ms of buffer level.
706 const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
707 const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
708 const DataRate kPacingDataRate = 2 * kPacketSize / kPacketPacingTime;
709
710 pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
711 pacer.EnsureStarted();
712
713 EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() {
714 return std::vector<std::unique_ptr<RtpPacketToSend>>();
715 });
716 EXPECT_CALL(packet_router, GeneratePadding(_))
717 .WillRepeatedly(
718 [](DataSize target_size) { return GeneratePadding(target_size); });
719
720 // Set probe rate.
721 const int kProbeClusterId = 1;
722 const DataRate kProbingRate = kPacingDataRate;
723
724 pacer.CreateProbeClusters(
725 {{.at_time = time_controller.GetClock()->CurrentTime(),
726 .target_data_rate = kProbingRate,
727 .target_duration = TimeDelta::Millis(15),
728 .target_probe_count = 4,
729 .id = kProbeClusterId}});
730
731 const int kPacketsToSend = 100;
732 const TimeDelta kPacketsPacedTime =
733 std::max(kPacketsToSend * kPacketSize / kPacingDataRate,
734 kPacketsToSend * kPacketSize / kProbingRate);
735
736 // Expect all packets and one padding packet sent.
737 EXPECT_CALL(packet_router, SendPacket).Times(kPacketsToSend + 1);
738 pacer.EnqueuePackets(
739 GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend));
740 time_controller.AdvanceTime(kPacketsPacedTime + TimeDelta::Millis(1));
741 }
742
TEST_P(TaskQueuePacedSenderTest,Stats)743 TEST_P(TaskQueuePacedSenderTest, Stats) {
744 static constexpr Timestamp kStartTime = Timestamp::Millis(1234);
745 GlobalSimulatedTimeController time_controller(kStartTime);
746 MockPacketRouter packet_router;
747 ScopedKeyValueConfig trials(GetParam());
748 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
749 time_controller.GetTaskQueueFactory(),
750 PacingController::kMinSleepTime,
751 TaskQueuePacedSender::kNoPacketHoldback);
752
753 // Simulate ~2mbps video stream, covering one second.
754 static constexpr size_t kPacketsToSend = 200;
755 static constexpr DataRate kPacingRate =
756 DataRate::BytesPerSec(kDefaultPacketSize * kPacketsToSend);
757 pacer.SetPacingRates(kPacingRate, DataRate::Zero());
758 pacer.EnsureStarted();
759
760 // Allowed `QueueSizeData` and `ExpectedQueueTime` deviation.
761 static constexpr size_t kAllowedPacketsDeviation = 1;
762 static constexpr DataSize kAllowedQueueSizeDeviation =
763 DataSize::Bytes(kDefaultPacketSize * kAllowedPacketsDeviation);
764 static constexpr TimeDelta kAllowedQueueTimeDeviation =
765 kAllowedQueueSizeDeviation / kPacingRate;
766
767 DataSize expected_queue_size = DataSize::MinusInfinity();
768 TimeDelta expected_queue_time = TimeDelta::MinusInfinity();
769
770 EXPECT_CALL(packet_router, SendPacket).Times(kPacketsToSend);
771
772 // Stats before insert any packets.
773 EXPECT_TRUE(pacer.OldestPacketWaitTime().IsZero());
774 EXPECT_FALSE(pacer.FirstSentPacketTime().has_value());
775 EXPECT_TRUE(pacer.QueueSizeData().IsZero());
776 EXPECT_TRUE(pacer.ExpectedQueueTime().IsZero());
777
778 pacer.EnqueuePackets(
779 GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend));
780
781 // Advance to 200ms.
782 time_controller.AdvanceTime(TimeDelta::Millis(200));
783 EXPECT_EQ(pacer.OldestPacketWaitTime(), TimeDelta::Millis(200));
784 EXPECT_EQ(pacer.FirstSentPacketTime(), kStartTime);
785
786 expected_queue_size = kPacingRate * TimeDelta::Millis(800);
787 expected_queue_time = expected_queue_size / kPacingRate;
788 EXPECT_NEAR(pacer.QueueSizeData().bytes(), expected_queue_size.bytes(),
789 kAllowedQueueSizeDeviation.bytes());
790 EXPECT_NEAR(pacer.ExpectedQueueTime().ms(), expected_queue_time.ms(),
791 kAllowedQueueTimeDeviation.ms());
792
793 // Advance to 500ms.
794 time_controller.AdvanceTime(TimeDelta::Millis(300));
795 EXPECT_EQ(pacer.OldestPacketWaitTime(), TimeDelta::Millis(500));
796 EXPECT_EQ(pacer.FirstSentPacketTime(), kStartTime);
797
798 expected_queue_size = kPacingRate * TimeDelta::Millis(500);
799 expected_queue_time = expected_queue_size / kPacingRate;
800 EXPECT_NEAR(pacer.QueueSizeData().bytes(), expected_queue_size.bytes(),
801 kAllowedQueueSizeDeviation.bytes());
802 EXPECT_NEAR(pacer.ExpectedQueueTime().ms(), expected_queue_time.ms(),
803 kAllowedQueueTimeDeviation.ms());
804
805 // Advance to 1000ms+, expect all packets to be sent.
806 time_controller.AdvanceTime(TimeDelta::Millis(500) +
807 kAllowedQueueTimeDeviation);
808 EXPECT_TRUE(pacer.OldestPacketWaitTime().IsZero());
809 EXPECT_EQ(pacer.FirstSentPacketTime(), kStartTime);
810 EXPECT_TRUE(pacer.QueueSizeData().IsZero());
811 EXPECT_TRUE(pacer.ExpectedQueueTime().IsZero());
812 }
813
814 // TODO(webrtc:14502): Rewrite these tests if the functionality is needed if
815 // pacing is done on the worker thread.
TEST(TaskQueuePacedSenderTest,HighPrecisionPacingWhenSlackIsDisabled)816 TEST(TaskQueuePacedSenderTest, HighPrecisionPacingWhenSlackIsDisabled) {
817 ScopedKeyValueConfig trials("WebRTC-SlackedTaskQueuePacedSender/Disabled/");
818
819 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
820 TaskQueueWithFakePrecisionFactory task_queue_factory(
821 time_controller.GetTaskQueueFactory());
822
823 MockPacketRouter packet_router;
824 TaskQueuePacedSender pacer(
825 time_controller.GetClock(), &packet_router, trials, &task_queue_factory,
826 PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback);
827
828 // Send enough packets (covering one second) that pacing is triggered, i.e.
829 // delayed tasks being scheduled.
830 static constexpr size_t kPacketsToSend = 42;
831 static constexpr DataRate kPacingRate =
832 DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend);
833 pacer.SetPacingRates(kPacingRate, DataRate::Zero());
834 pacer.EnsureStarted();
835 pacer.EnqueuePackets(
836 GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend));
837 // Expect all of them to be sent.
838 size_t packets_sent = 0;
839 EXPECT_CALL(packet_router, SendPacket)
840 .WillRepeatedly(
841 [&](std::unique_ptr<RtpPacketToSend> packet,
842 const PacedPacketInfo& cluster_info) { ++packets_sent; });
843 time_controller.AdvanceTime(TimeDelta::Seconds(1));
844 EXPECT_EQ(packets_sent, kPacketsToSend);
845
846 // Expect pacing to make use of high precision.
847 EXPECT_EQ(task_queue_factory.delayed_low_precision_count(), 0);
848 EXPECT_GT(task_queue_factory.delayed_high_precision_count(), 0);
849
850 // Create probe cluster which is also high precision.
851 pacer.CreateProbeClusters(
852 {{.at_time = time_controller.GetClock()->CurrentTime(),
853 .target_data_rate = kPacingRate,
854 .target_duration = TimeDelta::Millis(15),
855 .target_probe_count = 4,
856 .id = 123}});
857 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1));
858 time_controller.AdvanceTime(TimeDelta::Seconds(1));
859 EXPECT_EQ(task_queue_factory.delayed_low_precision_count(), 0);
860 EXPECT_GT(task_queue_factory.delayed_high_precision_count(), 0);
861 }
862
863 // TODO(webrtc:14502): Rewrite these tests if the functionality is needed if
864 // pacing is done on the worker thread.
TEST(TaskQueuePacedSenderTest,LowPrecisionPacingWhenSlackIsEnabled)865 TEST(TaskQueuePacedSenderTest, LowPrecisionPacingWhenSlackIsEnabled) {
866 ScopedKeyValueConfig trials("WebRTC-SlackedTaskQueuePacedSender/Enabled/");
867
868 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
869 TaskQueueWithFakePrecisionFactory task_queue_factory(
870 time_controller.GetTaskQueueFactory());
871
872 MockPacketRouter packet_router;
873 TaskQueuePacedSender pacer(
874 time_controller.GetClock(), &packet_router, trials, &task_queue_factory,
875 PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback);
876
877 // Send enough packets (covering one second) that pacing is triggered, i.e.
878 // delayed tasks being scheduled.
879 static constexpr size_t kPacketsToSend = 42;
880 static constexpr DataRate kPacingRate =
881 DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend);
882 pacer.SetPacingRates(kPacingRate, DataRate::Zero());
883 pacer.EnsureStarted();
884 pacer.EnqueuePackets(
885 GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend));
886 // Expect all of them to be sent.
887 size_t packets_sent = 0;
888 EXPECT_CALL(packet_router, SendPacket)
889 .WillRepeatedly(
890 [&](std::unique_ptr<RtpPacketToSend> packet,
891 const PacedPacketInfo& cluster_info) { ++packets_sent; });
892 time_controller.AdvanceTime(TimeDelta::Seconds(1));
893 EXPECT_EQ(packets_sent, kPacketsToSend);
894
895 // Expect pacing to make use of low precision.
896 EXPECT_GT(task_queue_factory.delayed_low_precision_count(), 0);
897 EXPECT_EQ(task_queue_factory.delayed_high_precision_count(), 0);
898
899 // Create probe cluster, which uses high precision despite regular pacing
900 // being low precision.
901 pacer.CreateProbeClusters(
902 {{.at_time = time_controller.GetClock()->CurrentTime(),
903 .target_data_rate = kPacingRate,
904 .target_duration = TimeDelta::Millis(15),
905 .target_probe_count = 4,
906 .id = 123}});
907 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1));
908 time_controller.AdvanceTime(TimeDelta::Seconds(1));
909 EXPECT_GT(task_queue_factory.delayed_high_precision_count(), 0);
910 }
911
912 } // namespace test
913 } // namespace webrtc
914