1 /*
2 * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10 #include "net/dcsctp/tx/rr_send_queue.h"
11
12 #include <cstdint>
13 #include <type_traits>
14 #include <vector>
15
16 #include "net/dcsctp/packet/data.h"
17 #include "net/dcsctp/public/dcsctp_message.h"
18 #include "net/dcsctp/public/dcsctp_options.h"
19 #include "net/dcsctp/public/dcsctp_socket.h"
20 #include "net/dcsctp/public/types.h"
21 #include "net/dcsctp/socket/mock_dcsctp_socket_callbacks.h"
22 #include "net/dcsctp/testing/testing_macros.h"
23 #include "net/dcsctp/tx/send_queue.h"
24 #include "rtc_base/gunit.h"
25 #include "test/gmock.h"
26
27 namespace dcsctp {
28 namespace {
29 using ::testing::SizeIs;
30 using ::testing::UnorderedElementsAre;
31
32 constexpr TimeMs kNow = TimeMs(0);
33 constexpr StreamID kStreamID(1);
34 constexpr PPID kPPID(53);
35 constexpr size_t kMaxQueueSize = 1000;
36 constexpr StreamPriority kDefaultPriority(10);
37 constexpr size_t kBufferedAmountLowThreshold = 500;
38 constexpr size_t kOneFragmentPacketSize = 100;
39 constexpr size_t kTwoFragmentPacketSize = 101;
40 constexpr size_t kMtu = 1100;
41
42 class RRSendQueueTest : public testing::Test {
43 protected:
RRSendQueueTest()44 RRSendQueueTest()
45 : buf_("log: ",
46 &callbacks_,
47 kMaxQueueSize,
48 kMtu,
49 kDefaultPriority,
50 kBufferedAmountLowThreshold) {}
51
52 testing::NiceMock<MockDcSctpSocketCallbacks> callbacks_;
53 const DcSctpOptions options_;
54 RRSendQueue buf_;
55 };
56
TEST_F(RRSendQueueTest,EmptyBuffer)57 TEST_F(RRSendQueueTest, EmptyBuffer) {
58 EXPECT_TRUE(buf_.IsEmpty());
59 EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
60 EXPECT_FALSE(buf_.IsFull());
61 }
62
TEST_F(RRSendQueueTest,AddAndGetSingleChunk)63 TEST_F(RRSendQueueTest, AddAndGetSingleChunk) {
64 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6}));
65
66 EXPECT_FALSE(buf_.IsEmpty());
67 EXPECT_FALSE(buf_.IsFull());
68 absl::optional<SendQueue::DataToSend> chunk_opt =
69 buf_.Produce(kNow, kOneFragmentPacketSize);
70 ASSERT_TRUE(chunk_opt.has_value());
71 EXPECT_TRUE(chunk_opt->data.is_beginning);
72 EXPECT_TRUE(chunk_opt->data.is_end);
73 }
74
TEST_F(RRSendQueueTest,CarveOutBeginningMiddleAndEnd)75 TEST_F(RRSendQueueTest, CarveOutBeginningMiddleAndEnd) {
76 std::vector<uint8_t> payload(60);
77 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
78
79 absl::optional<SendQueue::DataToSend> chunk_beg =
80 buf_.Produce(kNow, /*max_size=*/20);
81 ASSERT_TRUE(chunk_beg.has_value());
82 EXPECT_TRUE(chunk_beg->data.is_beginning);
83 EXPECT_FALSE(chunk_beg->data.is_end);
84
85 absl::optional<SendQueue::DataToSend> chunk_mid =
86 buf_.Produce(kNow, /*max_size=*/20);
87 ASSERT_TRUE(chunk_mid.has_value());
88 EXPECT_FALSE(chunk_mid->data.is_beginning);
89 EXPECT_FALSE(chunk_mid->data.is_end);
90
91 absl::optional<SendQueue::DataToSend> chunk_end =
92 buf_.Produce(kNow, /*max_size=*/20);
93 ASSERT_TRUE(chunk_end.has_value());
94 EXPECT_FALSE(chunk_end->data.is_beginning);
95 EXPECT_TRUE(chunk_end->data.is_end);
96
97 EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
98 }
99
TEST_F(RRSendQueueTest,GetChunksFromTwoMessages)100 TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) {
101 std::vector<uint8_t> payload(60);
102 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
103 buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload));
104
105 absl::optional<SendQueue::DataToSend> chunk_one =
106 buf_.Produce(kNow, kOneFragmentPacketSize);
107 ASSERT_TRUE(chunk_one.has_value());
108 EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
109 EXPECT_EQ(chunk_one->data.ppid, kPPID);
110 EXPECT_TRUE(chunk_one->data.is_beginning);
111 EXPECT_TRUE(chunk_one->data.is_end);
112
113 absl::optional<SendQueue::DataToSend> chunk_two =
114 buf_.Produce(kNow, kOneFragmentPacketSize);
115 ASSERT_TRUE(chunk_two.has_value());
116 EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
117 EXPECT_EQ(chunk_two->data.ppid, PPID(54));
118 EXPECT_TRUE(chunk_two->data.is_beginning);
119 EXPECT_TRUE(chunk_two->data.is_end);
120 }
121
TEST_F(RRSendQueueTest,BufferBecomesFullAndEmptied)122 TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) {
123 std::vector<uint8_t> payload(600);
124 EXPECT_FALSE(buf_.IsFull());
125 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
126 EXPECT_FALSE(buf_.IsFull());
127 buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload));
128 EXPECT_TRUE(buf_.IsFull());
129 // However, it's still possible to add messages. It's a soft limit, and it
130 // might be necessary to forcefully add messages due to e.g. external
131 // fragmentation.
132 buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload));
133 EXPECT_TRUE(buf_.IsFull());
134
135 absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 1000);
136 ASSERT_TRUE(chunk_one.has_value());
137 EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
138 EXPECT_EQ(chunk_one->data.ppid, kPPID);
139
140 EXPECT_TRUE(buf_.IsFull());
141
142 absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 1000);
143 ASSERT_TRUE(chunk_two.has_value());
144 EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
145 EXPECT_EQ(chunk_two->data.ppid, PPID(54));
146
147 EXPECT_FALSE(buf_.IsFull());
148 EXPECT_FALSE(buf_.IsEmpty());
149
150 absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 1000);
151 ASSERT_TRUE(chunk_three.has_value());
152 EXPECT_EQ(chunk_three->data.stream_id, StreamID(5));
153 EXPECT_EQ(chunk_three->data.ppid, PPID(55));
154
155 EXPECT_FALSE(buf_.IsFull());
156 EXPECT_TRUE(buf_.IsEmpty());
157 }
158
TEST_F(RRSendQueueTest,DefaultsToOrderedSend)159 TEST_F(RRSendQueueTest, DefaultsToOrderedSend) {
160 std::vector<uint8_t> payload(20);
161
162 // Default is ordered
163 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
164 absl::optional<SendQueue::DataToSend> chunk_one =
165 buf_.Produce(kNow, kOneFragmentPacketSize);
166 ASSERT_TRUE(chunk_one.has_value());
167 EXPECT_FALSE(chunk_one->data.is_unordered);
168
169 // Explicitly unordered.
170 SendOptions opts;
171 opts.unordered = IsUnordered(true);
172 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload), opts);
173 absl::optional<SendQueue::DataToSend> chunk_two =
174 buf_.Produce(kNow, kOneFragmentPacketSize);
175 ASSERT_TRUE(chunk_two.has_value());
176 EXPECT_TRUE(chunk_two->data.is_unordered);
177 }
178
TEST_F(RRSendQueueTest,ProduceWithLifetimeExpiry)179 TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) {
180 std::vector<uint8_t> payload(20);
181
182 // Default is no expiry
183 TimeMs now = kNow;
184 buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload));
185 now += DurationMs(1000000);
186 ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize));
187
188 SendOptions expires_2_seconds;
189 expires_2_seconds.lifetime = DurationMs(2000);
190
191 // Add and consume within lifetime
192 buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
193 now += DurationMs(2000);
194 ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize));
195
196 // Add and consume just outside lifetime
197 buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
198 now += DurationMs(2001);
199 ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize));
200
201 // A long time after expiry
202 buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
203 now += DurationMs(1000000);
204 ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize));
205
206 // Expire one message, but produce the second that is not expired.
207 buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
208
209 SendOptions expires_4_seconds;
210 expires_4_seconds.lifetime = DurationMs(4000);
211
212 buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_4_seconds);
213 now += DurationMs(2001);
214
215 ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize));
216 ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize));
217 }
218
TEST_F(RRSendQueueTest,DiscardPartialPackets)219 TEST_F(RRSendQueueTest, DiscardPartialPackets) {
220 std::vector<uint8_t> payload(120);
221
222 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
223 buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), payload));
224
225 absl::optional<SendQueue::DataToSend> chunk_one =
226 buf_.Produce(kNow, kOneFragmentPacketSize);
227 ASSERT_TRUE(chunk_one.has_value());
228 EXPECT_FALSE(chunk_one->data.is_end);
229 EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
230 buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
231 chunk_one->data.message_id);
232
233 absl::optional<SendQueue::DataToSend> chunk_two =
234 buf_.Produce(kNow, kOneFragmentPacketSize);
235 ASSERT_TRUE(chunk_two.has_value());
236 EXPECT_FALSE(chunk_two->data.is_end);
237 EXPECT_EQ(chunk_two->data.stream_id, StreamID(2));
238
239 absl::optional<SendQueue::DataToSend> chunk_three =
240 buf_.Produce(kNow, kOneFragmentPacketSize);
241 ASSERT_TRUE(chunk_three.has_value());
242 EXPECT_TRUE(chunk_three->data.is_end);
243 EXPECT_EQ(chunk_three->data.stream_id, StreamID(2));
244 ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize));
245
246 // Calling it again shouldn't cause issues.
247 buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
248 chunk_one->data.message_id);
249 ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize));
250 }
251
TEST_F(RRSendQueueTest,PrepareResetStreamsDiscardsStream)252 TEST_F(RRSendQueueTest, PrepareResetStreamsDiscardsStream) {
253 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 3}));
254 buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), {1, 2, 3, 4, 5}));
255 EXPECT_EQ(buf_.total_buffered_amount(), 8u);
256
257 buf_.PrepareResetStream(StreamID(1));
258 EXPECT_EQ(buf_.total_buffered_amount(), 5u);
259
260 EXPECT_THAT(buf_.GetStreamsReadyToBeReset(),
261 UnorderedElementsAre(StreamID(1)));
262 buf_.CommitResetStreams();
263 buf_.PrepareResetStream(StreamID(2));
264 EXPECT_EQ(buf_.total_buffered_amount(), 0u);
265 }
266
TEST_F(RRSendQueueTest,PrepareResetStreamsNotPartialPackets)267 TEST_F(RRSendQueueTest, PrepareResetStreamsNotPartialPackets) {
268 std::vector<uint8_t> payload(120);
269
270 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
271 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
272
273 absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 50);
274 ASSERT_TRUE(chunk_one.has_value());
275 EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
276 EXPECT_EQ(buf_.total_buffered_amount(), 2 * payload.size() - 50);
277
278 buf_.PrepareResetStream(StreamID(1));
279 EXPECT_EQ(buf_.total_buffered_amount(), payload.size() - 50);
280 }
281
TEST_F(RRSendQueueTest,EnqueuedItemsArePausedDuringStreamReset)282 TEST_F(RRSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) {
283 std::vector<uint8_t> payload(50);
284
285 buf_.PrepareResetStream(StreamID(1));
286 EXPECT_EQ(buf_.total_buffered_amount(), 0u);
287
288 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
289 EXPECT_EQ(buf_.total_buffered_amount(), payload.size());
290
291 EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
292
293 EXPECT_TRUE(buf_.HasStreamsReadyToBeReset());
294 EXPECT_THAT(buf_.GetStreamsReadyToBeReset(),
295 UnorderedElementsAre(StreamID(1)));
296
297 EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
298
299 buf_.CommitResetStreams();
300 EXPECT_EQ(buf_.total_buffered_amount(), payload.size());
301
302 absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 50);
303 ASSERT_TRUE(chunk_one.has_value());
304 EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
305 EXPECT_EQ(buf_.total_buffered_amount(), 0u);
306 }
307
TEST_F(RRSendQueueTest,PausedStreamsStillSendPartialMessagesUntilEnd)308 TEST_F(RRSendQueueTest, PausedStreamsStillSendPartialMessagesUntilEnd) {
309 constexpr size_t kPayloadSize = 100;
310 constexpr size_t kFragmentSize = 50;
311 std::vector<uint8_t> payload(kPayloadSize);
312
313 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
314 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
315
316 absl::optional<SendQueue::DataToSend> chunk_one =
317 buf_.Produce(kNow, kFragmentSize);
318 ASSERT_TRUE(chunk_one.has_value());
319 EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
320 EXPECT_EQ(buf_.total_buffered_amount(), 2 * kPayloadSize - kFragmentSize);
321
322 // This will stop the second message from being sent.
323 buf_.PrepareResetStream(StreamID(1));
324 EXPECT_EQ(buf_.total_buffered_amount(), 1 * kPayloadSize - kFragmentSize);
325
326 // Should still produce fragments until end of message.
327 absl::optional<SendQueue::DataToSend> chunk_two =
328 buf_.Produce(kNow, kFragmentSize);
329 ASSERT_TRUE(chunk_two.has_value());
330 EXPECT_EQ(chunk_two->data.stream_id, kStreamID);
331 EXPECT_EQ(buf_.total_buffered_amount(), 0ul);
332
333 // But shouldn't produce any more messages as the stream is paused.
334 EXPECT_FALSE(buf_.Produce(kNow, kFragmentSize).has_value());
335 }
336
TEST_F(RRSendQueueTest,CommittingResetsSSN)337 TEST_F(RRSendQueueTest, CommittingResetsSSN) {
338 std::vector<uint8_t> payload(50);
339
340 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
341 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
342
343 absl::optional<SendQueue::DataToSend> chunk_one =
344 buf_.Produce(kNow, kOneFragmentPacketSize);
345 ASSERT_TRUE(chunk_one.has_value());
346 EXPECT_EQ(chunk_one->data.ssn, SSN(0));
347
348 absl::optional<SendQueue::DataToSend> chunk_two =
349 buf_.Produce(kNow, kOneFragmentPacketSize);
350 ASSERT_TRUE(chunk_two.has_value());
351 EXPECT_EQ(chunk_two->data.ssn, SSN(1));
352
353 buf_.PrepareResetStream(StreamID(1));
354
355 // Buffered
356 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
357
358 EXPECT_TRUE(buf_.HasStreamsReadyToBeReset());
359 EXPECT_THAT(buf_.GetStreamsReadyToBeReset(),
360 UnorderedElementsAre(StreamID(1)));
361 buf_.CommitResetStreams();
362
363 absl::optional<SendQueue::DataToSend> chunk_three =
364 buf_.Produce(kNow, kOneFragmentPacketSize);
365 ASSERT_TRUE(chunk_three.has_value());
366 EXPECT_EQ(chunk_three->data.ssn, SSN(0));
367 }
368
TEST_F(RRSendQueueTest,CommittingResetsSSNForPausedStreamsOnly)369 TEST_F(RRSendQueueTest, CommittingResetsSSNForPausedStreamsOnly) {
370 std::vector<uint8_t> payload(50);
371
372 buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
373 buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, payload));
374
375 absl::optional<SendQueue::DataToSend> chunk_one =
376 buf_.Produce(kNow, kOneFragmentPacketSize);
377 ASSERT_TRUE(chunk_one.has_value());
378 EXPECT_EQ(chunk_one->data.stream_id, StreamID(1));
379 EXPECT_EQ(chunk_one->data.ssn, SSN(0));
380
381 absl::optional<SendQueue::DataToSend> chunk_two =
382 buf_.Produce(kNow, kOneFragmentPacketSize);
383 ASSERT_TRUE(chunk_two.has_value());
384 EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
385 EXPECT_EQ(chunk_two->data.ssn, SSN(0));
386
387 buf_.PrepareResetStream(StreamID(3));
388
389 // Send two more messages - SID 3 will buffer, SID 1 will send.
390 buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
391 buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, payload));
392
393 EXPECT_TRUE(buf_.HasStreamsReadyToBeReset());
394 EXPECT_THAT(buf_.GetStreamsReadyToBeReset(),
395 UnorderedElementsAre(StreamID(3)));
396
397 buf_.CommitResetStreams();
398
399 absl::optional<SendQueue::DataToSend> chunk_three =
400 buf_.Produce(kNow, kOneFragmentPacketSize);
401 ASSERT_TRUE(chunk_three.has_value());
402 EXPECT_EQ(chunk_three->data.stream_id, StreamID(1));
403 EXPECT_EQ(chunk_three->data.ssn, SSN(1));
404
405 absl::optional<SendQueue::DataToSend> chunk_four =
406 buf_.Produce(kNow, kOneFragmentPacketSize);
407 ASSERT_TRUE(chunk_four.has_value());
408 EXPECT_EQ(chunk_four->data.stream_id, StreamID(3));
409 EXPECT_EQ(chunk_four->data.ssn, SSN(0));
410 }
411
TEST_F(RRSendQueueTest,RollBackResumesSSN)412 TEST_F(RRSendQueueTest, RollBackResumesSSN) {
413 std::vector<uint8_t> payload(50);
414
415 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
416 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
417
418 absl::optional<SendQueue::DataToSend> chunk_one =
419 buf_.Produce(kNow, kOneFragmentPacketSize);
420 ASSERT_TRUE(chunk_one.has_value());
421 EXPECT_EQ(chunk_one->data.ssn, SSN(0));
422
423 absl::optional<SendQueue::DataToSend> chunk_two =
424 buf_.Produce(kNow, kOneFragmentPacketSize);
425 ASSERT_TRUE(chunk_two.has_value());
426 EXPECT_EQ(chunk_two->data.ssn, SSN(1));
427
428 buf_.PrepareResetStream(StreamID(1));
429
430 // Buffered
431 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
432
433 EXPECT_TRUE(buf_.HasStreamsReadyToBeReset());
434 EXPECT_THAT(buf_.GetStreamsReadyToBeReset(),
435 UnorderedElementsAre(StreamID(1)));
436 buf_.RollbackResetStreams();
437
438 absl::optional<SendQueue::DataToSend> chunk_three =
439 buf_.Produce(kNow, kOneFragmentPacketSize);
440 ASSERT_TRUE(chunk_three.has_value());
441 EXPECT_EQ(chunk_three->data.ssn, SSN(2));
442 }
443
TEST_F(RRSendQueueTest,ReturnsFragmentsForOneMessageBeforeMovingToNext)444 TEST_F(RRSendQueueTest, ReturnsFragmentsForOneMessageBeforeMovingToNext) {
445 std::vector<uint8_t> payload(200);
446 buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
447 buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload));
448
449 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
450 buf_.Produce(kNow, kOneFragmentPacketSize));
451 EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
452
453 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
454 buf_.Produce(kNow, kOneFragmentPacketSize));
455 EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
456
457 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
458 buf_.Produce(kNow, kOneFragmentPacketSize));
459 EXPECT_EQ(chunk3.data.stream_id, StreamID(2));
460
461 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
462 buf_.Produce(kNow, kOneFragmentPacketSize));
463 EXPECT_EQ(chunk4.data.stream_id, StreamID(2));
464 }
465
TEST_F(RRSendQueueTest,ReturnsAlsoSmallFragmentsBeforeMovingToNext)466 TEST_F(RRSendQueueTest, ReturnsAlsoSmallFragmentsBeforeMovingToNext) {
467 std::vector<uint8_t> payload(kTwoFragmentPacketSize);
468 buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
469 buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload));
470
471 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
472 buf_.Produce(kNow, kOneFragmentPacketSize));
473 EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
474 EXPECT_THAT(chunk1.data.payload, SizeIs(kOneFragmentPacketSize));
475
476 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
477 buf_.Produce(kNow, kOneFragmentPacketSize));
478 EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
479 EXPECT_THAT(chunk2.data.payload,
480 SizeIs(kTwoFragmentPacketSize - kOneFragmentPacketSize));
481
482 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
483 buf_.Produce(kNow, kOneFragmentPacketSize));
484 EXPECT_EQ(chunk3.data.stream_id, StreamID(2));
485 EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize));
486
487 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
488 buf_.Produce(kNow, kOneFragmentPacketSize));
489 EXPECT_EQ(chunk4.data.stream_id, StreamID(2));
490 EXPECT_THAT(chunk4.data.payload,
491 SizeIs(kTwoFragmentPacketSize - kOneFragmentPacketSize));
492 }
493
TEST_F(RRSendQueueTest,WillCycleInRoundRobinFashionBetweenStreams)494 TEST_F(RRSendQueueTest, WillCycleInRoundRobinFashionBetweenStreams) {
495 buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1)));
496 buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(2)));
497 buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(3)));
498 buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(4)));
499 buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector<uint8_t>(5)));
500 buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector<uint8_t>(6)));
501 buf_.Add(kNow, DcSctpMessage(StreamID(4), kPPID, std::vector<uint8_t>(7)));
502 buf_.Add(kNow, DcSctpMessage(StreamID(4), kPPID, std::vector<uint8_t>(8)));
503
504 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
505 buf_.Produce(kNow, kOneFragmentPacketSize));
506 EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
507 EXPECT_THAT(chunk1.data.payload, SizeIs(1));
508
509 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
510 buf_.Produce(kNow, kOneFragmentPacketSize));
511 EXPECT_EQ(chunk2.data.stream_id, StreamID(2));
512 EXPECT_THAT(chunk2.data.payload, SizeIs(3));
513
514 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
515 buf_.Produce(kNow, kOneFragmentPacketSize));
516 EXPECT_EQ(chunk3.data.stream_id, StreamID(3));
517 EXPECT_THAT(chunk3.data.payload, SizeIs(5));
518
519 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
520 buf_.Produce(kNow, kOneFragmentPacketSize));
521 EXPECT_EQ(chunk4.data.stream_id, StreamID(4));
522 EXPECT_THAT(chunk4.data.payload, SizeIs(7));
523
524 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk5,
525 buf_.Produce(kNow, kOneFragmentPacketSize));
526 EXPECT_EQ(chunk5.data.stream_id, StreamID(1));
527 EXPECT_THAT(chunk5.data.payload, SizeIs(2));
528
529 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk6,
530 buf_.Produce(kNow, kOneFragmentPacketSize));
531 EXPECT_EQ(chunk6.data.stream_id, StreamID(2));
532 EXPECT_THAT(chunk6.data.payload, SizeIs(4));
533
534 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk7,
535 buf_.Produce(kNow, kOneFragmentPacketSize));
536 EXPECT_EQ(chunk7.data.stream_id, StreamID(3));
537 EXPECT_THAT(chunk7.data.payload, SizeIs(6));
538
539 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk8,
540 buf_.Produce(kNow, kOneFragmentPacketSize));
541 EXPECT_EQ(chunk8.data.stream_id, StreamID(4));
542 EXPECT_THAT(chunk8.data.payload, SizeIs(8));
543 }
544
TEST_F(RRSendQueueTest,DoesntTriggerOnBufferedAmountLowWhenSetToZero)545 TEST_F(RRSendQueueTest, DoesntTriggerOnBufferedAmountLowWhenSetToZero) {
546 EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
547 buf_.SetBufferedAmountLowThreshold(StreamID(1), 0u);
548 }
549
TEST_F(RRSendQueueTest,TriggersOnBufferedAmountAtZeroLowWhenSent)550 TEST_F(RRSendQueueTest, TriggersOnBufferedAmountAtZeroLowWhenSent) {
551 buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1)));
552 EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 1u);
553
554 EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1)));
555
556 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
557 buf_.Produce(kNow, kOneFragmentPacketSize));
558 EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
559 EXPECT_THAT(chunk1.data.payload, SizeIs(1));
560 EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 0u);
561 }
562
TEST_F(RRSendQueueTest,WillRetriggerOnBufferedAmountLowIfAddingMore)563 TEST_F(RRSendQueueTest, WillRetriggerOnBufferedAmountLowIfAddingMore) {
564 buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1)));
565
566 EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1)));
567
568 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
569 buf_.Produce(kNow, kOneFragmentPacketSize));
570 EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
571 EXPECT_THAT(chunk1.data.payload, SizeIs(1));
572
573 EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
574
575 buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1)));
576 EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 1u);
577
578 // Should now trigger again, as buffer_amount went above the threshold.
579 EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1)));
580 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
581 buf_.Produce(kNow, kOneFragmentPacketSize));
582 EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
583 EXPECT_THAT(chunk2.data.payload, SizeIs(1));
584 }
585
TEST_F(RRSendQueueTest,OnlyTriggersWhenTransitioningFromAboveToBelowOrEqual)586 TEST_F(RRSendQueueTest, OnlyTriggersWhenTransitioningFromAboveToBelowOrEqual) {
587 buf_.SetBufferedAmountLowThreshold(StreamID(1), 1000);
588
589 buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(10)));
590 EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 10u);
591
592 EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
593 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
594 buf_.Produce(kNow, kOneFragmentPacketSize));
595 EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
596 EXPECT_THAT(chunk1.data.payload, SizeIs(10));
597 EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 0u);
598
599 buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(20)));
600 EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 20u);
601
602 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
603 buf_.Produce(kNow, kOneFragmentPacketSize));
604 EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
605 EXPECT_THAT(chunk2.data.payload, SizeIs(20));
606 EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 0u);
607 }
608
TEST_F(RRSendQueueTest,WillTriggerOnBufferedAmountLowSetAboveZero)609 TEST_F(RRSendQueueTest, WillTriggerOnBufferedAmountLowSetAboveZero) {
610 EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
611
612 buf_.SetBufferedAmountLowThreshold(StreamID(1), 700);
613
614 std::vector<uint8_t> payload(1000);
615 buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
616
617 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
618 buf_.Produce(kNow, kOneFragmentPacketSize));
619 EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
620 EXPECT_THAT(chunk1.data.payload, SizeIs(kOneFragmentPacketSize));
621 EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 900u);
622
623 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
624 buf_.Produce(kNow, kOneFragmentPacketSize));
625 EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
626 EXPECT_THAT(chunk2.data.payload, SizeIs(kOneFragmentPacketSize));
627 EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 800u);
628
629 EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1)));
630
631 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
632 buf_.Produce(kNow, kOneFragmentPacketSize));
633 EXPECT_EQ(chunk3.data.stream_id, StreamID(1));
634 EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize));
635 EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 700u);
636
637 // Doesn't trigger when reducing even further.
638 EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
639
640 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
641 buf_.Produce(kNow, kOneFragmentPacketSize));
642 EXPECT_EQ(chunk3.data.stream_id, StreamID(1));
643 EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize));
644 EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 600u);
645 }
646
TEST_F(RRSendQueueTest,WillRetriggerOnBufferedAmountLowSetAboveZero)647 TEST_F(RRSendQueueTest, WillRetriggerOnBufferedAmountLowSetAboveZero) {
648 EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
649
650 buf_.SetBufferedAmountLowThreshold(StreamID(1), 700);
651
652 buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1000)));
653
654 EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1)));
655 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
656 buf_.Produce(kNow, 400));
657 EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
658 EXPECT_THAT(chunk1.data.payload, SizeIs(400));
659 EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 600u);
660
661 EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
662 buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(200)));
663 EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 800u);
664
665 // Will trigger again, as it went above the limit.
666 EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1)));
667 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
668 buf_.Produce(kNow, 200));
669 EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
670 EXPECT_THAT(chunk2.data.payload, SizeIs(200));
671 EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 600u);
672 }
673
TEST_F(RRSendQueueTest,TriggersOnBufferedAmountLowOnThresholdChanged)674 TEST_F(RRSendQueueTest, TriggersOnBufferedAmountLowOnThresholdChanged) {
675 EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
676
677 buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(100)));
678
679 // Modifying the threshold, still under buffered_amount, should not trigger.
680 buf_.SetBufferedAmountLowThreshold(StreamID(1), 50);
681 buf_.SetBufferedAmountLowThreshold(StreamID(1), 99);
682
683 // When the threshold reaches buffered_amount, it will trigger.
684 EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1)));
685 buf_.SetBufferedAmountLowThreshold(StreamID(1), 100);
686
687 // But not when it's set low again.
688 EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
689 buf_.SetBufferedAmountLowThreshold(StreamID(1), 50);
690
691 // But it will trigger when it overshoots.
692 EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1)));
693 buf_.SetBufferedAmountLowThreshold(StreamID(1), 150);
694
695 // But not when it's set low again.
696 EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
697 buf_.SetBufferedAmountLowThreshold(StreamID(1), 0);
698 }
699
TEST_F(RRSendQueueTest,OnTotalBufferedAmountLowDoesNotTriggerOnBufferFillingUp)700 TEST_F(RRSendQueueTest,
701 OnTotalBufferedAmountLowDoesNotTriggerOnBufferFillingUp) {
702 EXPECT_CALL(callbacks_, OnTotalBufferedAmountLow).Times(0);
703 std::vector<uint8_t> payload(kBufferedAmountLowThreshold - 1);
704 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
705 EXPECT_EQ(buf_.total_buffered_amount(), payload.size());
706
707 // Will not trigger if going above but never below.
708 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID,
709 std::vector<uint8_t>(kOneFragmentPacketSize)));
710 }
711
TEST_F(RRSendQueueTest,TriggersOnTotalBufferedAmountLowWhenCrossing)712 TEST_F(RRSendQueueTest, TriggersOnTotalBufferedAmountLowWhenCrossing) {
713 EXPECT_CALL(callbacks_, OnTotalBufferedAmountLow).Times(0);
714 std::vector<uint8_t> payload(kBufferedAmountLowThreshold);
715 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
716 EXPECT_EQ(buf_.total_buffered_amount(), payload.size());
717
718 // Reaches it.
719 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, std::vector<uint8_t>(1)));
720
721 // Drain it a bit - will trigger.
722 EXPECT_CALL(callbacks_, OnTotalBufferedAmountLow).Times(1);
723 absl::optional<SendQueue::DataToSend> chunk_two =
724 buf_.Produce(kNow, kOneFragmentPacketSize);
725 }
726
TEST_F(RRSendQueueTest,WillStayInAStreamAsLongAsThatMessageIsSending)727 TEST_F(RRSendQueueTest, WillStayInAStreamAsLongAsThatMessageIsSending) {
728 buf_.Add(kNow, DcSctpMessage(StreamID(5), kPPID, std::vector<uint8_t>(1)));
729
730 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
731 buf_.Produce(kNow, kOneFragmentPacketSize));
732 EXPECT_EQ(chunk1.data.stream_id, StreamID(5));
733 EXPECT_THAT(chunk1.data.payload, SizeIs(1));
734
735 // Next, it should pick a different stream.
736
737 buf_.Add(kNow,
738 DcSctpMessage(StreamID(1), kPPID,
739 std::vector<uint8_t>(kOneFragmentPacketSize * 2)));
740
741 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
742 buf_.Produce(kNow, kOneFragmentPacketSize));
743 EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
744 EXPECT_THAT(chunk2.data.payload, SizeIs(kOneFragmentPacketSize));
745
746 // It should still stay on the Stream1 now, even if might be tempted to switch
747 // to this stream, as it's the stream following 5.
748 buf_.Add(kNow, DcSctpMessage(StreamID(6), kPPID, std::vector<uint8_t>(1)));
749
750 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
751 buf_.Produce(kNow, kOneFragmentPacketSize));
752 EXPECT_EQ(chunk3.data.stream_id, StreamID(1));
753 EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize));
754
755 // After stream id 1 is complete, it's time to do stream 6.
756 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
757 buf_.Produce(kNow, kOneFragmentPacketSize));
758 EXPECT_EQ(chunk4.data.stream_id, StreamID(6));
759 EXPECT_THAT(chunk4.data.payload, SizeIs(1));
760
761 EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
762 }
763
TEST_F(RRSendQueueTest,StreamsHaveInitialPriority)764 TEST_F(RRSendQueueTest, StreamsHaveInitialPriority) {
765 EXPECT_EQ(buf_.GetStreamPriority(StreamID(1)), kDefaultPriority);
766
767 buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(40)));
768 EXPECT_EQ(buf_.GetStreamPriority(StreamID(2)), kDefaultPriority);
769 }
770
TEST_F(RRSendQueueTest,CanChangeStreamPriority)771 TEST_F(RRSendQueueTest, CanChangeStreamPriority) {
772 buf_.SetStreamPriority(StreamID(1), StreamPriority(42));
773 EXPECT_EQ(buf_.GetStreamPriority(StreamID(1)), StreamPriority(42));
774
775 buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(40)));
776 buf_.SetStreamPriority(StreamID(2), StreamPriority(42));
777 EXPECT_EQ(buf_.GetStreamPriority(StreamID(2)), StreamPriority(42));
778 }
779
TEST_F(RRSendQueueTest,WillHandoverPriority)780 TEST_F(RRSendQueueTest, WillHandoverPriority) {
781 buf_.SetStreamPriority(StreamID(1), StreamPriority(42));
782
783 buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(40)));
784 buf_.SetStreamPriority(StreamID(2), StreamPriority(42));
785
786 DcSctpSocketHandoverState state;
787 buf_.AddHandoverState(state);
788
789 RRSendQueue q2("log: ", &callbacks_, kMaxQueueSize, kMtu, kDefaultPriority,
790 kBufferedAmountLowThreshold);
791 q2.RestoreFromState(state);
792 EXPECT_EQ(q2.GetStreamPriority(StreamID(1)), StreamPriority(42));
793 EXPECT_EQ(q2.GetStreamPriority(StreamID(2)), StreamPriority(42));
794 }
795
TEST_F(RRSendQueueTest,WillSendMessagesByPrio)796 TEST_F(RRSendQueueTest, WillSendMessagesByPrio) {
797 buf_.EnableMessageInterleaving(true);
798 buf_.SetStreamPriority(StreamID(1), StreamPriority(10));
799 buf_.SetStreamPriority(StreamID(2), StreamPriority(20));
800 buf_.SetStreamPriority(StreamID(3), StreamPriority(30));
801
802 buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(40)));
803 buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(20)));
804 buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector<uint8_t>(10)));
805 std::vector<uint16_t> expected_streams = {3, 2, 2, 1, 1, 1, 1};
806
807 for (uint16_t stream_num : expected_streams) {
808 ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk,
809 buf_.Produce(kNow, 10));
810 EXPECT_EQ(chunk.data.stream_id, StreamID(stream_num));
811 }
812 EXPECT_FALSE(buf_.Produce(kNow, 1).has_value());
813 }
814
TEST_F(RRSendQueueTest,WillSendLifecycleExpireWhenExpiredInSendQueue)815 TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenExpiredInSendQueue) {
816 std::vector<uint8_t> payload(kOneFragmentPacketSize);
817 buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload),
818 SendOptions{.lifetime = DurationMs(1000),
819 .lifecycle_id = LifecycleId(1)});
820
821 EXPECT_CALL(callbacks_, OnLifecycleMessageExpired(LifecycleId(1),
822 /*maybe_delivered=*/false));
823 EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(1)));
824 EXPECT_FALSE(buf_.Produce(kNow + DurationMs(1001), kOneFragmentPacketSize)
825 .has_value());
826 }
827
TEST_F(RRSendQueueTest,WillSendLifecycleExpireWhenDiscardingDuringPause)828 TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenDiscardingDuringPause) {
829 std::vector<uint8_t> payload(120);
830
831 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload),
832 SendOptions{.lifecycle_id = LifecycleId(1)});
833 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload),
834 SendOptions{.lifecycle_id = LifecycleId(2)});
835
836 absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 50);
837 ASSERT_TRUE(chunk_one.has_value());
838 EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
839 EXPECT_EQ(buf_.total_buffered_amount(), 2 * payload.size() - 50);
840
841 EXPECT_CALL(callbacks_, OnLifecycleMessageExpired(LifecycleId(2),
842 /*maybe_delivered=*/false));
843 EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(2)));
844 buf_.PrepareResetStream(StreamID(1));
845 EXPECT_EQ(buf_.total_buffered_amount(), payload.size() - 50);
846 }
847
TEST_F(RRSendQueueTest,WillSendLifecycleExpireWhenDiscardingExplicitly)848 TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenDiscardingExplicitly) {
849 std::vector<uint8_t> payload(kOneFragmentPacketSize + 20);
850
851 buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload),
852 SendOptions{.lifecycle_id = LifecycleId(1)});
853
854 absl::optional<SendQueue::DataToSend> chunk_one =
855 buf_.Produce(kNow, kOneFragmentPacketSize);
856 ASSERT_TRUE(chunk_one.has_value());
857 EXPECT_FALSE(chunk_one->data.is_end);
858 EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
859 EXPECT_CALL(callbacks_, OnLifecycleMessageExpired(LifecycleId(1),
860 /*maybe_delivered=*/false));
861 EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(1)));
862 buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
863 chunk_one->data.message_id);
864 }
865 } // namespace
866 } // namespace dcsctp
867