xref: /aosp_15_r20/external/webrtc/net/dcsctp/tx/rr_send_queue_test.cc (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1 /*
2  *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 #include "net/dcsctp/tx/rr_send_queue.h"
11 
12 #include <cstdint>
13 #include <type_traits>
14 #include <vector>
15 
16 #include "net/dcsctp/packet/data.h"
17 #include "net/dcsctp/public/dcsctp_message.h"
18 #include "net/dcsctp/public/dcsctp_options.h"
19 #include "net/dcsctp/public/dcsctp_socket.h"
20 #include "net/dcsctp/public/types.h"
21 #include "net/dcsctp/socket/mock_dcsctp_socket_callbacks.h"
22 #include "net/dcsctp/testing/testing_macros.h"
23 #include "net/dcsctp/tx/send_queue.h"
24 #include "rtc_base/gunit.h"
25 #include "test/gmock.h"
26 
27 namespace dcsctp {
28 namespace {
29 using ::testing::SizeIs;
30 using ::testing::UnorderedElementsAre;
31 
32 constexpr TimeMs kNow = TimeMs(0);
33 constexpr StreamID kStreamID(1);
34 constexpr PPID kPPID(53);
35 constexpr size_t kMaxQueueSize = 1000;
36 constexpr StreamPriority kDefaultPriority(10);
37 constexpr size_t kBufferedAmountLowThreshold = 500;
38 constexpr size_t kOneFragmentPacketSize = 100;
39 constexpr size_t kTwoFragmentPacketSize = 101;
40 constexpr size_t kMtu = 1100;
41 
42 class RRSendQueueTest : public testing::Test {
43  protected:
RRSendQueueTest()44   RRSendQueueTest()
45       : buf_("log: ",
46              &callbacks_,
47              kMaxQueueSize,
48              kMtu,
49              kDefaultPriority,
50              kBufferedAmountLowThreshold) {}
51 
52   testing::NiceMock<MockDcSctpSocketCallbacks> callbacks_;
53   const DcSctpOptions options_;
54   RRSendQueue buf_;
55 };
56 
// A freshly constructed queue is empty, produces nothing and is not full.
TEST_F(RRSendQueueTest, EmptyBuffer) {
  EXPECT_TRUE(buf_.IsEmpty());

  const auto produced = buf_.Produce(kNow, kOneFragmentPacketSize);
  EXPECT_FALSE(produced.has_value());

  EXPECT_FALSE(buf_.IsFull());
}
62 
// A small message comes out as a single chunk that is both the beginning and
// the end of the message.
TEST_F(RRSendQueueTest, AddAndGetSingleChunk) {
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6}));
  EXPECT_FALSE(buf_.IsEmpty());
  EXPECT_FALSE(buf_.IsFull());

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_TRUE(chunk.data.is_beginning);
  EXPECT_TRUE(chunk.data.is_end);
}
74 
// A 60 byte message produced 20 bytes at a time yields a beginning, a middle
// and an end fragment, and nothing after that.
TEST_F(RRSendQueueTest, CarveOutBeginningMiddleAndEnd) {
  constexpr size_t kFragmentSize = 20;
  const std::vector<uint8_t> bytes(60);
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, bytes));

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend first,
                              buf_.Produce(kNow, kFragmentSize));
  EXPECT_TRUE(first.data.is_beginning);
  EXPECT_FALSE(first.data.is_end);

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend middle,
                              buf_.Produce(kNow, kFragmentSize));
  EXPECT_FALSE(middle.data.is_beginning);
  EXPECT_FALSE(middle.data.is_end);

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend last,
                              buf_.Produce(kNow, kFragmentSize));
  EXPECT_FALSE(last.data.is_beginning);
  EXPECT_TRUE(last.data.is_end);

  // The message has been fully consumed.
  EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
}
99 
// Two messages on different streams are produced as two complete chunks, each
// carrying the stream id and PPID it was added with.
TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) {
  const StreamID other_stream(3);
  const PPID other_ppid(54);
  const std::vector<uint8_t> bytes(60);
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, bytes));
  buf_.Add(kNow, DcSctpMessage(other_stream, other_ppid, bytes));

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend first,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_EQ(first.data.stream_id, kStreamID);
  EXPECT_EQ(first.data.ppid, kPPID);
  EXPECT_TRUE(first.data.is_beginning);
  EXPECT_TRUE(first.data.is_end);

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend second,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_EQ(second.data.stream_id, other_stream);
  EXPECT_EQ(second.data.ppid, other_ppid);
  EXPECT_TRUE(second.data.is_beginning);
  EXPECT_TRUE(second.data.is_end);
}
121 
// Tracks IsFull()/IsEmpty() while three 600 byte messages are added and then
// drained one by one.
TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) {
  constexpr size_t kProduceSize = 1000;
  const std::vector<uint8_t> bytes(600);

  EXPECT_FALSE(buf_.IsFull());
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, bytes));
  EXPECT_FALSE(buf_.IsFull());
  buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), bytes));
  EXPECT_TRUE(buf_.IsFull());

  // The limit is soft: messages can still be added when full, which may be
  // needed e.g. because of external fragmentation.
  buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), bytes));
  EXPECT_TRUE(buf_.IsFull());

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend first,
                              buf_.Produce(kNow, kProduceSize));
  EXPECT_EQ(first.data.stream_id, kStreamID);
  EXPECT_EQ(first.data.ppid, kPPID);
  EXPECT_TRUE(buf_.IsFull());

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend second,
                              buf_.Produce(kNow, kProduceSize));
  EXPECT_EQ(second.data.stream_id, StreamID(3));
  EXPECT_EQ(second.data.ppid, PPID(54));
  EXPECT_FALSE(buf_.IsFull());
  EXPECT_FALSE(buf_.IsEmpty());

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend third,
                              buf_.Produce(kNow, kProduceSize));
  EXPECT_EQ(third.data.stream_id, StreamID(5));
  EXPECT_EQ(third.data.ppid, PPID(55));
  EXPECT_FALSE(buf_.IsFull());
  EXPECT_TRUE(buf_.IsEmpty());
}
158 
// Messages added without send options are ordered; setting
// SendOptions::unordered makes the produced chunk unordered.
TEST_F(RRSendQueueTest, DefaultsToOrderedSend) {
  const std::vector<uint8_t> bytes(20);

  // No options given: ordered.
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, bytes));
  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend ordered_chunk,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_FALSE(ordered_chunk.data.is_unordered);

  // Unordered delivery requested explicitly.
  SendOptions unordered_options;
  unordered_options.unordered = IsUnordered(true);
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, bytes), unordered_options);
  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend unordered_chunk,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_TRUE(unordered_chunk.data.is_unordered);
}
178 
// Messages are produced up to and including their lifetime, and are not
// produced once the lifetime has been exceeded.
TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) {
  const std::vector<uint8_t> bytes(20);
  TimeMs clock = kNow;

  SendOptions two_second_lifetime;
  two_second_lifetime.lifetime = DurationMs(2000);
  SendOptions four_second_lifetime;
  four_second_lifetime.lifetime = DurationMs(4000);

  // Helpers that act at the current value of `clock`.
  const auto add_with = [&](const SendOptions& options) {
    buf_.Add(clock, DcSctpMessage(kStreamID, kPPID, bytes), options);
  };
  const auto produces = [&] {
    return buf_.Produce(clock, kOneFragmentPacketSize).has_value();
  };

  // With no lifetime set, the message is still produced much later.
  buf_.Add(clock, DcSctpMessage(kStreamID, kPPID, bytes));
  clock += DurationMs(1000000);
  ASSERT_TRUE(produces());

  // Consumed exactly at the end of its lifetime.
  add_with(two_second_lifetime);
  clock += DurationMs(2000);
  ASSERT_TRUE(produces());

  // Consumed one millisecond too late.
  add_with(two_second_lifetime);
  clock += DurationMs(2001);
  ASSERT_FALSE(produces());

  // Consumed long after expiry.
  add_with(two_second_lifetime);
  clock += DurationMs(1000000);
  ASSERT_FALSE(produces());

  // The first message expires, the second (longer lived) one is produced.
  add_with(two_second_lifetime);
  add_with(four_second_lifetime);
  clock += DurationMs(2001);

  ASSERT_TRUE(produces());
  ASSERT_FALSE(produces());
}
218 
// Discarding a partially sent message drops its remaining fragments, so the
// queue moves on to the next stream. A repeated Discard is harmless.
TEST_F(RRSendQueueTest, DiscardPartialPackets) {
  const std::vector<uint8_t> bytes(120);
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, bytes));
  buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), bytes));

  // First fragment of the first message, which is then discarded.
  const auto first = buf_.Produce(kNow, kOneFragmentPacketSize);
  ASSERT_TRUE(first.has_value());
  EXPECT_FALSE(first->data.is_end);
  EXPECT_EQ(first->data.stream_id, kStreamID);
  buf_.Discard(IsUnordered(false), first->data.stream_id,
               first->data.message_id);

  // Both fragments of the second message follow.
  const auto second = buf_.Produce(kNow, kOneFragmentPacketSize);
  ASSERT_TRUE(second.has_value());
  EXPECT_FALSE(second->data.is_end);
  EXPECT_EQ(second->data.stream_id, StreamID(2));

  const auto third = buf_.Produce(kNow, kOneFragmentPacketSize);
  ASSERT_TRUE(third.has_value());
  EXPECT_TRUE(third->data.is_end);
  EXPECT_EQ(third->data.stream_id, StreamID(2));
  ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize));

  // Discarding the same message again shouldn't cause issues.
  buf_.Discard(IsUnordered(false), first->data.stream_id,
               first->data.message_id);
  ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize));
}
251 
// Preparing a stream for reset drops what is queued on that stream, as seen
// through total_buffered_amount().
TEST_F(RRSendQueueTest, PrepareResetStreamsDiscardsStream) {
  const StreamID second_stream(2);
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 3}));
  buf_.Add(kNow, DcSctpMessage(second_stream, PPID(54), {1, 2, 3, 4, 5}));
  EXPECT_EQ(buf_.total_buffered_amount(), 8u);

  // The three bytes on stream 1 go away.
  buf_.PrepareResetStream(kStreamID);
  EXPECT_EQ(buf_.total_buffered_amount(), 5u);
  EXPECT_THAT(buf_.GetStreamsReadyToBeReset(),
              UnorderedElementsAre(kStreamID));
  buf_.CommitResetStreams();

  // And then the five bytes on stream 2.
  buf_.PrepareResetStream(second_stream);
  EXPECT_EQ(buf_.total_buffered_amount(), 0u);
}
266 
// Preparing a reset keeps the remainder of a message that has already been
// partially produced, and drops only the untouched message behind it.
TEST_F(RRSendQueueTest, PrepareResetStreamsNotPartialPackets) {
  constexpr size_t kFirstFragmentSize = 50;
  const std::vector<uint8_t> bytes(120);
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, bytes));
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, bytes));

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend first,
                              buf_.Produce(kNow, kFirstFragmentSize));
  EXPECT_EQ(first.data.stream_id, kStreamID);
  EXPECT_EQ(buf_.total_buffered_amount(),
            2 * bytes.size() - kFirstFragmentSize);

  buf_.PrepareResetStream(kStreamID);
  EXPECT_EQ(buf_.total_buffered_amount(), bytes.size() - kFirstFragmentSize);
}
281 
// A message added while its stream is being reset is buffered but not
// produced until the reset has been committed.
TEST_F(RRSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) {
  const std::vector<uint8_t> bytes(50);
  const auto produces_nothing = [&] {
    return !buf_.Produce(kNow, kOneFragmentPacketSize).has_value();
  };

  buf_.PrepareResetStream(kStreamID);
  EXPECT_EQ(buf_.total_buffered_amount(), 0u);

  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, bytes));
  EXPECT_EQ(buf_.total_buffered_amount(), bytes.size());
  EXPECT_TRUE(produces_nothing());

  EXPECT_TRUE(buf_.HasStreamsReadyToBeReset());
  EXPECT_THAT(buf_.GetStreamsReadyToBeReset(),
              UnorderedElementsAre(kStreamID));
  EXPECT_TRUE(produces_nothing());

  buf_.CommitResetStreams();
  EXPECT_EQ(buf_.total_buffered_amount(), bytes.size());

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk,
                              buf_.Produce(kNow, 50));
  EXPECT_EQ(chunk.data.stream_id, kStreamID);
  EXPECT_EQ(buf_.total_buffered_amount(), 0u);
}
307 
// A paused stream still finishes the message it has started fragmenting, but
// produces nothing after that.
TEST_F(RRSendQueueTest, PausedStreamsStillSendPartialMessagesUntilEnd) {
  constexpr size_t kMessageSize = 100;
  constexpr size_t kHalfMessage = 50;
  const std::vector<uint8_t> bytes(kMessageSize);

  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, bytes));
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, bytes));

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend first_half,
                              buf_.Produce(kNow, kHalfMessage));
  EXPECT_EQ(first_half.data.stream_id, kStreamID);
  EXPECT_EQ(buf_.total_buffered_amount(), 2 * kMessageSize - kHalfMessage);

  // Pausing keeps the second message from being sent.
  buf_.PrepareResetStream(kStreamID);
  EXPECT_EQ(buf_.total_buffered_amount(), 1 * kMessageSize - kHalfMessage);

  // The rest of the started message is still produced.
  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend second_half,
                              buf_.Produce(kNow, kHalfMessage));
  EXPECT_EQ(second_half.data.stream_id, kStreamID);
  EXPECT_EQ(buf_.total_buffered_amount(), 0ul);

  // Nothing more comes out while the stream is paused.
  EXPECT_FALSE(buf_.Produce(kNow, kHalfMessage).has_value());
}
336 
// After a committed stream reset, the stream's SSN starts over at zero.
TEST_F(RRSendQueueTest, CommittingResetsSSN) {
  const std::vector<uint8_t> bytes(50);
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, bytes));
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, bytes));

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend first,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_EQ(first.data.ssn, SSN(0));

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend second,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_EQ(second.data.ssn, SSN(1));

  buf_.PrepareResetStream(kStreamID);

  // Added during the reset, so it stays buffered.
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, bytes));

  EXPECT_TRUE(buf_.HasStreamsReadyToBeReset());
  EXPECT_THAT(buf_.GetStreamsReadyToBeReset(),
              UnorderedElementsAre(kStreamID));
  buf_.CommitResetStreams();

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend after_reset,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_EQ(after_reset.data.ssn, SSN(0));
}
368 
// Committing a reset restarts the SSN only on the stream that was reset; the
// other stream keeps counting.
TEST_F(RRSendQueueTest, CommittingResetsSSNForPausedStreamsOnly) {
  const StreamID untouched_stream(1);
  const StreamID reset_stream(3);
  const std::vector<uint8_t> bytes(50);

  buf_.Add(kNow, DcSctpMessage(untouched_stream, kPPID, bytes));
  buf_.Add(kNow, DcSctpMessage(reset_stream, kPPID, bytes));

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend first,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_EQ(first.data.stream_id, untouched_stream);
  EXPECT_EQ(first.data.ssn, SSN(0));

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend second,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_EQ(second.data.stream_id, reset_stream);
  EXPECT_EQ(second.data.ssn, SSN(0));

  buf_.PrepareResetStream(reset_stream);

  // One more message per stream: stream 3 buffers, stream 1 can be sent.
  buf_.Add(kNow, DcSctpMessage(untouched_stream, kPPID, bytes));
  buf_.Add(kNow, DcSctpMessage(reset_stream, kPPID, bytes));

  EXPECT_TRUE(buf_.HasStreamsReadyToBeReset());
  EXPECT_THAT(buf_.GetStreamsReadyToBeReset(),
              UnorderedElementsAre(reset_stream));

  buf_.CommitResetStreams();

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend third,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_EQ(third.data.stream_id, untouched_stream);
  EXPECT_EQ(third.data.ssn, SSN(1));

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend fourth,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_EQ(fourth.data.stream_id, reset_stream);
  EXPECT_EQ(fourth.data.ssn, SSN(0));
}
411 
// Rolling back a prepared reset lets the SSN sequence continue where it was.
TEST_F(RRSendQueueTest, RollBackResumesSSN) {
  const std::vector<uint8_t> bytes(50);
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, bytes));
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, bytes));

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend first,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_EQ(first.data.ssn, SSN(0));

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend second,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_EQ(second.data.ssn, SSN(1));

  buf_.PrepareResetStream(kStreamID);

  // Added during the reset, so it stays buffered.
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, bytes));

  EXPECT_TRUE(buf_.HasStreamsReadyToBeReset());
  EXPECT_THAT(buf_.GetStreamsReadyToBeReset(),
              UnorderedElementsAre(kStreamID));
  buf_.RollbackResetStreams();

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend after_rollback,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_EQ(after_rollback.data.ssn, SSN(2));
}
443 
// Both fragments of the message on stream 1 are produced before the queue
// moves on to the message on stream 2.
TEST_F(RRSendQueueTest, ReturnsFragmentsForOneMessageBeforeMovingToNext) {
  const std::vector<uint8_t> bytes(200);
  buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, bytes));
  buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, bytes));

  const StreamID expected_order[] = {StreamID(1), StreamID(1), StreamID(2),
                                     StreamID(2)};
  for (const StreamID& expected_stream : expected_order) {
    const auto chunk = buf_.Produce(kNow, kOneFragmentPacketSize);
    ASSERT_TRUE(chunk.has_value());
    EXPECT_EQ(chunk->data.stream_id, expected_stream);
  }
}
465 
// Even a small trailing fragment of a message is produced before the queue
// switches to the next stream.
TEST_F(RRSendQueueTest, ReturnsAlsoSmallFragmentsBeforeMovingToNext) {
  const std::vector<uint8_t> bytes(kTwoFragmentPacketSize);
  buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, bytes));
  buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, bytes));

  constexpr size_t kTailSize = kTwoFragmentPacketSize - kOneFragmentPacketSize;
  const StreamID streams_in_order[] = {StreamID(1), StreamID(2)};
  for (const StreamID& expected_stream : streams_in_order) {
    // A full-sized fragment first...
    const auto head = buf_.Produce(kNow, kOneFragmentPacketSize);
    ASSERT_TRUE(head.has_value());
    EXPECT_EQ(head->data.stream_id, expected_stream);
    EXPECT_THAT(head->data.payload, SizeIs(kOneFragmentPacketSize));

    // ...then the small remainder from the same stream.
    const auto tail = buf_.Produce(kNow, kOneFragmentPacketSize);
    ASSERT_TRUE(tail.has_value());
    EXPECT_EQ(tail->data.stream_id, expected_stream);
    EXPECT_THAT(tail->data.payload, SizeIs(kTailSize));
  }
}
493 
// With two messages queued on each of streams 1-4, the queue produces one
// message per stream in turn, and then cycles through the streams again.
// Message sizes (1..8 bytes) identify which message each chunk belongs to.
TEST_F(RRSendQueueTest, WillCycleInRoundRobinFashionBetweenStreams) {
  constexpr uint16_t kStreamCount = 4;

  // Stream N gets messages of size 2N-1 and 2N, in that order.
  for (uint16_t sid = 1; sid <= kStreamCount; ++sid) {
    const size_t first_size = 2 * static_cast<size_t>(sid) - 1;
    buf_.Add(kNow, DcSctpMessage(StreamID(sid), kPPID,
                                 std::vector<uint8_t>(first_size)));
    buf_.Add(kNow, DcSctpMessage(StreamID(sid), kPPID,
                                 std::vector<uint8_t>(first_size + 1)));
  }

  // Round 0 yields each stream's first message, round 1 its second.
  for (size_t round = 0; round < 2; ++round) {
    for (uint16_t sid = 1; sid <= kStreamCount; ++sid) {
      const size_t expected_size = 2 * static_cast<size_t>(sid) - 1 + round;
      const auto chunk = buf_.Produce(kNow, kOneFragmentPacketSize);
      ASSERT_TRUE(chunk.has_value());
      EXPECT_EQ(chunk->data.stream_id, StreamID(sid));
      EXPECT_THAT(chunk->data.payload, SizeIs(expected_size));
    }
  }
}
544 
// Setting the threshold to zero on a stream with nothing queued must not
// invoke the OnBufferedAmountLow callback.
TEST_F(RRSendQueueTest, DoesntTriggerOnBufferedAmountLowWhenSetToZero) {
  EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);

  buf_.SetBufferedAmountLowThreshold(kStreamID, 0u);
}
549 
// Without any threshold being set in the test, producing the only queued byte
// of a stream triggers OnBufferedAmountLow for that stream.
TEST_F(RRSendQueueTest, TriggersOnBufferedAmountAtZeroLowWhenSent) {
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, std::vector<uint8_t>(1)));
  EXPECT_EQ(buf_.buffered_amount(kStreamID), 1u);

  EXPECT_CALL(callbacks_, OnBufferedAmountLow(kStreamID));

  const auto chunk = buf_.Produce(kNow, kOneFragmentPacketSize);
  ASSERT_TRUE(chunk.has_value());
  EXPECT_EQ(chunk->data.stream_id, kStreamID);
  EXPECT_THAT(chunk->data.payload, SizeIs(1));
  EXPECT_EQ(buf_.buffered_amount(kStreamID), 0u);
}
562 
// The callback fires when the stream drains, stays silent while more data is
// added, and fires again when that data is drained.
TEST_F(RRSendQueueTest, WillRetriggerOnBufferedAmountLowIfAddingMore) {
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, std::vector<uint8_t>(1)));

  EXPECT_CALL(callbacks_, OnBufferedAmountLow(kStreamID));

  const auto first = buf_.Produce(kNow, kOneFragmentPacketSize);
  ASSERT_TRUE(first.has_value());
  EXPECT_EQ(first->data.stream_id, kStreamID);
  EXPECT_THAT(first->data.payload, SizeIs(1));

  // Adding data must not trigger the callback.
  EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);

  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, std::vector<uint8_t>(1)));
  EXPECT_EQ(buf_.buffered_amount(kStreamID), 1u);

  // The buffered amount rose above the threshold, so draining triggers again.
  EXPECT_CALL(callbacks_, OnBufferedAmountLow(kStreamID));
  const auto second = buf_.Produce(kNow, kOneFragmentPacketSize);
  ASSERT_TRUE(second.has_value());
  EXPECT_EQ(second->data.stream_id, kStreamID);
  EXPECT_THAT(second->data.payload, SizeIs(1));
}
585 
// With the threshold at 1000, the buffered amount (10, then 20 bytes) never
// exceeds it, so draining never triggers the callback.
TEST_F(RRSendQueueTest, OnlyTriggersWhenTransitioningFromAboveToBelowOrEqual) {
  buf_.SetBufferedAmountLowThreshold(kStreamID, 1000);

  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, std::vector<uint8_t>(10)));
  EXPECT_EQ(buf_.buffered_amount(kStreamID), 10u);

  EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
  const auto first = buf_.Produce(kNow, kOneFragmentPacketSize);
  ASSERT_TRUE(first.has_value());
  EXPECT_EQ(first->data.stream_id, kStreamID);
  EXPECT_THAT(first->data.payload, SizeIs(10));
  EXPECT_EQ(buf_.buffered_amount(kStreamID), 0u);

  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, std::vector<uint8_t>(20)));
  EXPECT_EQ(buf_.buffered_amount(kStreamID), 20u);

  const auto second = buf_.Produce(kNow, kOneFragmentPacketSize);
  ASSERT_TRUE(second.has_value());
  EXPECT_EQ(second->data.stream_id, kStreamID);
  EXPECT_THAT(second->data.payload, SizeIs(20));
  EXPECT_EQ(buf_.buffered_amount(kStreamID), 0u);
}
608 
// With the threshold at 700 and a 1000 byte message drained 100 bytes at a
// time, the callback triggers exactly when the buffered amount reaches 700
// and does not trigger again as the amount keeps falling.
TEST_F(RRSendQueueTest, WillTriggerOnBufferedAmountLowSetAboveZero) {
  EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);

  buf_.SetBufferedAmountLowThreshold(StreamID(1), 700);

  std::vector<uint8_t> payload(1000);
  buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));

  // 1000 -> 900: still above the threshold.
  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
  EXPECT_THAT(chunk1.data.payload, SizeIs(kOneFragmentPacketSize));
  EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 900u);

  // 900 -> 800: still above the threshold.
  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
  EXPECT_THAT(chunk2.data.payload, SizeIs(kOneFragmentPacketSize));
  EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 800u);

  // 800 -> 700: reaches the threshold, so the callback triggers.
  EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1)));

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_EQ(chunk3.data.stream_id, StreamID(1));
  EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize));
  EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 700u);

  // Doesn't trigger when reducing even further.
  EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);

  // The fourth fragment itself is verified here (previously these checks
  // re-examined chunk3 by mistake).
  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_EQ(chunk4.data.stream_id, StreamID(1));
  EXPECT_THAT(chunk4.data.payload, SizeIs(kOneFragmentPacketSize));
  EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 600u);
}
646 
// With a threshold of 700, the callback triggers when draining crosses it,
// and triggers once more after the amount has risen above it again.
TEST_F(RRSendQueueTest, WillRetriggerOnBufferedAmountLowSetAboveZero) {
  EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);

  buf_.SetBufferedAmountLowThreshold(kStreamID, 700);

  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, std::vector<uint8_t>(1000)));

  // 1000 -> 600 crosses the threshold.
  EXPECT_CALL(callbacks_, OnBufferedAmountLow(kStreamID));
  const auto first = buf_.Produce(kNow, 400);
  ASSERT_TRUE(first.has_value());
  EXPECT_EQ(first->data.stream_id, kStreamID);
  EXPECT_THAT(first->data.payload, SizeIs(400));
  EXPECT_EQ(buf_.buffered_amount(kStreamID), 600u);

  // 600 -> 800 by adding data: no callback.
  EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, std::vector<uint8_t>(200)));
  EXPECT_EQ(buf_.buffered_amount(kStreamID), 800u);

  // 800 -> 600 crosses the threshold again, so it triggers again.
  EXPECT_CALL(callbacks_, OnBufferedAmountLow(kStreamID));
  const auto second = buf_.Produce(kNow, 200);
  ASSERT_TRUE(second.has_value());
  EXPECT_EQ(second->data.stream_id, kStreamID);
  EXPECT_THAT(second->data.payload, SizeIs(200));
  EXPECT_EQ(buf_.buffered_amount(kStreamID), 600u);
}
673 
// With 100 bytes buffered, changing the threshold triggers the callback only
// when the new value moves from below the buffered amount to at or above it.
TEST_F(RRSendQueueTest, TriggersOnBufferedAmountLowOnThresholdChanged) {
  const StreamID sid(1);
  EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);

  buf_.Add(kNow, DcSctpMessage(sid, kPPID, std::vector<uint8_t>(100)));

  // Thresholds that stay under the buffered amount do not trigger.
  buf_.SetBufferedAmountLowThreshold(sid, 50);
  buf_.SetBufferedAmountLowThreshold(sid, 99);

  // A threshold equal to the buffered amount triggers.
  EXPECT_CALL(callbacks_, OnBufferedAmountLow(sid));
  buf_.SetBufferedAmountLowThreshold(sid, 100);

  // Lowering it again does not.
  EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
  buf_.SetBufferedAmountLowThreshold(sid, 50);

  // Raising it past the buffered amount triggers.
  EXPECT_CALL(callbacks_, OnBufferedAmountLow(sid));
  buf_.SetBufferedAmountLowThreshold(sid, 150);

  // And lowering it once more does not.
  EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
  buf_.SetBufferedAmountLowThreshold(sid, 0);
}
699 
// Filling the queue from below the total buffered-amount-low threshold to
// above it must not invoke OnTotalBufferedAmountLow.
TEST_F(RRSendQueueTest,
       OnTotalBufferedAmountLowDoesNotTriggerOnBufferFillingUp) {
  EXPECT_CALL(callbacks_, OnTotalBufferedAmountLow).Times(0);

  const std::vector<uint8_t> just_below(kBufferedAmountLowThreshold - 1);
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, just_below));
  EXPECT_EQ(buf_.total_buffered_amount(), just_below.size());

  // Rising above the threshold, without ever dropping below, doesn't trigger.
  const std::vector<uint8_t> extra(kOneFragmentPacketSize);
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, extra));
}
711 
// Fills the queue with kBufferedAmountLowThreshold + 1 bytes (no callback
// expected while filling), then produces one chunk and expects
// OnTotalBufferedAmountLow to be invoked exactly once as a result.
TEST_F(RRSendQueueTest, TriggersOnTotalBufferedAmountLowWhenCrossing) {
  EXPECT_CALL(callbacks_, OnTotalBufferedAmountLow).Times(0);
  std::vector<uint8_t> payload(kBufferedAmountLowThreshold);
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
  EXPECT_EQ(buf_.total_buffered_amount(), payload.size());

  // One more byte on top of the kBufferedAmountLowThreshold bytes already
  // queued.
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, std::vector<uint8_t>(1)));

  // Drain it a bit - will trigger.
  EXPECT_CALL(callbacks_, OnTotalBufferedAmountLow).Times(1);
  absl::optional<SendQueue::DataToSend> chunk =
      buf_.Produce(kNow, kOneFragmentPacketSize);

  // Make sure the drain actually happened instead of ignoring the result.
  ASSERT_TRUE(chunk.has_value());
  EXPECT_EQ(buf_.total_buffered_amount(),
            payload.size() + 1 - chunk->data.payload.size());
}
726 
// Checks that once a multi-fragment message on stream 1 has started, the
// remaining fragment is produced before a message that was queued on stream 6
// in the meantime.
TEST_F(RRSendQueueTest, WillStayInAStreamAsLongAsThatMessageIsSending) {
  const std::vector<uint8_t> one_byte(1);
  const std::vector<uint8_t> two_fragments(kOneFragmentPacketSize * 2);

  // A single small message on stream 5 is produced in full.
  buf_.Add(kNow, DcSctpMessage(StreamID(5), kPPID, one_byte));

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend first,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_EQ(first.data.stream_id, StreamID(5));
  EXPECT_THAT(first.data.payload, SizeIs(1));

  // The next message lives on another stream and needs two fragments.
  buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, two_fragments));

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend second,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_EQ(second.data.stream_id, StreamID(1));
  EXPECT_THAT(second.data.payload, SizeIs(kOneFragmentPacketSize));

  // Stream 6 follows stream 5, so switching to it might be tempting, but the
  // queue is expected to finish the message on stream 1 first.
  buf_.Add(kNow, DcSctpMessage(StreamID(6), kPPID, one_byte));

  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend third,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_EQ(third.data.stream_id, StreamID(1));
  EXPECT_THAT(third.data.payload, SizeIs(kOneFragmentPacketSize));

  // With stream 1 done, stream 6 gets its turn.
  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend fourth,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_EQ(fourth.data.stream_id, StreamID(6));
  EXPECT_THAT(fourth.data.payload, SizeIs(1));

  // Nothing is left to produce.
  EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
}
763 
// Both a stream that was never used and a stream that has a queued message are
// expected to report kDefaultPriority.
TEST_F(RRSendQueueTest, StreamsHaveInitialPriority) {
  const StreamID untouched_stream(1);
  const StreamID used_stream(2);

  EXPECT_EQ(buf_.GetStreamPriority(untouched_stream), kDefaultPriority);

  const std::vector<uint8_t> payload(40);
  buf_.Add(kNow, DcSctpMessage(used_stream, kPPID, payload));
  EXPECT_EQ(buf_.GetStreamPriority(used_stream), kDefaultPriority);
}
770 
// A priority set through SetStreamPriority must be read back by
// GetStreamPriority, both for a stream without data and for a stream that
// already has a queued message.
TEST_F(RRSendQueueTest, CanChangeStreamPriority) {
  const StreamPriority new_priority(42);

  // Stream without any queued data.
  buf_.SetStreamPriority(StreamID(1), new_priority);
  EXPECT_EQ(buf_.GetStreamPriority(StreamID(1)), new_priority);

  // Stream that already has a message queued.
  const std::vector<uint8_t> payload(40);
  buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload));
  buf_.SetStreamPriority(StreamID(2), new_priority);
  EXPECT_EQ(buf_.GetStreamPriority(StreamID(2)), new_priority);
}
779 
// Stream priorities set on one queue must be visible on a second queue after
// exporting the state with AddHandoverState and importing it with
// RestoreFromState.
TEST_F(RRSendQueueTest, WillHandoverPriority) {
  const StreamPriority custom_priority(42);

  buf_.SetStreamPriority(StreamID(1), custom_priority);

  const std::vector<uint8_t> payload(40);
  buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload));
  buf_.SetStreamPriority(StreamID(2), custom_priority);

  // Export from the fixture's queue...
  DcSctpSocketHandoverState handover_state;
  buf_.AddHandoverState(handover_state);

  // ...and import into a freshly constructed one.
  RRSendQueue restored("log: ", &callbacks_, kMaxQueueSize, kMtu,
                       kDefaultPriority, kBufferedAmountLowThreshold);
  restored.RestoreFromState(handover_state);

  EXPECT_EQ(restored.GetStreamPriority(StreamID(1)), custom_priority);
  EXPECT_EQ(restored.GetStreamPriority(StreamID(2)), custom_priority);
}
795 
// With message interleaving enabled and priorities 10/20/30 on streams 1/2/3,
// producing 10 bytes at a time is expected to yield chunks from stream 3 first,
// then stream 2, then stream 1.
TEST_F(RRSendQueueTest, WillSendMessagesByPrio) {
  buf_.EnableMessageInterleaving(true);
  buf_.SetStreamPriority(StreamID(1), StreamPriority(10));
  buf_.SetStreamPriority(StreamID(2), StreamPriority(20));
  buf_.SetStreamPriority(StreamID(3), StreamPriority(30));

  // 40, 20 and 10 bytes: four, two and one 10-byte chunk respectively.
  buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(40)));
  buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(20)));
  buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector<uint8_t>(10)));

  const std::vector<uint16_t> production_order = {3, 2, 2, 1, 1, 1, 1};
  for (const uint16_t expected_stream : production_order) {
    ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend produced,
                                buf_.Produce(kNow, 10));
    EXPECT_EQ(produced.data.stream_id, StreamID(expected_stream));
  }

  // Everything has been drained.
  EXPECT_FALSE(buf_.Produce(kNow, 1).has_value());
}
814 
// A message added with a 1000 ms lifetime and lifecycle id 1 is asked for at
// t = 1001 ms. Nothing must be produced, and both lifecycle callbacks are
// expected for that id.
TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenExpiredInSendQueue) {
  const LifecycleId lifecycle(1);
  const std::vector<uint8_t> message_bytes(kOneFragmentPacketSize);

  buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, message_bytes),
           SendOptions{.lifetime = DurationMs(1000),
                       .lifecycle_id = LifecycleId(1)});

  EXPECT_CALL(callbacks_,
              OnLifecycleMessageExpired(lifecycle, /*maybe_delivered=*/false));
  EXPECT_CALL(callbacks_, OnLifecycleEnd(lifecycle));

  absl::optional<SendQueue::DataToSend> produced =
      buf_.Produce(kNow + DurationMs(1001), kOneFragmentPacketSize);
  EXPECT_FALSE(produced.has_value());
}
827 
// Two 120-byte messages (lifecycle ids 1 and 2) are queued on kStreamID and 50
// bytes of the first are produced. PrepareResetStream is then expected to drop
// the second message: lifecycle callbacks for id 2, and the total buffered
// amount shrinking by one full payload.
TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenDiscardingDuringPause) {
  const std::vector<uint8_t> message_bytes(120);

  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, message_bytes),
           SendOptions{.lifecycle_id = LifecycleId(1)});
  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, message_bytes),
           SendOptions{.lifecycle_id = LifecycleId(2)});

  // Start sending the first message.
  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend first_fragment,
                              buf_.Produce(kNow, 50));
  EXPECT_EQ(first_fragment.data.stream_id, kStreamID);
  EXPECT_EQ(buf_.total_buffered_amount(), 2 * message_bytes.size() - 50);

  EXPECT_CALL(callbacks_, OnLifecycleMessageExpired(LifecycleId(2),
                                                    /*maybe_delivered=*/false));
  EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(2)));
  buf_.PrepareResetStream(kStreamID);
  EXPECT_EQ(buf_.total_buffered_amount(), message_bytes.size() - 50);
}
847 
// A message slightly larger than one fragment is partially produced and then
// discarded through Discard(). Both lifecycle callbacks are expected for its
// lifecycle id.
TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenDiscardingExplicitly) {
  const LifecycleId lifecycle(1);
  const std::vector<uint8_t> message_bytes(kOneFragmentPacketSize + 20);

  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, message_bytes),
           SendOptions{.lifecycle_id = LifecycleId(1)});

  // Only the first fragment is produced, so the message is not finished.
  ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend first_fragment,
                              buf_.Produce(kNow, kOneFragmentPacketSize));
  EXPECT_FALSE(first_fragment.data.is_end);
  EXPECT_EQ(first_fragment.data.stream_id, kStreamID);

  EXPECT_CALL(callbacks_,
              OnLifecycleMessageExpired(lifecycle, /*maybe_delivered=*/false));
  EXPECT_CALL(callbacks_, OnLifecycleEnd(lifecycle));
  buf_.Discard(IsUnordered(false), first_fragment.data.stream_id,
               first_fragment.data.message_id);
}
865 }  // namespace
866 }  // namespace dcsctp
867