1 /*
2 * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
#include "net/dcsctp/tx/stream_scheduler.h"

#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <vector>

#include "net/dcsctp/packet/sctp_packet.h"
#include "net/dcsctp/public/types.h"
#include "test/gmock.h"
17
18 namespace dcsctp {
19 namespace {
20 using ::testing::Return;
21 using ::testing::StrictMock;
22
// Size limit used throughout this file: most schedulers are constructed with
// it, and it is the `max_size` argument of every `Produce` call.
constexpr size_t kMtu{1000};
// Payload size, in bytes, that the helpers below use unless told otherwise.
constexpr size_t kPayloadSize{4};
25
26 MATCHER_P(HasDataWithMid, mid, "") {
27 if (!arg.has_value()) {
28 *result_listener << "There was no produced data";
29 return false;
30 }
31
32 if (arg->data.message_id != mid) {
33 *result_listener << "the produced data had mid " << *arg->data.message_id
34 << " and not the expected " << *mid;
35 return false;
36 }
37
38 return true;
39 }
40
41 std::function<absl::optional<SendQueue::DataToSend>(TimeMs, size_t)>
CreateChunk(StreamID sid,MID mid,size_t payload_size=kPayloadSize)42 CreateChunk(StreamID sid, MID mid, size_t payload_size = kPayloadSize) {
43 return [sid, mid, payload_size](TimeMs now, size_t max_size) {
44 return SendQueue::DataToSend(Data(
45 sid, SSN(0), mid, FSN(0), PPID(42), std::vector<uint8_t>(payload_size),
46 Data::IsBeginning(true), Data::IsEnd(true), IsUnordered(true)));
47 };
48 }
49
GetPacketCounts(StreamScheduler & scheduler,size_t packets_to_generate)50 std::map<StreamID, size_t> GetPacketCounts(StreamScheduler& scheduler,
51 size_t packets_to_generate) {
52 std::map<StreamID, size_t> packet_counts;
53 for (size_t i = 0; i < packets_to_generate; ++i) {
54 absl::optional<SendQueue::DataToSend> data =
55 scheduler.Produce(TimeMs(0), kMtu);
56 if (data.has_value()) {
57 ++packet_counts[data->data.stream_id];
58 }
59 }
60 return packet_counts;
61 }
62
63 class MockStreamProducer : public StreamScheduler::StreamProducer {
64 public:
65 MOCK_METHOD(absl::optional<SendQueue::DataToSend>,
66 Produce,
67 (TimeMs, size_t),
68 (override));
69 MOCK_METHOD(size_t, bytes_to_send_in_next_message, (), (const, override));
70 };
71
72 class TestStream {
73 public:
TestStream(StreamScheduler & scheduler,StreamID stream_id,StreamPriority priority,size_t packet_size=kPayloadSize)74 TestStream(StreamScheduler& scheduler,
75 StreamID stream_id,
76 StreamPriority priority,
77 size_t packet_size = kPayloadSize) {
78 EXPECT_CALL(producer_, Produce)
79 .WillRepeatedly(CreateChunk(stream_id, MID(0), packet_size));
80 EXPECT_CALL(producer_, bytes_to_send_in_next_message)
81 .WillRepeatedly(Return(packet_size));
82 stream_ = scheduler.CreateStream(&producer_, stream_id, priority);
83 stream_->MaybeMakeActive();
84 }
85
stream()86 StreamScheduler::Stream& stream() { return *stream_; }
87
88 private:
89 StrictMock<MockStreamProducer> producer_;
90 std::unique_ptr<StreamScheduler::Stream> stream_;
91 };
92
93 // A scheduler without active streams doesn't produce data.
TEST(StreamSchedulerTest,HasNoActiveStreams)94 TEST(StreamSchedulerTest, HasNoActiveStreams) {
95 StreamScheduler scheduler(kMtu);
96
97 EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
98 }
99
100 // Stream properties can be set and retrieved
TEST(StreamSchedulerTest,CanSetAndGetStreamProperties)101 TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) {
102 StreamScheduler scheduler(kMtu);
103
104 StrictMock<MockStreamProducer> producer;
105 auto stream =
106 scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
107
108 EXPECT_EQ(stream->stream_id(), StreamID(1));
109 EXPECT_EQ(stream->priority(), StreamPriority(2));
110
111 stream->SetPriority(StreamPriority(0));
112 EXPECT_EQ(stream->priority(), StreamPriority(0));
113 }
114
115 // A scheduler with a single stream produced packets from it.
TEST(StreamSchedulerTest,CanProduceFromSingleStream)116 TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
117 StreamScheduler scheduler(kMtu);
118
119 StrictMock<MockStreamProducer> producer;
120 EXPECT_CALL(producer, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
121 EXPECT_CALL(producer, bytes_to_send_in_next_message)
122 .WillOnce(Return(kPayloadSize)) // When making active
123 .WillOnce(Return(0));
124 auto stream =
125 scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
126 stream->MaybeMakeActive();
127
128 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
129 EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
130 }
131
132 // Switches between two streams after every packet.
TEST(StreamSchedulerTest,WillRoundRobinBetweenStreams)133 TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
134 StreamScheduler scheduler(kMtu);
135
136 StrictMock<MockStreamProducer> producer1;
137 EXPECT_CALL(producer1, Produce)
138 .WillOnce(CreateChunk(StreamID(1), MID(100)))
139 .WillOnce(CreateChunk(StreamID(1), MID(101)))
140 .WillOnce(CreateChunk(StreamID(1), MID(102)));
141 EXPECT_CALL(producer1, bytes_to_send_in_next_message)
142 .WillOnce(Return(kPayloadSize)) // When making active
143 .WillOnce(Return(kPayloadSize))
144 .WillOnce(Return(kPayloadSize))
145 .WillOnce(Return(0));
146 auto stream1 =
147 scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
148 stream1->MaybeMakeActive();
149
150 StrictMock<MockStreamProducer> producer2;
151 EXPECT_CALL(producer2, Produce)
152 .WillOnce(CreateChunk(StreamID(2), MID(200)))
153 .WillOnce(CreateChunk(StreamID(2), MID(201)))
154 .WillOnce(CreateChunk(StreamID(2), MID(202)));
155 EXPECT_CALL(producer2, bytes_to_send_in_next_message)
156 .WillOnce(Return(kPayloadSize)) // When making active
157 .WillOnce(Return(kPayloadSize))
158 .WillOnce(Return(kPayloadSize))
159 .WillOnce(Return(0));
160 auto stream2 =
161 scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
162 stream2->MaybeMakeActive();
163
164 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
165 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
166 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
167 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
168 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
169 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
170 EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
171 }
172
173 // Switches between two streams after every packet, but keeps producing from the
174 // same stream when a packet contains of multiple fragments.
TEST(StreamSchedulerTest,WillRoundRobinOnlyWhenFinishedProducingChunk)175 TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
176 StreamScheduler scheduler(kMtu);
177
178 StrictMock<MockStreamProducer> producer1;
179 EXPECT_CALL(producer1, Produce)
180 .WillOnce(CreateChunk(StreamID(1), MID(100)))
181 .WillOnce([](...) {
182 return SendQueue::DataToSend(
183 Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
184 std::vector<uint8_t>(4), Data::IsBeginning(true),
185 Data::IsEnd(false), IsUnordered(true)));
186 })
187 .WillOnce([](...) {
188 return SendQueue::DataToSend(
189 Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
190 std::vector<uint8_t>(4), Data::IsBeginning(false),
191 Data::IsEnd(false), IsUnordered(true)));
192 })
193 .WillOnce([](...) {
194 return SendQueue::DataToSend(
195 Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
196 std::vector<uint8_t>(4), Data::IsBeginning(false),
197 Data::IsEnd(true), IsUnordered(true)));
198 })
199 .WillOnce(CreateChunk(StreamID(1), MID(102)));
200 EXPECT_CALL(producer1, bytes_to_send_in_next_message)
201 .WillOnce(Return(kPayloadSize)) // When making active
202 .WillOnce(Return(kPayloadSize))
203 .WillOnce(Return(kPayloadSize))
204 .WillOnce(Return(kPayloadSize))
205 .WillOnce(Return(kPayloadSize))
206 .WillOnce(Return(0));
207 auto stream1 =
208 scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
209 stream1->MaybeMakeActive();
210
211 StrictMock<MockStreamProducer> producer2;
212 EXPECT_CALL(producer2, Produce)
213 .WillOnce(CreateChunk(StreamID(2), MID(200)))
214 .WillOnce(CreateChunk(StreamID(2), MID(201)))
215 .WillOnce(CreateChunk(StreamID(2), MID(202)));
216 EXPECT_CALL(producer2, bytes_to_send_in_next_message)
217 .WillOnce(Return(kPayloadSize)) // When making active
218 .WillOnce(Return(kPayloadSize))
219 .WillOnce(Return(kPayloadSize))
220 .WillOnce(Return(0));
221 auto stream2 =
222 scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
223 stream2->MaybeMakeActive();
224
225 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
226 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
227 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
228 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
229 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
230 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
231 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
232 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
233 EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
234 }
235
236 // Deactivates a stream before it has finished producing all packets.
TEST(StreamSchedulerTest,StreamsCanBeMadeInactive)237 TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
238 StreamScheduler scheduler(kMtu);
239
240 StrictMock<MockStreamProducer> producer1;
241 EXPECT_CALL(producer1, Produce)
242 .WillOnce(CreateChunk(StreamID(1), MID(100)))
243 .WillOnce(CreateChunk(StreamID(1), MID(101)));
244 EXPECT_CALL(producer1, bytes_to_send_in_next_message)
245 .WillOnce(Return(kPayloadSize)) // When making active
246 .WillOnce(Return(kPayloadSize))
247 .WillOnce(Return(kPayloadSize)); // hints that there is a MID(2) coming.
248 auto stream1 =
249 scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
250 stream1->MaybeMakeActive();
251
252 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
253 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
254
255 // ... but the stream is made inactive before it can be produced.
256 stream1->MakeInactive();
257 EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
258 }
259
260 // Resumes a paused stream - makes a stream active after inactivating it.
TEST(StreamSchedulerTest,SingleStreamCanBeResumed)261 TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
262 StreamScheduler scheduler(kMtu);
263
264 StrictMock<MockStreamProducer> producer1;
265 // Callbacks are setup so that they hint that there is a MID(2) coming...
266 EXPECT_CALL(producer1, Produce)
267 .WillOnce(CreateChunk(StreamID(1), MID(100)))
268 .WillOnce(CreateChunk(StreamID(1), MID(101)))
269 .WillOnce(CreateChunk(StreamID(1), MID(102)));
270 EXPECT_CALL(producer1, bytes_to_send_in_next_message)
271 .WillOnce(Return(kPayloadSize)) // When making active
272 .WillOnce(Return(kPayloadSize))
273 .WillOnce(Return(kPayloadSize))
274 .WillOnce(Return(kPayloadSize)) // When making active again
275 .WillOnce(Return(0));
276 auto stream1 =
277 scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
278 stream1->MaybeMakeActive();
279
280 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
281 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
282
283 stream1->MakeInactive();
284 EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
285 stream1->MaybeMakeActive();
286 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
287 EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
288 }
289
290 // Iterates between streams, where one is suddenly paused and later resumed.
TEST(StreamSchedulerTest,WillRoundRobinWithPausedStream)291 TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) {
292 StreamScheduler scheduler(kMtu);
293
294 StrictMock<MockStreamProducer> producer1;
295 EXPECT_CALL(producer1, Produce)
296 .WillOnce(CreateChunk(StreamID(1), MID(100)))
297 .WillOnce(CreateChunk(StreamID(1), MID(101)))
298 .WillOnce(CreateChunk(StreamID(1), MID(102)));
299 EXPECT_CALL(producer1, bytes_to_send_in_next_message)
300 .WillOnce(Return(kPayloadSize)) // When making active
301 .WillOnce(Return(kPayloadSize))
302 .WillOnce(Return(kPayloadSize)) // When making active
303 .WillOnce(Return(kPayloadSize))
304 .WillOnce(Return(0));
305 auto stream1 =
306 scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
307 stream1->MaybeMakeActive();
308
309 StrictMock<MockStreamProducer> producer2;
310 EXPECT_CALL(producer2, Produce)
311 .WillOnce(CreateChunk(StreamID(2), MID(200)))
312 .WillOnce(CreateChunk(StreamID(2), MID(201)))
313 .WillOnce(CreateChunk(StreamID(2), MID(202)));
314 EXPECT_CALL(producer2, bytes_to_send_in_next_message)
315 .WillOnce(Return(kPayloadSize)) // When making active
316 .WillOnce(Return(kPayloadSize))
317 .WillOnce(Return(kPayloadSize))
318 .WillOnce(Return(0));
319 auto stream2 =
320 scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
321 stream2->MaybeMakeActive();
322
323 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
324 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
325 stream1->MakeInactive();
326 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
327 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
328 stream1->MaybeMakeActive();
329 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
330 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
331 EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
332 }
333
334 // Verifies that packet counts are evenly distributed in round robin scheduling.
TEST(StreamSchedulerTest,WillDistributeRoundRobinPacketsEvenlyTwoStreams)335 TEST(StreamSchedulerTest, WillDistributeRoundRobinPacketsEvenlyTwoStreams) {
336 StreamScheduler scheduler(kMtu);
337 TestStream stream1(scheduler, StreamID(1), StreamPriority(1));
338 TestStream stream2(scheduler, StreamID(2), StreamPriority(1));
339
340 std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 10);
341 EXPECT_EQ(packet_counts[StreamID(1)], 5U);
342 EXPECT_EQ(packet_counts[StreamID(2)], 5U);
343 }
344
345 // Verifies that packet counts are evenly distributed among active streams,
346 // where a stream is suddenly made inactive, two are added, and then the paused
347 // stream is resumed.
TEST(StreamSchedulerTest,WillDistributeEvenlyWithPausedAndAddedStreams)348 TEST(StreamSchedulerTest, WillDistributeEvenlyWithPausedAndAddedStreams) {
349 StreamScheduler scheduler(kMtu);
350 TestStream stream1(scheduler, StreamID(1), StreamPriority(1));
351 TestStream stream2(scheduler, StreamID(2), StreamPriority(1));
352
353 std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 10);
354 EXPECT_EQ(packet_counts[StreamID(1)], 5U);
355 EXPECT_EQ(packet_counts[StreamID(2)], 5U);
356
357 stream2.stream().MakeInactive();
358
359 TestStream stream3(scheduler, StreamID(3), StreamPriority(1));
360 TestStream stream4(scheduler, StreamID(4), StreamPriority(1));
361
362 std::map<StreamID, size_t> counts2 = GetPacketCounts(scheduler, 15);
363 EXPECT_EQ(counts2[StreamID(1)], 5U);
364 EXPECT_EQ(counts2[StreamID(2)], 0U);
365 EXPECT_EQ(counts2[StreamID(3)], 5U);
366 EXPECT_EQ(counts2[StreamID(4)], 5U);
367
368 stream2.stream().MaybeMakeActive();
369
370 std::map<StreamID, size_t> counts3 = GetPacketCounts(scheduler, 20);
371 EXPECT_EQ(counts3[StreamID(1)], 5U);
372 EXPECT_EQ(counts3[StreamID(2)], 5U);
373 EXPECT_EQ(counts3[StreamID(3)], 5U);
374 EXPECT_EQ(counts3[StreamID(4)], 5U);
375 }
376
377 // Degrades to fair queuing with streams having identical priority.
TEST(StreamSchedulerTest,WillDoFairQueuingWithSamePriority)378 TEST(StreamSchedulerTest, WillDoFairQueuingWithSamePriority) {
379 StreamScheduler scheduler(kMtu);
380 scheduler.EnableMessageInterleaving(true);
381
382 constexpr size_t kSmallPacket = 30;
383 constexpr size_t kLargePacket = 70;
384
385 StrictMock<MockStreamProducer> callback1;
386 EXPECT_CALL(callback1, Produce)
387 .WillOnce(CreateChunk(StreamID(1), MID(100), kSmallPacket))
388 .WillOnce(CreateChunk(StreamID(1), MID(101), kSmallPacket))
389 .WillOnce(CreateChunk(StreamID(1), MID(102), kSmallPacket));
390 EXPECT_CALL(callback1, bytes_to_send_in_next_message)
391 .WillOnce(Return(kSmallPacket)) // When making active
392 .WillOnce(Return(kSmallPacket))
393 .WillOnce(Return(kSmallPacket))
394 .WillOnce(Return(0));
395 auto stream1 =
396 scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
397 stream1->MaybeMakeActive();
398
399 StrictMock<MockStreamProducer> callback2;
400 EXPECT_CALL(callback2, Produce)
401 .WillOnce(CreateChunk(StreamID(2), MID(200), kLargePacket))
402 .WillOnce(CreateChunk(StreamID(2), MID(201), kLargePacket))
403 .WillOnce(CreateChunk(StreamID(2), MID(202), kLargePacket));
404 EXPECT_CALL(callback2, bytes_to_send_in_next_message)
405 .WillOnce(Return(kLargePacket)) // When making active
406 .WillOnce(Return(kLargePacket))
407 .WillOnce(Return(kLargePacket))
408 .WillOnce(Return(0));
409 auto stream2 =
410 scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
411 stream2->MaybeMakeActive();
412
413 // t = 30
414 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
415 // t = 60
416 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
417 // t = 70
418 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
419 // t = 90
420 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
421 // t = 140
422 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
423 // t = 210
424 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
425 EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
426 }
427
428 // Will do weighted fair queuing with three streams having different priority.
TEST(StreamSchedulerTest,WillDoWeightedFairQueuingSameSizeDifferentPriority)429 TEST(StreamSchedulerTest, WillDoWeightedFairQueuingSameSizeDifferentPriority) {
430 StreamScheduler scheduler(kMtu);
431 scheduler.EnableMessageInterleaving(true);
432
433 StrictMock<MockStreamProducer> callback1;
434 EXPECT_CALL(callback1, Produce)
435 .WillOnce(CreateChunk(StreamID(1), MID(100)))
436 .WillOnce(CreateChunk(StreamID(1), MID(101)))
437 .WillOnce(CreateChunk(StreamID(1), MID(102)));
438 EXPECT_CALL(callback1, bytes_to_send_in_next_message)
439 .WillOnce(Return(kPayloadSize)) // When making active
440 .WillOnce(Return(kPayloadSize))
441 .WillOnce(Return(kPayloadSize))
442 .WillOnce(Return(0));
443 // Priority 125 -> allowed to produce every 1000/125 ~= 80 time units.
444 auto stream1 =
445 scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(125));
446 stream1->MaybeMakeActive();
447
448 StrictMock<MockStreamProducer> callback2;
449 EXPECT_CALL(callback2, Produce)
450 .WillOnce(CreateChunk(StreamID(2), MID(200)))
451 .WillOnce(CreateChunk(StreamID(2), MID(201)))
452 .WillOnce(CreateChunk(StreamID(2), MID(202)));
453 EXPECT_CALL(callback2, bytes_to_send_in_next_message)
454 .WillOnce(Return(kPayloadSize)) // When making active
455 .WillOnce(Return(kPayloadSize))
456 .WillOnce(Return(kPayloadSize))
457 .WillOnce(Return(0));
458 // Priority 200 -> allowed to produce every 1000/200 ~= 50 time units.
459 auto stream2 =
460 scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(200));
461 stream2->MaybeMakeActive();
462
463 StrictMock<MockStreamProducer> callback3;
464 EXPECT_CALL(callback3, Produce)
465 .WillOnce(CreateChunk(StreamID(3), MID(300)))
466 .WillOnce(CreateChunk(StreamID(3), MID(301)))
467 .WillOnce(CreateChunk(StreamID(3), MID(302)));
468 EXPECT_CALL(callback3, bytes_to_send_in_next_message)
469 .WillOnce(Return(kPayloadSize)) // When making active
470 .WillOnce(Return(kPayloadSize))
471 .WillOnce(Return(kPayloadSize))
472 .WillOnce(Return(0));
473 // Priority 500 -> allowed to produce every 1000/500 ~= 20 time units.
474 auto stream3 =
475 scheduler.CreateStream(&callback3, StreamID(3), StreamPriority(500));
476 stream3->MaybeMakeActive();
477
478 // t ~= 20
479 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300)));
480 // t ~= 40
481 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301)));
482 // t ~= 50
483 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
484 // t ~= 60
485 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302)));
486 // t ~= 80
487 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
488 // t ~= 100
489 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
490 // t ~= 150
491 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
492 // t ~= 160
493 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
494 // t ~= 240
495 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
496 EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
497 }
498
499 // Will do weighted fair queuing with three streams having different priority
500 // and sending different payload sizes.
TEST(StreamSchedulerTest,WillDoWeightedFairQueuingDifferentSizeAndPriority)501 TEST(StreamSchedulerTest, WillDoWeightedFairQueuingDifferentSizeAndPriority) {
502 StreamScheduler scheduler(kMtu);
503 scheduler.EnableMessageInterleaving(true);
504
505 constexpr size_t kSmallPacket = 20;
506 constexpr size_t kMediumPacket = 50;
507 constexpr size_t kLargePacket = 70;
508
509 // Stream with priority = 125 -> inverse weight ~=80
510 StrictMock<MockStreamProducer> callback1;
511 EXPECT_CALL(callback1, Produce)
512 // virtual finish time ~ 0 + 50 * 80 = 4000
513 .WillOnce(CreateChunk(StreamID(1), MID(100), kMediumPacket))
514 // virtual finish time ~ 4000 + 20 * 80 = 5600
515 .WillOnce(CreateChunk(StreamID(1), MID(101), kSmallPacket))
516 // virtual finish time ~ 5600 + 70 * 80 = 11200
517 .WillOnce(CreateChunk(StreamID(1), MID(102), kLargePacket));
518 EXPECT_CALL(callback1, bytes_to_send_in_next_message)
519 .WillOnce(Return(kMediumPacket)) // When making active
520 .WillOnce(Return(kSmallPacket))
521 .WillOnce(Return(kLargePacket))
522 .WillOnce(Return(0));
523 auto stream1 =
524 scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(125));
525 stream1->MaybeMakeActive();
526
527 // Stream with priority = 200 -> inverse weight ~=50
528 StrictMock<MockStreamProducer> callback2;
529 EXPECT_CALL(callback2, Produce)
530 // virtual finish time ~ 0 + 50 * 50 = 2500
531 .WillOnce(CreateChunk(StreamID(2), MID(200), kMediumPacket))
532 // virtual finish time ~ 2500 + 70 * 50 = 6000
533 .WillOnce(CreateChunk(StreamID(2), MID(201), kLargePacket))
534 // virtual finish time ~ 6000 + 20 * 50 = 7000
535 .WillOnce(CreateChunk(StreamID(2), MID(202), kSmallPacket));
536 EXPECT_CALL(callback2, bytes_to_send_in_next_message)
537 .WillOnce(Return(kMediumPacket)) // When making active
538 .WillOnce(Return(kLargePacket))
539 .WillOnce(Return(kSmallPacket))
540 .WillOnce(Return(0));
541 auto stream2 =
542 scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(200));
543 stream2->MaybeMakeActive();
544
545 // Stream with priority = 500 -> inverse weight ~=20
546 StrictMock<MockStreamProducer> callback3;
547 EXPECT_CALL(callback3, Produce)
548 // virtual finish time ~ 0 + 20 * 20 = 400
549 .WillOnce(CreateChunk(StreamID(3), MID(300), kSmallPacket))
550 // virtual finish time ~ 400 + 50 * 20 = 1400
551 .WillOnce(CreateChunk(StreamID(3), MID(301), kMediumPacket))
552 // virtual finish time ~ 1400 + 70 * 20 = 2800
553 .WillOnce(CreateChunk(StreamID(3), MID(302), kLargePacket));
554 EXPECT_CALL(callback3, bytes_to_send_in_next_message)
555 .WillOnce(Return(kSmallPacket)) // When making active
556 .WillOnce(Return(kMediumPacket))
557 .WillOnce(Return(kLargePacket))
558 .WillOnce(Return(0));
559 auto stream3 =
560 scheduler.CreateStream(&callback3, StreamID(3), StreamPriority(500));
561 stream3->MaybeMakeActive();
562
563 // t ~= 400
564 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300)));
565 // t ~= 1400
566 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301)));
567 // t ~= 2500
568 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
569 // t ~= 2800
570 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302)));
571 // t ~= 4000
572 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
573 // t ~= 5600
574 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
575 // t ~= 6000
576 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
577 // t ~= 7000
578 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
579 // t ~= 11200
580 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
581 EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
582 }
// Two never-ending streams sending packets of identical size but with
// priorities 100 and 200: the number of packets produced per stream follows
// the 1:2 priority ratio.
TEST(StreamSchedulerTest, WillDistributeWFQPacketsInTwoStreamsByPriority) {
  StreamScheduler scheduler(kMtu);
  scheduler.EnableMessageInterleaving(true);

  TestStream low(scheduler, StreamID(1), StreamPriority(100),
                 /*packet_size=*/kPayloadSize);
  TestStream high(scheduler, StreamID(2), StreamPriority(200),
                  /*packet_size=*/kPayloadSize);

  std::map<StreamID, size_t> produced_per_stream =
      GetPacketCounts(scheduler, /*packets_to_generate=*/15);

  EXPECT_EQ(produced_per_stream[StreamID(1)], 5U);
  EXPECT_EQ(produced_per_stream[StreamID(2)], 10U);
}
597
// Same idea as `WillDistributeWFQPacketsInTwoStreamsByPriority`, with four
// streams at priorities 100/200/300/400: fifty equally sized packets are
// split 5/10/15/20.
TEST(StreamSchedulerTest, WillDistributeWFQPacketsInFourStreamsByPriority) {
  StreamScheduler scheduler(kMtu);
  scheduler.EnableMessageInterleaving(true);

  TestStream prio100(scheduler, StreamID(1), StreamPriority(100),
                     /*packet_size=*/kPayloadSize);
  TestStream prio200(scheduler, StreamID(2), StreamPriority(200),
                     /*packet_size=*/kPayloadSize);
  TestStream prio300(scheduler, StreamID(3), StreamPriority(300),
                     /*packet_size=*/kPayloadSize);
  TestStream prio400(scheduler, StreamID(4), StreamPriority(400),
                     /*packet_size=*/kPayloadSize);

  std::map<StreamID, size_t> produced_per_stream =
      GetPacketCounts(scheduler, /*packets_to_generate=*/50);

  EXPECT_EQ(produced_per_stream[StreamID(1)], 5U);
  EXPECT_EQ(produced_per_stream[StreamID(2)], 10U);
  EXPECT_EQ(produced_per_stream[StreamID(3)], 15U);
  EXPECT_EQ(produced_per_stream[StreamID(4)], 20U);
}
615
// Two never-ending streams that differ in both priority and packet size.
// Verifies that the ratio of total payload bytes follows the priorities:
//   * stream 1 has priority 100 and sends packets of size 8,
//   * stream 2 has priority 400 and sends packets of size 4.
// With plain round robin, stream 1 would get twice as many payload bytes on
// the wire as stream 2. With WFQ and a 4x higher priority, stream 2 should
// instead get 4x as many payload bytes, which - its packets being half as
// large - means 8x as many packets.
TEST(StreamSchedulerTest, WillDistributeFromTwoStreamsFairly) {
  StreamScheduler scheduler(kMtu);
  // Message interleaving turns on the WFQ scheduler.
  scheduler.EnableMessageInterleaving(true);

  TestStream low(scheduler, StreamID(1), StreamPriority(100),
                 /*packet_size=*/8);
  TestStream high(scheduler, StreamID(2), StreamPriority(400),
                  /*packet_size=*/4);

  std::map<StreamID, size_t> produced_per_stream =
      GetPacketCounts(scheduler, /*packets_to_generate=*/90);

  // 10 * 8 = 80 bytes versus 80 * 4 = 320 bytes.
  EXPECT_EQ(produced_per_stream[StreamID(1)], 10U);
  EXPECT_EQ(produced_per_stream[StreamID(2)], 80U);
}
640
// Same idea as `WillDistributeFromTwoStreamsFairly`, but with four streams
// mixing priorities and packet sizes: the payload bytes produced per stream
// are proportional to the stream's priority.
TEST(StreamSchedulerTest, WillDistributeFromFourStreamsFairly) {
  StreamScheduler scheduler(kMtu);
  // Message interleaving turns on the WFQ scheduler.
  scheduler.EnableMessageInterleaving(true);

  TestStream prio100_size10(scheduler, StreamID(1), StreamPriority(100),
                            /*packet_size=*/10);
  TestStream prio200_size10(scheduler, StreamID(2), StreamPriority(200),
                            /*packet_size=*/10);
  TestStream prio200_size20(scheduler, StreamID(3), StreamPriority(200),
                            /*packet_size=*/20);
  TestStream prio400_size30(scheduler, StreamID(4), StreamPriority(400),
                            /*packet_size=*/30);

  std::map<StreamID, size_t> produced_per_stream =
      GetPacketCounts(scheduler, /*packets_to_generate=*/80);

  // 15 packets * 10 bytes = 150 bytes at priority 100.
  EXPECT_EQ(produced_per_stream[StreamID(1)], 15U);
  // 30 packets * 10 bytes = 300 bytes at priority 200.
  EXPECT_EQ(produced_per_stream[StreamID(2)], 30U);
  // 15 packets * 20 bytes = 300 bytes at priority 200.
  EXPECT_EQ(produced_per_stream[StreamID(3)], 15U);
  // 20 packets * 30 bytes = 600 bytes at priority 400.
  EXPECT_EQ(produced_per_stream[StreamID(4)], 20U);
}
667
668 // Sending large messages with small MTU will fragment the messages and produce
669 // a first fragment not larger than the MTU, and will then not first send from
670 // the stream with the smallest message, as their first fragment will be equally
671 // small for both streams. See `LargeMessageWithLargeMtu` for the same test, but
672 // with a larger MTU.
TEST(StreamSchedulerTest,SendLargeMessageWithSmallMtu)673 TEST(StreamSchedulerTest, SendLargeMessageWithSmallMtu) {
674 StreamScheduler scheduler(100 + SctpPacket::kHeaderSize +
675 IDataChunk::kHeaderSize);
676 scheduler.EnableMessageInterleaving(true);
677
678 StrictMock<MockStreamProducer> producer1;
679 EXPECT_CALL(producer1, Produce)
680 .WillOnce(CreateChunk(StreamID(1), MID(0), 100))
681 .WillOnce(CreateChunk(StreamID(1), MID(0), 100));
682 EXPECT_CALL(producer1, bytes_to_send_in_next_message)
683 .WillOnce(Return(200)) // When making active
684 .WillOnce(Return(100))
685 .WillOnce(Return(0));
686 auto stream1 =
687 scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(1));
688 stream1->MaybeMakeActive();
689
690 StrictMock<MockStreamProducer> producer2;
691 EXPECT_CALL(producer2, Produce)
692 .WillOnce(CreateChunk(StreamID(2), MID(1), 100))
693 .WillOnce(CreateChunk(StreamID(2), MID(1), 50));
694 EXPECT_CALL(producer2, bytes_to_send_in_next_message)
695 .WillOnce(Return(150)) // When making active
696 .WillOnce(Return(50))
697 .WillOnce(Return(0));
698 auto stream2 =
699 scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1));
700 stream2->MaybeMakeActive();
701 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
702 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1)));
703 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1)));
704 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
705 EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
706 }
707
708 // Sending large messages with large MTU will not fragment messages and will
709 // send the message first from the stream that has the smallest message.
TEST(StreamSchedulerTest,SendLargeMessageWithLargeMtu)710 TEST(StreamSchedulerTest, SendLargeMessageWithLargeMtu) {
711 StreamScheduler scheduler(200 + SctpPacket::kHeaderSize +
712 IDataChunk::kHeaderSize);
713 scheduler.EnableMessageInterleaving(true);
714
715 StrictMock<MockStreamProducer> producer1;
716 EXPECT_CALL(producer1, Produce)
717 .WillOnce(CreateChunk(StreamID(1), MID(0), 200));
718 EXPECT_CALL(producer1, bytes_to_send_in_next_message)
719 .WillOnce(Return(200)) // When making active
720 .WillOnce(Return(0));
721 auto stream1 =
722 scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(1));
723 stream1->MaybeMakeActive();
724
725 StrictMock<MockStreamProducer> producer2;
726 EXPECT_CALL(producer2, Produce)
727 .WillOnce(CreateChunk(StreamID(2), MID(1), 150));
728 EXPECT_CALL(producer2, bytes_to_send_in_next_message)
729 .WillOnce(Return(150)) // When making active
730 .WillOnce(Return(0));
731 auto stream2 =
732 scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1));
733 stream2->MaybeMakeActive();
734 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1)));
735 EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
736 EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
737 }
738
739 } // namespace
740 } // namespace dcsctp
741