xref: /aosp_15_r20/external/webrtc/net/dcsctp/rx/reassembly_queue.cc (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1*d9f75844SAndroid Build Coastguard Worker /*
2*d9f75844SAndroid Build Coastguard Worker  *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
3*d9f75844SAndroid Build Coastguard Worker  *
4*d9f75844SAndroid Build Coastguard Worker  *  Use of this source code is governed by a BSD-style license
5*d9f75844SAndroid Build Coastguard Worker  *  that can be found in the LICENSE file in the root of the source
6*d9f75844SAndroid Build Coastguard Worker  *  tree. An additional intellectual property rights grant can be found
7*d9f75844SAndroid Build Coastguard Worker  *  in the file PATENTS.  All contributing project authors may
8*d9f75844SAndroid Build Coastguard Worker  *  be found in the AUTHORS file in the root of the source tree.
9*d9f75844SAndroid Build Coastguard Worker  */
10*d9f75844SAndroid Build Coastguard Worker #include "net/dcsctp/rx/reassembly_queue.h"
11*d9f75844SAndroid Build Coastguard Worker 
12*d9f75844SAndroid Build Coastguard Worker #include <stddef.h>
13*d9f75844SAndroid Build Coastguard Worker 
14*d9f75844SAndroid Build Coastguard Worker #include <algorithm>
15*d9f75844SAndroid Build Coastguard Worker #include <cstdint>
16*d9f75844SAndroid Build Coastguard Worker #include <memory>
17*d9f75844SAndroid Build Coastguard Worker #include <set>
18*d9f75844SAndroid Build Coastguard Worker #include <string>
19*d9f75844SAndroid Build Coastguard Worker #include <utility>
20*d9f75844SAndroid Build Coastguard Worker #include <vector>
21*d9f75844SAndroid Build Coastguard Worker 
22*d9f75844SAndroid Build Coastguard Worker #include "absl/strings/string_view.h"
23*d9f75844SAndroid Build Coastguard Worker #include "absl/types/optional.h"
24*d9f75844SAndroid Build Coastguard Worker #include "api/array_view.h"
25*d9f75844SAndroid Build Coastguard Worker #include "net/dcsctp/common/sequence_numbers.h"
26*d9f75844SAndroid Build Coastguard Worker #include "net/dcsctp/common/str_join.h"
27*d9f75844SAndroid Build Coastguard Worker #include "net/dcsctp/packet/chunk/forward_tsn_common.h"
28*d9f75844SAndroid Build Coastguard Worker #include "net/dcsctp/packet/data.h"
29*d9f75844SAndroid Build Coastguard Worker #include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h"
30*d9f75844SAndroid Build Coastguard Worker #include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h"
31*d9f75844SAndroid Build Coastguard Worker #include "net/dcsctp/public/dcsctp_message.h"
32*d9f75844SAndroid Build Coastguard Worker #include "net/dcsctp/rx/interleaved_reassembly_streams.h"
33*d9f75844SAndroid Build Coastguard Worker #include "net/dcsctp/rx/reassembly_streams.h"
34*d9f75844SAndroid Build Coastguard Worker #include "net/dcsctp/rx/traditional_reassembly_streams.h"
35*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/logging.h"
36*d9f75844SAndroid Build Coastguard Worker 
37*d9f75844SAndroid Build Coastguard Worker namespace dcsctp {
38*d9f75844SAndroid Build Coastguard Worker namespace {
CreateStreams(absl::string_view log_prefix,ReassemblyStreams::OnAssembledMessage on_assembled_message,bool use_message_interleaving)39*d9f75844SAndroid Build Coastguard Worker std::unique_ptr<ReassemblyStreams> CreateStreams(
40*d9f75844SAndroid Build Coastguard Worker     absl::string_view log_prefix,
41*d9f75844SAndroid Build Coastguard Worker     ReassemblyStreams::OnAssembledMessage on_assembled_message,
42*d9f75844SAndroid Build Coastguard Worker     bool use_message_interleaving) {
43*d9f75844SAndroid Build Coastguard Worker   if (use_message_interleaving) {
44*d9f75844SAndroid Build Coastguard Worker     return std::make_unique<InterleavedReassemblyStreams>(
45*d9f75844SAndroid Build Coastguard Worker         log_prefix, std::move(on_assembled_message));
46*d9f75844SAndroid Build Coastguard Worker   }
47*d9f75844SAndroid Build Coastguard Worker   return std::make_unique<TraditionalReassemblyStreams>(
48*d9f75844SAndroid Build Coastguard Worker       log_prefix, std::move(on_assembled_message));
49*d9f75844SAndroid Build Coastguard Worker }
50*d9f75844SAndroid Build Coastguard Worker }  // namespace
51*d9f75844SAndroid Build Coastguard Worker 
ReassemblyQueue(absl::string_view log_prefix,TSN peer_initial_tsn,size_t max_size_bytes,bool use_message_interleaving)52*d9f75844SAndroid Build Coastguard Worker ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix,
53*d9f75844SAndroid Build Coastguard Worker                                  TSN peer_initial_tsn,
54*d9f75844SAndroid Build Coastguard Worker                                  size_t max_size_bytes,
55*d9f75844SAndroid Build Coastguard Worker                                  bool use_message_interleaving)
56*d9f75844SAndroid Build Coastguard Worker     : log_prefix_(std::string(log_prefix) + "reasm: "),
57*d9f75844SAndroid Build Coastguard Worker       max_size_bytes_(max_size_bytes),
58*d9f75844SAndroid Build Coastguard Worker       watermark_bytes_(max_size_bytes * kHighWatermarkLimit),
59*d9f75844SAndroid Build Coastguard Worker       last_assembled_tsn_watermark_(
60*d9f75844SAndroid Build Coastguard Worker           tsn_unwrapper_.Unwrap(TSN(*peer_initial_tsn - 1))),
61*d9f75844SAndroid Build Coastguard Worker       last_completed_reset_req_seq_nbr_(ReconfigRequestSN(0)),
62*d9f75844SAndroid Build Coastguard Worker       streams_(CreateStreams(
63*d9f75844SAndroid Build Coastguard Worker           log_prefix_,
64*d9f75844SAndroid Build Coastguard Worker           [this](rtc::ArrayView<const UnwrappedTSN> tsns,
65*d9f75844SAndroid Build Coastguard Worker                  DcSctpMessage message) {
66*d9f75844SAndroid Build Coastguard Worker             AddReassembledMessage(tsns, std::move(message));
67*d9f75844SAndroid Build Coastguard Worker           },
68*d9f75844SAndroid Build Coastguard Worker           use_message_interleaving)) {}
69*d9f75844SAndroid Build Coastguard Worker 
Add(TSN tsn,Data data)70*d9f75844SAndroid Build Coastguard Worker void ReassemblyQueue::Add(TSN tsn, Data data) {
71*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(IsConsistent());
72*d9f75844SAndroid Build Coastguard Worker   RTC_DLOG(LS_VERBOSE) << log_prefix_ << "added tsn=" << *tsn
73*d9f75844SAndroid Build Coastguard Worker                        << ", stream=" << *data.stream_id << ":"
74*d9f75844SAndroid Build Coastguard Worker                        << *data.message_id << ":" << *data.fsn << ", type="
75*d9f75844SAndroid Build Coastguard Worker                        << (data.is_beginning && data.is_end ? "complete"
76*d9f75844SAndroid Build Coastguard Worker                            : data.is_beginning              ? "first"
77*d9f75844SAndroid Build Coastguard Worker                            : data.is_end                    ? "last"
78*d9f75844SAndroid Build Coastguard Worker                                                             : "middle");
79*d9f75844SAndroid Build Coastguard Worker 
80*d9f75844SAndroid Build Coastguard Worker   UnwrappedTSN unwrapped_tsn = tsn_unwrapper_.Unwrap(tsn);
81*d9f75844SAndroid Build Coastguard Worker 
82*d9f75844SAndroid Build Coastguard Worker   if (unwrapped_tsn <= last_assembled_tsn_watermark_ ||
83*d9f75844SAndroid Build Coastguard Worker       delivered_tsns_.find(unwrapped_tsn) != delivered_tsns_.end()) {
84*d9f75844SAndroid Build Coastguard Worker     RTC_DLOG(LS_VERBOSE) << log_prefix_
85*d9f75844SAndroid Build Coastguard Worker                          << "Chunk has already been delivered - skipping";
86*d9f75844SAndroid Build Coastguard Worker     return;
87*d9f75844SAndroid Build Coastguard Worker   }
88*d9f75844SAndroid Build Coastguard Worker 
89*d9f75844SAndroid Build Coastguard Worker   // If a stream reset has been received with a "sender's last assigned tsn" in
90*d9f75844SAndroid Build Coastguard Worker   // the future, the socket is in "deferred reset processing" mode and must
91*d9f75844SAndroid Build Coastguard Worker   // buffer chunks until it's exited.
92*d9f75844SAndroid Build Coastguard Worker   if (deferred_reset_streams_.has_value() &&
93*d9f75844SAndroid Build Coastguard Worker       unwrapped_tsn >
94*d9f75844SAndroid Build Coastguard Worker           tsn_unwrapper_.Unwrap(
95*d9f75844SAndroid Build Coastguard Worker               deferred_reset_streams_->req.sender_last_assigned_tsn())) {
96*d9f75844SAndroid Build Coastguard Worker     RTC_DLOG(LS_VERBOSE)
97*d9f75844SAndroid Build Coastguard Worker         << log_prefix_ << "Deferring chunk with tsn=" << *tsn
98*d9f75844SAndroid Build Coastguard Worker         << " until cum_ack_tsn="
99*d9f75844SAndroid Build Coastguard Worker         << *deferred_reset_streams_->req.sender_last_assigned_tsn();
100*d9f75844SAndroid Build Coastguard Worker     // https://tools.ietf.org/html/rfc6525#section-5.2.2
101*d9f75844SAndroid Build Coastguard Worker     // "In this mode, any data arriving with a TSN larger than the
102*d9f75844SAndroid Build Coastguard Worker     // Sender's Last Assigned TSN for the affected stream(s) MUST be queued
103*d9f75844SAndroid Build Coastguard Worker     // locally and held until the cumulative acknowledgment point reaches the
104*d9f75844SAndroid Build Coastguard Worker     // Sender's Last Assigned TSN."
105*d9f75844SAndroid Build Coastguard Worker     queued_bytes_ += data.size();
106*d9f75844SAndroid Build Coastguard Worker     deferred_reset_streams_->deferred_chunks.emplace_back(
107*d9f75844SAndroid Build Coastguard Worker         std::make_pair(tsn, std::move(data)));
108*d9f75844SAndroid Build Coastguard Worker   } else {
109*d9f75844SAndroid Build Coastguard Worker     queued_bytes_ += streams_->Add(unwrapped_tsn, std::move(data));
110*d9f75844SAndroid Build Coastguard Worker   }
111*d9f75844SAndroid Build Coastguard Worker 
112*d9f75844SAndroid Build Coastguard Worker   // https://tools.ietf.org/html/rfc4960#section-6.9
113*d9f75844SAndroid Build Coastguard Worker   // "Note: If the data receiver runs out of buffer space while still
114*d9f75844SAndroid Build Coastguard Worker   // waiting for more fragments to complete the reassembly of the message, it
115*d9f75844SAndroid Build Coastguard Worker   // should dispatch part of its inbound message through a partial delivery
116*d9f75844SAndroid Build Coastguard Worker   // API (see Section 10), freeing some of its receive buffer space so that
117*d9f75844SAndroid Build Coastguard Worker   // the rest of the message may be received."
118*d9f75844SAndroid Build Coastguard Worker 
119*d9f75844SAndroid Build Coastguard Worker   // TODO(boivie): Support EOR flag and partial delivery?
120*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(IsConsistent());
121*d9f75844SAndroid Build Coastguard Worker }
122*d9f75844SAndroid Build Coastguard Worker 
ResetStreams(const OutgoingSSNResetRequestParameter & req,TSN cum_tsn_ack)123*d9f75844SAndroid Build Coastguard Worker ReconfigurationResponseParameter::Result ReassemblyQueue::ResetStreams(
124*d9f75844SAndroid Build Coastguard Worker     const OutgoingSSNResetRequestParameter& req,
125*d9f75844SAndroid Build Coastguard Worker     TSN cum_tsn_ack) {
126*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(IsConsistent());
127*d9f75844SAndroid Build Coastguard Worker   if (deferred_reset_streams_.has_value()) {
128*d9f75844SAndroid Build Coastguard Worker     // In deferred mode already.
129*d9f75844SAndroid Build Coastguard Worker     return ReconfigurationResponseParameter::Result::kInProgress;
130*d9f75844SAndroid Build Coastguard Worker   } else if (req.request_sequence_number() <=
131*d9f75844SAndroid Build Coastguard Worker              last_completed_reset_req_seq_nbr_) {
132*d9f75844SAndroid Build Coastguard Worker     // Already performed at some time previously.
133*d9f75844SAndroid Build Coastguard Worker     return ReconfigurationResponseParameter::Result::kSuccessPerformed;
134*d9f75844SAndroid Build Coastguard Worker   }
135*d9f75844SAndroid Build Coastguard Worker 
136*d9f75844SAndroid Build Coastguard Worker   UnwrappedTSN sla_tsn = tsn_unwrapper_.Unwrap(req.sender_last_assigned_tsn());
137*d9f75844SAndroid Build Coastguard Worker   UnwrappedTSN unwrapped_cum_tsn_ack = tsn_unwrapper_.Unwrap(cum_tsn_ack);
138*d9f75844SAndroid Build Coastguard Worker 
139*d9f75844SAndroid Build Coastguard Worker   // https://tools.ietf.org/html/rfc6525#section-5.2.2
140*d9f75844SAndroid Build Coastguard Worker   // "If the Sender's Last Assigned TSN is greater than the
141*d9f75844SAndroid Build Coastguard Worker   // cumulative acknowledgment point, then the endpoint MUST enter "deferred
142*d9f75844SAndroid Build Coastguard Worker   // reset processing"."
143*d9f75844SAndroid Build Coastguard Worker   if (sla_tsn > unwrapped_cum_tsn_ack) {
144*d9f75844SAndroid Build Coastguard Worker     RTC_DLOG(LS_VERBOSE)
145*d9f75844SAndroid Build Coastguard Worker         << log_prefix_
146*d9f75844SAndroid Build Coastguard Worker         << "Entering deferred reset processing mode until cum_tsn_ack="
147*d9f75844SAndroid Build Coastguard Worker         << *req.sender_last_assigned_tsn();
148*d9f75844SAndroid Build Coastguard Worker     deferred_reset_streams_ = absl::make_optional<DeferredResetStreams>(req);
149*d9f75844SAndroid Build Coastguard Worker     return ReconfigurationResponseParameter::Result::kInProgress;
150*d9f75844SAndroid Build Coastguard Worker   }
151*d9f75844SAndroid Build Coastguard Worker 
152*d9f75844SAndroid Build Coastguard Worker   // https://tools.ietf.org/html/rfc6525#section-5.2.2
153*d9f75844SAndroid Build Coastguard Worker   // "... streams MUST be reset to 0 as the next expected SSN."
154*d9f75844SAndroid Build Coastguard Worker   streams_->ResetStreams(req.stream_ids());
155*d9f75844SAndroid Build Coastguard Worker   last_completed_reset_req_seq_nbr_ = req.request_sequence_number();
156*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(IsConsistent());
157*d9f75844SAndroid Build Coastguard Worker   return ReconfigurationResponseParameter::Result::kSuccessPerformed;
158*d9f75844SAndroid Build Coastguard Worker }
159*d9f75844SAndroid Build Coastguard Worker 
MaybeResetStreamsDeferred(TSN cum_ack_tsn)160*d9f75844SAndroid Build Coastguard Worker bool ReassemblyQueue::MaybeResetStreamsDeferred(TSN cum_ack_tsn) {
161*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(IsConsistent());
162*d9f75844SAndroid Build Coastguard Worker   if (deferred_reset_streams_.has_value()) {
163*d9f75844SAndroid Build Coastguard Worker     UnwrappedTSN unwrapped_cum_ack_tsn = tsn_unwrapper_.Unwrap(cum_ack_tsn);
164*d9f75844SAndroid Build Coastguard Worker     UnwrappedTSN unwrapped_sla_tsn = tsn_unwrapper_.Unwrap(
165*d9f75844SAndroid Build Coastguard Worker         deferred_reset_streams_->req.sender_last_assigned_tsn());
166*d9f75844SAndroid Build Coastguard Worker     if (unwrapped_cum_ack_tsn >= unwrapped_sla_tsn) {
167*d9f75844SAndroid Build Coastguard Worker       RTC_DLOG(LS_VERBOSE) << log_prefix_
168*d9f75844SAndroid Build Coastguard Worker                            << "Leaving deferred reset processing with tsn="
169*d9f75844SAndroid Build Coastguard Worker                            << *cum_ack_tsn << ", feeding back "
170*d9f75844SAndroid Build Coastguard Worker                            << deferred_reset_streams_->deferred_chunks.size()
171*d9f75844SAndroid Build Coastguard Worker                            << " chunks";
172*d9f75844SAndroid Build Coastguard Worker       // https://tools.ietf.org/html/rfc6525#section-5.2.2
173*d9f75844SAndroid Build Coastguard Worker       // "... streams MUST be reset to 0 as the next expected SSN."
174*d9f75844SAndroid Build Coastguard Worker       streams_->ResetStreams(deferred_reset_streams_->req.stream_ids());
175*d9f75844SAndroid Build Coastguard Worker       std::vector<std::pair<TSN, Data>> deferred_chunks =
176*d9f75844SAndroid Build Coastguard Worker           std::move(deferred_reset_streams_->deferred_chunks);
177*d9f75844SAndroid Build Coastguard Worker       // The response will not be sent now, but as a reply to the retried
178*d9f75844SAndroid Build Coastguard Worker       // request, which will come as "in progress" has been sent prior.
179*d9f75844SAndroid Build Coastguard Worker       last_completed_reset_req_seq_nbr_ =
180*d9f75844SAndroid Build Coastguard Worker           deferred_reset_streams_->req.request_sequence_number();
181*d9f75844SAndroid Build Coastguard Worker       deferred_reset_streams_ = absl::nullopt;
182*d9f75844SAndroid Build Coastguard Worker 
183*d9f75844SAndroid Build Coastguard Worker       // https://tools.ietf.org/html/rfc6525#section-5.2.2
184*d9f75844SAndroid Build Coastguard Worker       // "Any queued TSNs (queued at step E2) MUST now be released and processed
185*d9f75844SAndroid Build Coastguard Worker       // normally."
186*d9f75844SAndroid Build Coastguard Worker       for (auto& [tsn, data] : deferred_chunks) {
187*d9f75844SAndroid Build Coastguard Worker         queued_bytes_ -= data.size();
188*d9f75844SAndroid Build Coastguard Worker         Add(tsn, std::move(data));
189*d9f75844SAndroid Build Coastguard Worker       }
190*d9f75844SAndroid Build Coastguard Worker 
191*d9f75844SAndroid Build Coastguard Worker       RTC_DCHECK(IsConsistent());
192*d9f75844SAndroid Build Coastguard Worker       return true;
193*d9f75844SAndroid Build Coastguard Worker     } else {
194*d9f75844SAndroid Build Coastguard Worker       RTC_DLOG(LS_VERBOSE) << "Staying in deferred reset processing. tsn="
195*d9f75844SAndroid Build Coastguard Worker                            << *cum_ack_tsn;
196*d9f75844SAndroid Build Coastguard Worker     }
197*d9f75844SAndroid Build Coastguard Worker   }
198*d9f75844SAndroid Build Coastguard Worker 
199*d9f75844SAndroid Build Coastguard Worker   return false;
200*d9f75844SAndroid Build Coastguard Worker }
201*d9f75844SAndroid Build Coastguard Worker 
FlushMessages()202*d9f75844SAndroid Build Coastguard Worker std::vector<DcSctpMessage> ReassemblyQueue::FlushMessages() {
203*d9f75844SAndroid Build Coastguard Worker   std::vector<DcSctpMessage> ret;
204*d9f75844SAndroid Build Coastguard Worker   reassembled_messages_.swap(ret);
205*d9f75844SAndroid Build Coastguard Worker   return ret;
206*d9f75844SAndroid Build Coastguard Worker }
207*d9f75844SAndroid Build Coastguard Worker 
AddReassembledMessage(rtc::ArrayView<const UnwrappedTSN> tsns,DcSctpMessage message)208*d9f75844SAndroid Build Coastguard Worker void ReassemblyQueue::AddReassembledMessage(
209*d9f75844SAndroid Build Coastguard Worker     rtc::ArrayView<const UnwrappedTSN> tsns,
210*d9f75844SAndroid Build Coastguard Worker     DcSctpMessage message) {
211*d9f75844SAndroid Build Coastguard Worker   RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Assembled message from TSN=["
212*d9f75844SAndroid Build Coastguard Worker                        << StrJoin(tsns, ",",
213*d9f75844SAndroid Build Coastguard Worker                                   [](rtc::StringBuilder& sb, UnwrappedTSN tsn) {
214*d9f75844SAndroid Build Coastguard Worker                                     sb << *tsn.Wrap();
215*d9f75844SAndroid Build Coastguard Worker                                   })
216*d9f75844SAndroid Build Coastguard Worker                        << "], message; stream_id=" << *message.stream_id()
217*d9f75844SAndroid Build Coastguard Worker                        << ", ppid=" << *message.ppid()
218*d9f75844SAndroid Build Coastguard Worker                        << ", payload=" << message.payload().size() << " bytes";
219*d9f75844SAndroid Build Coastguard Worker 
220*d9f75844SAndroid Build Coastguard Worker   for (const UnwrappedTSN tsn : tsns) {
221*d9f75844SAndroid Build Coastguard Worker     if (tsn <= last_assembled_tsn_watermark_) {
222*d9f75844SAndroid Build Coastguard Worker       // This can be provoked by a misbehaving peer by sending FORWARD-TSN with
223*d9f75844SAndroid Build Coastguard Worker       // invalid SSNs, allowing ordered messages to stay in the queue that
224*d9f75844SAndroid Build Coastguard Worker       // should've been discarded.
225*d9f75844SAndroid Build Coastguard Worker       RTC_DLOG(LS_VERBOSE)
226*d9f75844SAndroid Build Coastguard Worker           << log_prefix_
227*d9f75844SAndroid Build Coastguard Worker           << "Message is built from fragments already seen - skipping";
228*d9f75844SAndroid Build Coastguard Worker       return;
229*d9f75844SAndroid Build Coastguard Worker     } else if (tsn == last_assembled_tsn_watermark_.next_value()) {
230*d9f75844SAndroid Build Coastguard Worker       // Update watermark, or insert into delivered_tsns_
231*d9f75844SAndroid Build Coastguard Worker       last_assembled_tsn_watermark_.Increment();
232*d9f75844SAndroid Build Coastguard Worker     } else {
233*d9f75844SAndroid Build Coastguard Worker       delivered_tsns_.insert(tsn);
234*d9f75844SAndroid Build Coastguard Worker     }
235*d9f75844SAndroid Build Coastguard Worker   }
236*d9f75844SAndroid Build Coastguard Worker 
237*d9f75844SAndroid Build Coastguard Worker   // With new TSNs in delivered_tsns, gaps might be filled.
238*d9f75844SAndroid Build Coastguard Worker   MaybeMoveLastAssembledWatermarkFurther();
239*d9f75844SAndroid Build Coastguard Worker 
240*d9f75844SAndroid Build Coastguard Worker   reassembled_messages_.emplace_back(std::move(message));
241*d9f75844SAndroid Build Coastguard Worker }
242*d9f75844SAndroid Build Coastguard Worker 
MaybeMoveLastAssembledWatermarkFurther()243*d9f75844SAndroid Build Coastguard Worker void ReassemblyQueue::MaybeMoveLastAssembledWatermarkFurther() {
244*d9f75844SAndroid Build Coastguard Worker   // `delivered_tsns_` contain TSNS when there is a gap between ranges of
245*d9f75844SAndroid Build Coastguard Worker   // assembled TSNs. `last_assembled_tsn_watermark_` should not be adjacent to
246*d9f75844SAndroid Build Coastguard Worker   // that list, because if so, it can be moved.
247*d9f75844SAndroid Build Coastguard Worker   while (!delivered_tsns_.empty() &&
248*d9f75844SAndroid Build Coastguard Worker          *delivered_tsns_.begin() ==
249*d9f75844SAndroid Build Coastguard Worker              last_assembled_tsn_watermark_.next_value()) {
250*d9f75844SAndroid Build Coastguard Worker     last_assembled_tsn_watermark_.Increment();
251*d9f75844SAndroid Build Coastguard Worker     delivered_tsns_.erase(delivered_tsns_.begin());
252*d9f75844SAndroid Build Coastguard Worker   }
253*d9f75844SAndroid Build Coastguard Worker }
254*d9f75844SAndroid Build Coastguard Worker 
Handle(const AnyForwardTsnChunk & forward_tsn)255*d9f75844SAndroid Build Coastguard Worker void ReassemblyQueue::Handle(const AnyForwardTsnChunk& forward_tsn) {
256*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(IsConsistent());
257*d9f75844SAndroid Build Coastguard Worker   UnwrappedTSN tsn = tsn_unwrapper_.Unwrap(forward_tsn.new_cumulative_tsn());
258*d9f75844SAndroid Build Coastguard Worker 
259*d9f75844SAndroid Build Coastguard Worker   last_assembled_tsn_watermark_ = std::max(last_assembled_tsn_watermark_, tsn);
260*d9f75844SAndroid Build Coastguard Worker   delivered_tsns_.erase(delivered_tsns_.begin(),
261*d9f75844SAndroid Build Coastguard Worker                         delivered_tsns_.upper_bound(tsn));
262*d9f75844SAndroid Build Coastguard Worker 
263*d9f75844SAndroid Build Coastguard Worker   MaybeMoveLastAssembledWatermarkFurther();
264*d9f75844SAndroid Build Coastguard Worker 
265*d9f75844SAndroid Build Coastguard Worker   queued_bytes_ -=
266*d9f75844SAndroid Build Coastguard Worker       streams_->HandleForwardTsn(tsn, forward_tsn.skipped_streams());
267*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(IsConsistent());
268*d9f75844SAndroid Build Coastguard Worker }
269*d9f75844SAndroid Build Coastguard Worker 
IsConsistent() const270*d9f75844SAndroid Build Coastguard Worker bool ReassemblyQueue::IsConsistent() const {
271*d9f75844SAndroid Build Coastguard Worker   // `delivered_tsns_` and `last_assembled_tsn_watermark_` mustn't overlap or be
272*d9f75844SAndroid Build Coastguard Worker   // adjacent.
273*d9f75844SAndroid Build Coastguard Worker   if (!delivered_tsns_.empty() &&
274*d9f75844SAndroid Build Coastguard Worker       last_assembled_tsn_watermark_.next_value() >= *delivered_tsns_.begin()) {
275*d9f75844SAndroid Build Coastguard Worker     return false;
276*d9f75844SAndroid Build Coastguard Worker   }
277*d9f75844SAndroid Build Coastguard Worker 
278*d9f75844SAndroid Build Coastguard Worker   // Allow queued_bytes_ to be larger than max_size_bytes, as it's not actively
279*d9f75844SAndroid Build Coastguard Worker   // enforced in this class. This comparison will still trigger if queued_bytes_
280*d9f75844SAndroid Build Coastguard Worker   // became "negative".
281*d9f75844SAndroid Build Coastguard Worker   return (queued_bytes_ >= 0 && queued_bytes_ <= 2 * max_size_bytes_);
282*d9f75844SAndroid Build Coastguard Worker }
283*d9f75844SAndroid Build Coastguard Worker 
GetHandoverReadiness() const284*d9f75844SAndroid Build Coastguard Worker HandoverReadinessStatus ReassemblyQueue::GetHandoverReadiness() const {
285*d9f75844SAndroid Build Coastguard Worker   HandoverReadinessStatus status = streams_->GetHandoverReadiness();
286*d9f75844SAndroid Build Coastguard Worker   if (!delivered_tsns_.empty()) {
287*d9f75844SAndroid Build Coastguard Worker     status.Add(HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap);
288*d9f75844SAndroid Build Coastguard Worker   }
289*d9f75844SAndroid Build Coastguard Worker   if (deferred_reset_streams_.has_value()) {
290*d9f75844SAndroid Build Coastguard Worker     status.Add(HandoverUnreadinessReason::kStreamResetDeferred);
291*d9f75844SAndroid Build Coastguard Worker   }
292*d9f75844SAndroid Build Coastguard Worker   return status;
293*d9f75844SAndroid Build Coastguard Worker }
294*d9f75844SAndroid Build Coastguard Worker 
AddHandoverState(DcSctpSocketHandoverState & state)295*d9f75844SAndroid Build Coastguard Worker void ReassemblyQueue::AddHandoverState(DcSctpSocketHandoverState& state) {
296*d9f75844SAndroid Build Coastguard Worker   state.rx.last_assembled_tsn = last_assembled_tsn_watermark_.Wrap().value();
297*d9f75844SAndroid Build Coastguard Worker   state.rx.last_completed_deferred_reset_req_sn =
298*d9f75844SAndroid Build Coastguard Worker       last_completed_reset_req_seq_nbr_.value();
299*d9f75844SAndroid Build Coastguard Worker   streams_->AddHandoverState(state);
300*d9f75844SAndroid Build Coastguard Worker }
301*d9f75844SAndroid Build Coastguard Worker 
RestoreFromState(const DcSctpSocketHandoverState & state)302*d9f75844SAndroid Build Coastguard Worker void ReassemblyQueue::RestoreFromState(const DcSctpSocketHandoverState& state) {
303*d9f75844SAndroid Build Coastguard Worker   // Validate that the component is in pristine state.
304*d9f75844SAndroid Build Coastguard Worker   RTC_DCHECK(last_completed_reset_req_seq_nbr_ == ReconfigRequestSN(0));
305*d9f75844SAndroid Build Coastguard Worker 
306*d9f75844SAndroid Build Coastguard Worker   last_assembled_tsn_watermark_ =
307*d9f75844SAndroid Build Coastguard Worker       tsn_unwrapper_.Unwrap(TSN(state.rx.last_assembled_tsn));
308*d9f75844SAndroid Build Coastguard Worker   last_completed_reset_req_seq_nbr_ =
309*d9f75844SAndroid Build Coastguard Worker       ReconfigRequestSN(state.rx.last_completed_deferred_reset_req_sn);
310*d9f75844SAndroid Build Coastguard Worker   streams_->RestoreFromState(state);
311*d9f75844SAndroid Build Coastguard Worker }
312*d9f75844SAndroid Build Coastguard Worker }  // namespace dcsctp
313