xref: /aosp_15_r20/external/webrtc/net/dcsctp/tx/stream_scheduler_test.cc (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1 /*
2  *  Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 #include "net/dcsctp/tx/stream_scheduler.h"
11 
12 #include <vector>
13 
14 #include "net/dcsctp/packet/sctp_packet.h"
15 #include "net/dcsctp/public/types.h"
16 #include "test/gmock.h"
17 
18 namespace dcsctp {
19 namespace {
20 using ::testing::Return;
21 using ::testing::StrictMock;
22 
23 constexpr size_t kMtu = 1000;
24 constexpr size_t kPayloadSize = 4;
25 
26 MATCHER_P(HasDataWithMid, mid, "") {
27   if (!arg.has_value()) {
28     *result_listener << "There was no produced data";
29     return false;
30   }
31 
32   if (arg->data.message_id != mid) {
33     *result_listener << "the produced data had mid " << *arg->data.message_id
34                      << " and not the expected " << *mid;
35     return false;
36   }
37 
38   return true;
39 }
40 
41 std::function<absl::optional<SendQueue::DataToSend>(TimeMs, size_t)>
CreateChunk(StreamID sid,MID mid,size_t payload_size=kPayloadSize)42 CreateChunk(StreamID sid, MID mid, size_t payload_size = kPayloadSize) {
43   return [sid, mid, payload_size](TimeMs now, size_t max_size) {
44     return SendQueue::DataToSend(Data(
45         sid, SSN(0), mid, FSN(0), PPID(42), std::vector<uint8_t>(payload_size),
46         Data::IsBeginning(true), Data::IsEnd(true), IsUnordered(true)));
47   };
48 }
49 
GetPacketCounts(StreamScheduler & scheduler,size_t packets_to_generate)50 std::map<StreamID, size_t> GetPacketCounts(StreamScheduler& scheduler,
51                                            size_t packets_to_generate) {
52   std::map<StreamID, size_t> packet_counts;
53   for (size_t i = 0; i < packets_to_generate; ++i) {
54     absl::optional<SendQueue::DataToSend> data =
55         scheduler.Produce(TimeMs(0), kMtu);
56     if (data.has_value()) {
57       ++packet_counts[data->data.stream_id];
58     }
59   }
60   return packet_counts;
61 }
62 
63 class MockStreamProducer : public StreamScheduler::StreamProducer {
64  public:
65   MOCK_METHOD(absl::optional<SendQueue::DataToSend>,
66               Produce,
67               (TimeMs, size_t),
68               (override));
69   MOCK_METHOD(size_t, bytes_to_send_in_next_message, (), (const, override));
70 };
71 
72 class TestStream {
73  public:
TestStream(StreamScheduler & scheduler,StreamID stream_id,StreamPriority priority,size_t packet_size=kPayloadSize)74   TestStream(StreamScheduler& scheduler,
75              StreamID stream_id,
76              StreamPriority priority,
77              size_t packet_size = kPayloadSize) {
78     EXPECT_CALL(producer_, Produce)
79         .WillRepeatedly(CreateChunk(stream_id, MID(0), packet_size));
80     EXPECT_CALL(producer_, bytes_to_send_in_next_message)
81         .WillRepeatedly(Return(packet_size));
82     stream_ = scheduler.CreateStream(&producer_, stream_id, priority);
83     stream_->MaybeMakeActive();
84   }
85 
stream()86   StreamScheduler::Stream& stream() { return *stream_; }
87 
88  private:
89   StrictMock<MockStreamProducer> producer_;
90   std::unique_ptr<StreamScheduler::Stream> stream_;
91 };
92 
93 // A scheduler without active streams doesn't produce data.
TEST(StreamSchedulerTest,HasNoActiveStreams)94 TEST(StreamSchedulerTest, HasNoActiveStreams) {
95   StreamScheduler scheduler(kMtu);
96 
97   EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
98 }
99 
100 // Stream properties can be set and retrieved
TEST(StreamSchedulerTest,CanSetAndGetStreamProperties)101 TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) {
102   StreamScheduler scheduler(kMtu);
103 
104   StrictMock<MockStreamProducer> producer;
105   auto stream =
106       scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
107 
108   EXPECT_EQ(stream->stream_id(), StreamID(1));
109   EXPECT_EQ(stream->priority(), StreamPriority(2));
110 
111   stream->SetPriority(StreamPriority(0));
112   EXPECT_EQ(stream->priority(), StreamPriority(0));
113 }
114 
115 // A scheduler with a single stream produced packets from it.
TEST(StreamSchedulerTest,CanProduceFromSingleStream)116 TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
117   StreamScheduler scheduler(kMtu);
118 
119   StrictMock<MockStreamProducer> producer;
120   EXPECT_CALL(producer, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
121   EXPECT_CALL(producer, bytes_to_send_in_next_message)
122       .WillOnce(Return(kPayloadSize))  // When making active
123       .WillOnce(Return(0));
124   auto stream =
125       scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
126   stream->MaybeMakeActive();
127 
128   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
129   EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
130 }
131 
132 // Switches between two streams after every packet.
TEST(StreamSchedulerTest,WillRoundRobinBetweenStreams)133 TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
134   StreamScheduler scheduler(kMtu);
135 
136   StrictMock<MockStreamProducer> producer1;
137   EXPECT_CALL(producer1, Produce)
138       .WillOnce(CreateChunk(StreamID(1), MID(100)))
139       .WillOnce(CreateChunk(StreamID(1), MID(101)))
140       .WillOnce(CreateChunk(StreamID(1), MID(102)));
141   EXPECT_CALL(producer1, bytes_to_send_in_next_message)
142       .WillOnce(Return(kPayloadSize))  // When making active
143       .WillOnce(Return(kPayloadSize))
144       .WillOnce(Return(kPayloadSize))
145       .WillOnce(Return(0));
146   auto stream1 =
147       scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
148   stream1->MaybeMakeActive();
149 
150   StrictMock<MockStreamProducer> producer2;
151   EXPECT_CALL(producer2, Produce)
152       .WillOnce(CreateChunk(StreamID(2), MID(200)))
153       .WillOnce(CreateChunk(StreamID(2), MID(201)))
154       .WillOnce(CreateChunk(StreamID(2), MID(202)));
155   EXPECT_CALL(producer2, bytes_to_send_in_next_message)
156       .WillOnce(Return(kPayloadSize))  // When making active
157       .WillOnce(Return(kPayloadSize))
158       .WillOnce(Return(kPayloadSize))
159       .WillOnce(Return(0));
160   auto stream2 =
161       scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
162   stream2->MaybeMakeActive();
163 
164   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
165   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
166   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
167   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
168   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
169   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
170   EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
171 }
172 
173 // Switches between two streams after every packet, but keeps producing from the
174 // same stream when a packet contains of multiple fragments.
TEST(StreamSchedulerTest,WillRoundRobinOnlyWhenFinishedProducingChunk)175 TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
176   StreamScheduler scheduler(kMtu);
177 
178   StrictMock<MockStreamProducer> producer1;
179   EXPECT_CALL(producer1, Produce)
180       .WillOnce(CreateChunk(StreamID(1), MID(100)))
181       .WillOnce([](...) {
182         return SendQueue::DataToSend(
183             Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
184                  std::vector<uint8_t>(4), Data::IsBeginning(true),
185                  Data::IsEnd(false), IsUnordered(true)));
186       })
187       .WillOnce([](...) {
188         return SendQueue::DataToSend(
189             Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
190                  std::vector<uint8_t>(4), Data::IsBeginning(false),
191                  Data::IsEnd(false), IsUnordered(true)));
192       })
193       .WillOnce([](...) {
194         return SendQueue::DataToSend(
195             Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
196                  std::vector<uint8_t>(4), Data::IsBeginning(false),
197                  Data::IsEnd(true), IsUnordered(true)));
198       })
199       .WillOnce(CreateChunk(StreamID(1), MID(102)));
200   EXPECT_CALL(producer1, bytes_to_send_in_next_message)
201       .WillOnce(Return(kPayloadSize))  // When making active
202       .WillOnce(Return(kPayloadSize))
203       .WillOnce(Return(kPayloadSize))
204       .WillOnce(Return(kPayloadSize))
205       .WillOnce(Return(kPayloadSize))
206       .WillOnce(Return(0));
207   auto stream1 =
208       scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
209   stream1->MaybeMakeActive();
210 
211   StrictMock<MockStreamProducer> producer2;
212   EXPECT_CALL(producer2, Produce)
213       .WillOnce(CreateChunk(StreamID(2), MID(200)))
214       .WillOnce(CreateChunk(StreamID(2), MID(201)))
215       .WillOnce(CreateChunk(StreamID(2), MID(202)));
216   EXPECT_CALL(producer2, bytes_to_send_in_next_message)
217       .WillOnce(Return(kPayloadSize))  // When making active
218       .WillOnce(Return(kPayloadSize))
219       .WillOnce(Return(kPayloadSize))
220       .WillOnce(Return(0));
221   auto stream2 =
222       scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
223   stream2->MaybeMakeActive();
224 
225   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
226   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
227   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
228   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
229   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
230   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
231   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
232   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
233   EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
234 }
235 
236 // Deactivates a stream before it has finished producing all packets.
TEST(StreamSchedulerTest,StreamsCanBeMadeInactive)237 TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
238   StreamScheduler scheduler(kMtu);
239 
240   StrictMock<MockStreamProducer> producer1;
241   EXPECT_CALL(producer1, Produce)
242       .WillOnce(CreateChunk(StreamID(1), MID(100)))
243       .WillOnce(CreateChunk(StreamID(1), MID(101)));
244   EXPECT_CALL(producer1, bytes_to_send_in_next_message)
245       .WillOnce(Return(kPayloadSize))  // When making active
246       .WillOnce(Return(kPayloadSize))
247       .WillOnce(Return(kPayloadSize));  // hints that there is a MID(2) coming.
248   auto stream1 =
249       scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
250   stream1->MaybeMakeActive();
251 
252   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
253   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
254 
255   // ... but the stream is made inactive before it can be produced.
256   stream1->MakeInactive();
257   EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
258 }
259 
260 // Resumes a paused stream - makes a stream active after inactivating it.
TEST(StreamSchedulerTest,SingleStreamCanBeResumed)261 TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
262   StreamScheduler scheduler(kMtu);
263 
264   StrictMock<MockStreamProducer> producer1;
265   // Callbacks are setup so that they hint that there is a MID(2) coming...
266   EXPECT_CALL(producer1, Produce)
267       .WillOnce(CreateChunk(StreamID(1), MID(100)))
268       .WillOnce(CreateChunk(StreamID(1), MID(101)))
269       .WillOnce(CreateChunk(StreamID(1), MID(102)));
270   EXPECT_CALL(producer1, bytes_to_send_in_next_message)
271       .WillOnce(Return(kPayloadSize))  // When making active
272       .WillOnce(Return(kPayloadSize))
273       .WillOnce(Return(kPayloadSize))
274       .WillOnce(Return(kPayloadSize))  // When making active again
275       .WillOnce(Return(0));
276   auto stream1 =
277       scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
278   stream1->MaybeMakeActive();
279 
280   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
281   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
282 
283   stream1->MakeInactive();
284   EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
285   stream1->MaybeMakeActive();
286   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
287   EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
288 }
289 
290 // Iterates between streams, where one is suddenly paused and later resumed.
TEST(StreamSchedulerTest,WillRoundRobinWithPausedStream)291 TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) {
292   StreamScheduler scheduler(kMtu);
293 
294   StrictMock<MockStreamProducer> producer1;
295   EXPECT_CALL(producer1, Produce)
296       .WillOnce(CreateChunk(StreamID(1), MID(100)))
297       .WillOnce(CreateChunk(StreamID(1), MID(101)))
298       .WillOnce(CreateChunk(StreamID(1), MID(102)));
299   EXPECT_CALL(producer1, bytes_to_send_in_next_message)
300       .WillOnce(Return(kPayloadSize))  // When making active
301       .WillOnce(Return(kPayloadSize))
302       .WillOnce(Return(kPayloadSize))  // When making active
303       .WillOnce(Return(kPayloadSize))
304       .WillOnce(Return(0));
305   auto stream1 =
306       scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
307   stream1->MaybeMakeActive();
308 
309   StrictMock<MockStreamProducer> producer2;
310   EXPECT_CALL(producer2, Produce)
311       .WillOnce(CreateChunk(StreamID(2), MID(200)))
312       .WillOnce(CreateChunk(StreamID(2), MID(201)))
313       .WillOnce(CreateChunk(StreamID(2), MID(202)));
314   EXPECT_CALL(producer2, bytes_to_send_in_next_message)
315       .WillOnce(Return(kPayloadSize))  // When making active
316       .WillOnce(Return(kPayloadSize))
317       .WillOnce(Return(kPayloadSize))
318       .WillOnce(Return(0));
319   auto stream2 =
320       scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
321   stream2->MaybeMakeActive();
322 
323   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
324   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
325   stream1->MakeInactive();
326   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
327   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
328   stream1->MaybeMakeActive();
329   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
330   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
331   EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
332 }
333 
334 // Verifies that packet counts are evenly distributed in round robin scheduling.
TEST(StreamSchedulerTest,WillDistributeRoundRobinPacketsEvenlyTwoStreams)335 TEST(StreamSchedulerTest, WillDistributeRoundRobinPacketsEvenlyTwoStreams) {
336   StreamScheduler scheduler(kMtu);
337   TestStream stream1(scheduler, StreamID(1), StreamPriority(1));
338   TestStream stream2(scheduler, StreamID(2), StreamPriority(1));
339 
340   std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 10);
341   EXPECT_EQ(packet_counts[StreamID(1)], 5U);
342   EXPECT_EQ(packet_counts[StreamID(2)], 5U);
343 }
344 
345 // Verifies that packet counts are evenly distributed among active streams,
346 // where a stream is suddenly made inactive, two are added, and then the paused
347 // stream is resumed.
TEST(StreamSchedulerTest,WillDistributeEvenlyWithPausedAndAddedStreams)348 TEST(StreamSchedulerTest, WillDistributeEvenlyWithPausedAndAddedStreams) {
349   StreamScheduler scheduler(kMtu);
350   TestStream stream1(scheduler, StreamID(1), StreamPriority(1));
351   TestStream stream2(scheduler, StreamID(2), StreamPriority(1));
352 
353   std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 10);
354   EXPECT_EQ(packet_counts[StreamID(1)], 5U);
355   EXPECT_EQ(packet_counts[StreamID(2)], 5U);
356 
357   stream2.stream().MakeInactive();
358 
359   TestStream stream3(scheduler, StreamID(3), StreamPriority(1));
360   TestStream stream4(scheduler, StreamID(4), StreamPriority(1));
361 
362   std::map<StreamID, size_t> counts2 = GetPacketCounts(scheduler, 15);
363   EXPECT_EQ(counts2[StreamID(1)], 5U);
364   EXPECT_EQ(counts2[StreamID(2)], 0U);
365   EXPECT_EQ(counts2[StreamID(3)], 5U);
366   EXPECT_EQ(counts2[StreamID(4)], 5U);
367 
368   stream2.stream().MaybeMakeActive();
369 
370   std::map<StreamID, size_t> counts3 = GetPacketCounts(scheduler, 20);
371   EXPECT_EQ(counts3[StreamID(1)], 5U);
372   EXPECT_EQ(counts3[StreamID(2)], 5U);
373   EXPECT_EQ(counts3[StreamID(3)], 5U);
374   EXPECT_EQ(counts3[StreamID(4)], 5U);
375 }
376 
377 // Degrades to fair queuing with streams having identical priority.
TEST(StreamSchedulerTest,WillDoFairQueuingWithSamePriority)378 TEST(StreamSchedulerTest, WillDoFairQueuingWithSamePriority) {
379   StreamScheduler scheduler(kMtu);
380   scheduler.EnableMessageInterleaving(true);
381 
382   constexpr size_t kSmallPacket = 30;
383   constexpr size_t kLargePacket = 70;
384 
385   StrictMock<MockStreamProducer> callback1;
386   EXPECT_CALL(callback1, Produce)
387       .WillOnce(CreateChunk(StreamID(1), MID(100), kSmallPacket))
388       .WillOnce(CreateChunk(StreamID(1), MID(101), kSmallPacket))
389       .WillOnce(CreateChunk(StreamID(1), MID(102), kSmallPacket));
390   EXPECT_CALL(callback1, bytes_to_send_in_next_message)
391       .WillOnce(Return(kSmallPacket))  // When making active
392       .WillOnce(Return(kSmallPacket))
393       .WillOnce(Return(kSmallPacket))
394       .WillOnce(Return(0));
395   auto stream1 =
396       scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
397   stream1->MaybeMakeActive();
398 
399   StrictMock<MockStreamProducer> callback2;
400   EXPECT_CALL(callback2, Produce)
401       .WillOnce(CreateChunk(StreamID(2), MID(200), kLargePacket))
402       .WillOnce(CreateChunk(StreamID(2), MID(201), kLargePacket))
403       .WillOnce(CreateChunk(StreamID(2), MID(202), kLargePacket));
404   EXPECT_CALL(callback2, bytes_to_send_in_next_message)
405       .WillOnce(Return(kLargePacket))  // When making active
406       .WillOnce(Return(kLargePacket))
407       .WillOnce(Return(kLargePacket))
408       .WillOnce(Return(0));
409   auto stream2 =
410       scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
411   stream2->MaybeMakeActive();
412 
413   // t = 30
414   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
415   // t = 60
416   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
417   // t = 70
418   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
419   // t = 90
420   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
421   // t = 140
422   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
423   // t = 210
424   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
425   EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
426 }
427 
428 // Will do weighted fair queuing with three streams having different priority.
TEST(StreamSchedulerTest,WillDoWeightedFairQueuingSameSizeDifferentPriority)429 TEST(StreamSchedulerTest, WillDoWeightedFairQueuingSameSizeDifferentPriority) {
430   StreamScheduler scheduler(kMtu);
431   scheduler.EnableMessageInterleaving(true);
432 
433   StrictMock<MockStreamProducer> callback1;
434   EXPECT_CALL(callback1, Produce)
435       .WillOnce(CreateChunk(StreamID(1), MID(100)))
436       .WillOnce(CreateChunk(StreamID(1), MID(101)))
437       .WillOnce(CreateChunk(StreamID(1), MID(102)));
438   EXPECT_CALL(callback1, bytes_to_send_in_next_message)
439       .WillOnce(Return(kPayloadSize))  // When making active
440       .WillOnce(Return(kPayloadSize))
441       .WillOnce(Return(kPayloadSize))
442       .WillOnce(Return(0));
443   // Priority 125 -> allowed to produce every 1000/125 ~= 80 time units.
444   auto stream1 =
445       scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(125));
446   stream1->MaybeMakeActive();
447 
448   StrictMock<MockStreamProducer> callback2;
449   EXPECT_CALL(callback2, Produce)
450       .WillOnce(CreateChunk(StreamID(2), MID(200)))
451       .WillOnce(CreateChunk(StreamID(2), MID(201)))
452       .WillOnce(CreateChunk(StreamID(2), MID(202)));
453   EXPECT_CALL(callback2, bytes_to_send_in_next_message)
454       .WillOnce(Return(kPayloadSize))  // When making active
455       .WillOnce(Return(kPayloadSize))
456       .WillOnce(Return(kPayloadSize))
457       .WillOnce(Return(0));
458   // Priority 200 -> allowed to produce every 1000/200 ~= 50 time units.
459   auto stream2 =
460       scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(200));
461   stream2->MaybeMakeActive();
462 
463   StrictMock<MockStreamProducer> callback3;
464   EXPECT_CALL(callback3, Produce)
465       .WillOnce(CreateChunk(StreamID(3), MID(300)))
466       .WillOnce(CreateChunk(StreamID(3), MID(301)))
467       .WillOnce(CreateChunk(StreamID(3), MID(302)));
468   EXPECT_CALL(callback3, bytes_to_send_in_next_message)
469       .WillOnce(Return(kPayloadSize))  // When making active
470       .WillOnce(Return(kPayloadSize))
471       .WillOnce(Return(kPayloadSize))
472       .WillOnce(Return(0));
473   // Priority 500 -> allowed to produce every 1000/500 ~= 20 time units.
474   auto stream3 =
475       scheduler.CreateStream(&callback3, StreamID(3), StreamPriority(500));
476   stream3->MaybeMakeActive();
477 
478   // t ~= 20
479   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300)));
480   // t ~= 40
481   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301)));
482   // t ~= 50
483   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
484   // t ~= 60
485   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302)));
486   // t ~= 80
487   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
488   // t ~= 100
489   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
490   // t ~= 150
491   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
492   // t ~= 160
493   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
494   // t ~= 240
495   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
496   EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
497 }
498 
499 // Will do weighted fair queuing with three streams having different priority
500 // and sending different payload sizes.
TEST(StreamSchedulerTest,WillDoWeightedFairQueuingDifferentSizeAndPriority)501 TEST(StreamSchedulerTest, WillDoWeightedFairQueuingDifferentSizeAndPriority) {
502   StreamScheduler scheduler(kMtu);
503   scheduler.EnableMessageInterleaving(true);
504 
505   constexpr size_t kSmallPacket = 20;
506   constexpr size_t kMediumPacket = 50;
507   constexpr size_t kLargePacket = 70;
508 
509   // Stream with priority = 125 -> inverse weight ~=80
510   StrictMock<MockStreamProducer> callback1;
511   EXPECT_CALL(callback1, Produce)
512       // virtual finish time ~ 0 + 50 * 80 = 4000
513       .WillOnce(CreateChunk(StreamID(1), MID(100), kMediumPacket))
514       // virtual finish time ~ 4000 + 20 * 80 = 5600
515       .WillOnce(CreateChunk(StreamID(1), MID(101), kSmallPacket))
516       // virtual finish time ~ 5600 + 70 * 80 = 11200
517       .WillOnce(CreateChunk(StreamID(1), MID(102), kLargePacket));
518   EXPECT_CALL(callback1, bytes_to_send_in_next_message)
519       .WillOnce(Return(kMediumPacket))  // When making active
520       .WillOnce(Return(kSmallPacket))
521       .WillOnce(Return(kLargePacket))
522       .WillOnce(Return(0));
523   auto stream1 =
524       scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(125));
525   stream1->MaybeMakeActive();
526 
527   // Stream with priority = 200 -> inverse weight ~=50
528   StrictMock<MockStreamProducer> callback2;
529   EXPECT_CALL(callback2, Produce)
530       // virtual finish time ~ 0 + 50 * 50 = 2500
531       .WillOnce(CreateChunk(StreamID(2), MID(200), kMediumPacket))
532       // virtual finish time ~ 2500 + 70 * 50 = 6000
533       .WillOnce(CreateChunk(StreamID(2), MID(201), kLargePacket))
534       // virtual finish time ~ 6000 + 20 * 50 = 7000
535       .WillOnce(CreateChunk(StreamID(2), MID(202), kSmallPacket));
536   EXPECT_CALL(callback2, bytes_to_send_in_next_message)
537       .WillOnce(Return(kMediumPacket))  // When making active
538       .WillOnce(Return(kLargePacket))
539       .WillOnce(Return(kSmallPacket))
540       .WillOnce(Return(0));
541   auto stream2 =
542       scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(200));
543   stream2->MaybeMakeActive();
544 
545   // Stream with priority = 500 -> inverse weight ~=20
546   StrictMock<MockStreamProducer> callback3;
547   EXPECT_CALL(callback3, Produce)
548       // virtual finish time ~ 0 + 20 * 20 = 400
549       .WillOnce(CreateChunk(StreamID(3), MID(300), kSmallPacket))
550       // virtual finish time ~ 400 + 50 * 20 = 1400
551       .WillOnce(CreateChunk(StreamID(3), MID(301), kMediumPacket))
552       // virtual finish time ~ 1400 + 70 * 20 = 2800
553       .WillOnce(CreateChunk(StreamID(3), MID(302), kLargePacket));
554   EXPECT_CALL(callback3, bytes_to_send_in_next_message)
555       .WillOnce(Return(kSmallPacket))  // When making active
556       .WillOnce(Return(kMediumPacket))
557       .WillOnce(Return(kLargePacket))
558       .WillOnce(Return(0));
559   auto stream3 =
560       scheduler.CreateStream(&callback3, StreamID(3), StreamPriority(500));
561   stream3->MaybeMakeActive();
562 
563   // t ~= 400
564   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300)));
565   // t ~= 1400
566   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301)));
567   // t ~= 2500
568   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
569   // t ~= 2800
570   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302)));
571   // t ~= 4000
572   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
573   // t ~= 5600
574   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
575   // t ~= 6000
576   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
577   // t ~= 7000
578   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
579   // t ~= 11200
580   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
581   EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
582 }
TEST(StreamSchedulerTest,WillDistributeWFQPacketsInTwoStreamsByPriority)583 TEST(StreamSchedulerTest, WillDistributeWFQPacketsInTwoStreamsByPriority) {
584   // A simple test with two streams of different priority, but sending packets
585   // of identical size. Verifies that the ratio of sent packets represent their
586   // priority.
587   StreamScheduler scheduler(kMtu);
588   scheduler.EnableMessageInterleaving(true);
589 
590   TestStream stream1(scheduler, StreamID(1), StreamPriority(100), kPayloadSize);
591   TestStream stream2(scheduler, StreamID(2), StreamPriority(200), kPayloadSize);
592 
593   std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 15);
594   EXPECT_EQ(packet_counts[StreamID(1)], 5U);
595   EXPECT_EQ(packet_counts[StreamID(2)], 10U);
596 }
597 
TEST(StreamSchedulerTest,WillDistributeWFQPacketsInFourStreamsByPriority)598 TEST(StreamSchedulerTest, WillDistributeWFQPacketsInFourStreamsByPriority) {
599   // Same as `WillDistributeWFQPacketsInTwoStreamsByPriority` but with more
600   // streams.
601   StreamScheduler scheduler(kMtu);
602   scheduler.EnableMessageInterleaving(true);
603 
604   TestStream stream1(scheduler, StreamID(1), StreamPriority(100), kPayloadSize);
605   TestStream stream2(scheduler, StreamID(2), StreamPriority(200), kPayloadSize);
606   TestStream stream3(scheduler, StreamID(3), StreamPriority(300), kPayloadSize);
607   TestStream stream4(scheduler, StreamID(4), StreamPriority(400), kPayloadSize);
608 
609   std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 50);
610   EXPECT_EQ(packet_counts[StreamID(1)], 5U);
611   EXPECT_EQ(packet_counts[StreamID(2)], 10U);
612   EXPECT_EQ(packet_counts[StreamID(3)], 15U);
613   EXPECT_EQ(packet_counts[StreamID(4)], 20U);
614 }
615 
TEST(StreamSchedulerTest,WillDistributeFromTwoStreamsFairly)616 TEST(StreamSchedulerTest, WillDistributeFromTwoStreamsFairly) {
617   // A simple test with two streams of different priority, but sending packets
618   // of different size. Verifies that the ratio of total packet payload
619   // represent their priority.
620   // In this example,
621   // * stream1 has priority 100 and sends packets of size 8
622   // * stream2 has priority 400 and sends packets of size 4
623   // With round robin, stream1 would get twice as many payload bytes on the wire
624   // as stream2, but with WFQ and a 4x priority increase, stream2 should 4x as
625   // many payload bytes on the wire. That translates to stream2 getting 8x as
626   // many packets on the wire as they are half as large.
627   StreamScheduler scheduler(kMtu);
628   // Enable WFQ scheduler.
629   scheduler.EnableMessageInterleaving(true);
630 
631   TestStream stream1(scheduler, StreamID(1), StreamPriority(100),
632                      /*packet_size=*/8);
633   TestStream stream2(scheduler, StreamID(2), StreamPriority(400),
634                      /*packet_size=*/4);
635 
636   std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 90);
637   EXPECT_EQ(packet_counts[StreamID(1)], 10U);
638   EXPECT_EQ(packet_counts[StreamID(2)], 80U);
639 }
640 
TEST(StreamSchedulerTest,WillDistributeFromFourStreamsFairly)641 TEST(StreamSchedulerTest, WillDistributeFromFourStreamsFairly) {
642   // Same as `WillDistributeWeightedFairFromTwoStreamsFairly` but more
643   // complicated.
644   StreamScheduler scheduler(kMtu);
645   // Enable WFQ scheduler.
646   scheduler.EnableMessageInterleaving(true);
647 
648   TestStream stream1(scheduler, StreamID(1), StreamPriority(100),
649                      /*packet_size=*/10);
650   TestStream stream2(scheduler, StreamID(2), StreamPriority(200),
651                      /*packet_size=*/10);
652   TestStream stream3(scheduler, StreamID(3), StreamPriority(200),
653                      /*packet_size=*/20);
654   TestStream stream4(scheduler, StreamID(4), StreamPriority(400),
655                      /*packet_size=*/30);
656 
657   std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 80);
658   // 15 packets * 10 bytes = 150 bytes at priority 100.
659   EXPECT_EQ(packet_counts[StreamID(1)], 15U);
660   // 30 packets * 10 bytes = 300 bytes at priority 200.
661   EXPECT_EQ(packet_counts[StreamID(2)], 30U);
662   // 15 packets * 20 bytes = 300 bytes at priority 200.
663   EXPECT_EQ(packet_counts[StreamID(3)], 15U);
664   // 20 packets * 30 bytes = 600 bytes at priority 400.
665   EXPECT_EQ(packet_counts[StreamID(4)], 20U);
666 }
667 
668 // Sending large messages with small MTU will fragment the messages and produce
669 // a first fragment not larger than the MTU, and will then not first send from
670 // the stream with the smallest message, as their first fragment will be equally
671 // small for both streams. See `LargeMessageWithLargeMtu` for the same test, but
672 // with a larger MTU.
TEST(StreamSchedulerTest,SendLargeMessageWithSmallMtu)673 TEST(StreamSchedulerTest, SendLargeMessageWithSmallMtu) {
674   StreamScheduler scheduler(100 + SctpPacket::kHeaderSize +
675                             IDataChunk::kHeaderSize);
676   scheduler.EnableMessageInterleaving(true);
677 
678   StrictMock<MockStreamProducer> producer1;
679   EXPECT_CALL(producer1, Produce)
680       .WillOnce(CreateChunk(StreamID(1), MID(0), 100))
681       .WillOnce(CreateChunk(StreamID(1), MID(0), 100));
682   EXPECT_CALL(producer1, bytes_to_send_in_next_message)
683       .WillOnce(Return(200))  // When making active
684       .WillOnce(Return(100))
685       .WillOnce(Return(0));
686   auto stream1 =
687       scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(1));
688   stream1->MaybeMakeActive();
689 
690   StrictMock<MockStreamProducer> producer2;
691   EXPECT_CALL(producer2, Produce)
692       .WillOnce(CreateChunk(StreamID(2), MID(1), 100))
693       .WillOnce(CreateChunk(StreamID(2), MID(1), 50));
694   EXPECT_CALL(producer2, bytes_to_send_in_next_message)
695       .WillOnce(Return(150))  // When making active
696       .WillOnce(Return(50))
697       .WillOnce(Return(0));
698   auto stream2 =
699       scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1));
700   stream2->MaybeMakeActive();
701   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
702   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1)));
703   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1)));
704   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
705   EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
706 }
707 
708 // Sending large messages with large MTU will not fragment messages and will
709 // send the message first from the stream that has the smallest message.
TEST(StreamSchedulerTest,SendLargeMessageWithLargeMtu)710 TEST(StreamSchedulerTest, SendLargeMessageWithLargeMtu) {
711   StreamScheduler scheduler(200 + SctpPacket::kHeaderSize +
712                             IDataChunk::kHeaderSize);
713   scheduler.EnableMessageInterleaving(true);
714 
715   StrictMock<MockStreamProducer> producer1;
716   EXPECT_CALL(producer1, Produce)
717       .WillOnce(CreateChunk(StreamID(1), MID(0), 200));
718   EXPECT_CALL(producer1, bytes_to_send_in_next_message)
719       .WillOnce(Return(200))  // When making active
720       .WillOnce(Return(0));
721   auto stream1 =
722       scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(1));
723   stream1->MaybeMakeActive();
724 
725   StrictMock<MockStreamProducer> producer2;
726   EXPECT_CALL(producer2, Produce)
727       .WillOnce(CreateChunk(StreamID(2), MID(1), 150));
728   EXPECT_CALL(producer2, bytes_to_send_in_next_message)
729       .WillOnce(Return(150))  // When making active
730       .WillOnce(Return(0));
731   auto stream2 =
732       scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1));
733   stream2->MaybeMakeActive();
734   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1)));
735   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
736   EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
737 }
738 
739 }  // namespace
740 }  // namespace dcsctp
741