xref: /aosp_15_r20/external/cronet/net/third_party/quiche/src/quiche/quic/core/quic_stream_sequencer.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/quic/core/quic_stream_sequencer.h"
6 
7 #include <algorithm>
8 #include <cstddef>
9 #include <limits>
10 #include <string>
11 #include <utility>
12 
13 #include "absl/strings/str_cat.h"
14 #include "absl/strings/string_view.h"
15 #include "quiche/quic/core/quic_clock.h"
16 #include "quiche/quic/core/quic_error_codes.h"
17 #include "quiche/quic/core/quic_packets.h"
18 #include "quiche/quic/core/quic_stream.h"
19 #include "quiche/quic/core/quic_stream_sequencer_buffer.h"
20 #include "quiche/quic/core/quic_types.h"
21 #include "quiche/quic/core/quic_utils.h"
22 #include "quiche/quic/platform/api/quic_bug_tracker.h"
23 #include "quiche/quic/platform/api/quic_flag_utils.h"
24 #include "quiche/quic/platform/api/quic_flags.h"
25 #include "quiche/quic/platform/api/quic_logging.h"
26 #include "quiche/quic/platform/api/quic_stack_trace.h"
27 
28 namespace quic {
29 
QuicStreamSequencer(StreamInterface * quic_stream)30 QuicStreamSequencer::QuicStreamSequencer(StreamInterface* quic_stream)
31     : stream_(quic_stream),
32       buffered_frames_(kStreamReceiveWindowLimit),
33       highest_offset_(0),
34       close_offset_(std::numeric_limits<QuicStreamOffset>::max()),
35       blocked_(false),
36       num_frames_received_(0),
37       num_duplicate_frames_received_(0),
38       ignore_read_data_(false),
39       level_triggered_(false) {}
40 
~QuicStreamSequencer()41 QuicStreamSequencer::~QuicStreamSequencer() {
42   if (stream_ == nullptr) {
43     QUIC_BUG(quic_bug_10858_1) << "Double free'ing QuicStreamSequencer at "
44                                << this << ". " << QuicStackTrace();
45   }
46   stream_ = nullptr;
47 }
48 
OnStreamFrame(const QuicStreamFrame & frame)49 void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) {
50   QUICHE_DCHECK_LE(frame.offset + frame.data_length, close_offset_);
51   ++num_frames_received_;
52   const QuicStreamOffset byte_offset = frame.offset;
53   const size_t data_len = frame.data_length;
54 
55   if (frame.fin &&
56       (!CloseStreamAtOffset(frame.offset + data_len) || data_len == 0)) {
57     return;
58   }
59   if (stream_->version().HasIetfQuicFrames() && data_len == 0) {
60     QUICHE_DCHECK(!frame.fin);
61     // Ignore empty frame with no fin.
62     return;
63   }
64   OnFrameData(byte_offset, data_len, frame.data_buffer);
65 }
66 
OnCryptoFrame(const QuicCryptoFrame & frame)67 void QuicStreamSequencer::OnCryptoFrame(const QuicCryptoFrame& frame) {
68   ++num_frames_received_;
69   if (frame.data_length == 0) {
70     // Ignore empty crypto frame.
71     return;
72   }
73   OnFrameData(frame.offset, frame.data_length, frame.data_buffer);
74 }
75 
OnFrameData(QuicStreamOffset byte_offset,size_t data_len,const char * data_buffer)76 void QuicStreamSequencer::OnFrameData(QuicStreamOffset byte_offset,
77                                       size_t data_len,
78                                       const char* data_buffer) {
79   highest_offset_ = std::max(highest_offset_, byte_offset + data_len);
80   const size_t previous_readable_bytes = buffered_frames_.ReadableBytes();
81   size_t bytes_written;
82   std::string error_details;
83   QuicErrorCode result = buffered_frames_.OnStreamData(
84       byte_offset, absl::string_view(data_buffer, data_len), &bytes_written,
85       &error_details);
86   if (result != QUIC_NO_ERROR) {
87     std::string details =
88         absl::StrCat("Stream ", stream_->id(), ": ",
89                      QuicErrorCodeToString(result), ": ", error_details);
90     QUIC_LOG_FIRST_N(WARNING, 50) << QuicErrorCodeToString(result);
91     QUIC_LOG_FIRST_N(WARNING, 50) << details;
92     stream_->OnUnrecoverableError(result, details);
93     return;
94   }
95 
96   if (bytes_written == 0) {
97     ++num_duplicate_frames_received_;
98     // Silently ignore duplicates.
99     return;
100   }
101 
102   if (blocked_) {
103     return;
104   }
105 
106   if (level_triggered_) {
107     if (buffered_frames_.ReadableBytes() > previous_readable_bytes) {
108       // Readable bytes has changed, let stream decide if to inform application
109       // or not.
110       if (ignore_read_data_) {
111         FlushBufferedFrames();
112       } else {
113         stream_->OnDataAvailable();
114       }
115     }
116     return;
117   }
118   const bool stream_unblocked =
119       previous_readable_bytes == 0 && buffered_frames_.ReadableBytes() > 0;
120   if (stream_unblocked) {
121     if (ignore_read_data_) {
122       FlushBufferedFrames();
123     } else {
124       stream_->OnDataAvailable();
125     }
126   }
127 }
128 
CloseStreamAtOffset(QuicStreamOffset offset)129 bool QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) {
130   const QuicStreamOffset kMaxOffset =
131       std::numeric_limits<QuicStreamOffset>::max();
132 
133   // If there is a scheduled close, the new offset should match it.
134   if (close_offset_ != kMaxOffset && offset != close_offset_) {
135     stream_->OnUnrecoverableError(
136         QUIC_STREAM_SEQUENCER_INVALID_STATE,
137         absl::StrCat(
138             "Stream ", stream_->id(), " received new final offset: ", offset,
139             ", which is different from close offset: ", close_offset_));
140     return false;
141   }
142 
143   // The final offset should be no less than the highest offset that is
144   // received.
145   if (offset < highest_offset_) {
146     stream_->OnUnrecoverableError(
147         QUIC_STREAM_SEQUENCER_INVALID_STATE,
148         absl::StrCat(
149             "Stream ", stream_->id(), " received fin with offset: ", offset,
150             ", which reduces current highest offset: ", highest_offset_));
151     return false;
152   }
153 
154   close_offset_ = offset;
155 
156   MaybeCloseStream();
157   return true;
158 }
159 
MaybeCloseStream()160 void QuicStreamSequencer::MaybeCloseStream() {
161   if (blocked_ || !IsClosed()) {
162     return;
163   }
164 
165   QUIC_DVLOG(1) << "Passing up termination, as we've processed "
166                 << buffered_frames_.BytesConsumed() << " of " << close_offset_
167                 << " bytes.";
168   // This will cause the stream to consume the FIN.
169   // Technically it's an error if |num_bytes_consumed| isn't exactly
170   // equal to |close_offset|, but error handling seems silly at this point.
171   if (ignore_read_data_) {
172     // The sequencer is discarding stream data and must notify the stream on
173     // receipt of a FIN because the consumer won't.
174     stream_->OnFinRead();
175   } else {
176     stream_->OnDataAvailable();
177   }
178   buffered_frames_.Clear();
179 }
180 
GetReadableRegions(iovec * iov,size_t iov_len) const181 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) const {
182   QUICHE_DCHECK(!blocked_);
183   return buffered_frames_.GetReadableRegions(iov, iov_len);
184 }
185 
GetReadableRegion(iovec * iov) const186 bool QuicStreamSequencer::GetReadableRegion(iovec* iov) const {
187   QUICHE_DCHECK(!blocked_);
188   return buffered_frames_.GetReadableRegion(iov);
189 }
190 
PeekRegion(QuicStreamOffset offset,iovec * iov) const191 bool QuicStreamSequencer::PeekRegion(QuicStreamOffset offset,
192                                      iovec* iov) const {
193   QUICHE_DCHECK(!blocked_);
194   return buffered_frames_.PeekRegion(offset, iov);
195 }
196 
Read(std::string * buffer)197 void QuicStreamSequencer::Read(std::string* buffer) {
198   QUICHE_DCHECK(!blocked_);
199   buffer->resize(buffer->size() + ReadableBytes());
200   iovec iov;
201   iov.iov_len = ReadableBytes();
202   iov.iov_base = &(*buffer)[buffer->size() - iov.iov_len];
203   Readv(&iov, 1);
204 }
205 
Readv(const struct iovec * iov,size_t iov_len)206 size_t QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) {
207   QUICHE_DCHECK(!blocked_);
208   std::string error_details;
209   size_t bytes_read;
210   QuicErrorCode read_error =
211       buffered_frames_.Readv(iov, iov_len, &bytes_read, &error_details);
212   if (read_error != QUIC_NO_ERROR) {
213     std::string details =
214         absl::StrCat("Stream ", stream_->id(), ": ", error_details);
215     stream_->OnUnrecoverableError(read_error, details);
216     return bytes_read;
217   }
218 
219   stream_->AddBytesConsumed(bytes_read);
220   return bytes_read;
221 }
222 
HasBytesToRead() const223 bool QuicStreamSequencer::HasBytesToRead() const {
224   return buffered_frames_.HasBytesToRead();
225 }
226 
ReadableBytes() const227 size_t QuicStreamSequencer::ReadableBytes() const {
228   return buffered_frames_.ReadableBytes();
229 }
230 
IsClosed() const231 bool QuicStreamSequencer::IsClosed() const {
232   return buffered_frames_.BytesConsumed() >= close_offset_;
233 }
234 
MarkConsumed(size_t num_bytes_consumed)235 void QuicStreamSequencer::MarkConsumed(size_t num_bytes_consumed) {
236   QUICHE_DCHECK(!blocked_);
237   bool result = buffered_frames_.MarkConsumed(num_bytes_consumed);
238   if (!result) {
239     QUIC_BUG(quic_bug_10858_2)
240         << "Invalid argument to MarkConsumed."
241         << " expect to consume: " << num_bytes_consumed
242         << ", but not enough bytes available. " << DebugString();
243     stream_->ResetWithError(
244         QuicResetStreamError::FromInternal(QUIC_ERROR_PROCESSING_STREAM));
245     return;
246   }
247   stream_->AddBytesConsumed(num_bytes_consumed);
248 }
249 
SetBlockedUntilFlush()250 void QuicStreamSequencer::SetBlockedUntilFlush() { blocked_ = true; }
251 
SetUnblocked()252 void QuicStreamSequencer::SetUnblocked() {
253   blocked_ = false;
254   if (IsClosed() || HasBytesToRead()) {
255     stream_->OnDataAvailable();
256   }
257 }
258 
StopReading()259 void QuicStreamSequencer::StopReading() {
260   if (ignore_read_data_) {
261     return;
262   }
263   ignore_read_data_ = true;
264   FlushBufferedFrames();
265 }
266 
ReleaseBuffer()267 void QuicStreamSequencer::ReleaseBuffer() {
268   buffered_frames_.ReleaseWholeBuffer();
269 }
270 
ReleaseBufferIfEmpty()271 void QuicStreamSequencer::ReleaseBufferIfEmpty() {
272   if (buffered_frames_.Empty()) {
273     buffered_frames_.ReleaseWholeBuffer();
274   }
275 }
276 
FlushBufferedFrames()277 void QuicStreamSequencer::FlushBufferedFrames() {
278   QUICHE_DCHECK(ignore_read_data_);
279   size_t bytes_flushed = buffered_frames_.FlushBufferedFrames();
280   QUIC_DVLOG(1) << "Flushing buffered data at offset "
281                 << buffered_frames_.BytesConsumed() << " length "
282                 << bytes_flushed << " for stream " << stream_->id();
283   stream_->AddBytesConsumed(bytes_flushed);
284   MaybeCloseStream();
285 }
286 
NumBytesBuffered() const287 size_t QuicStreamSequencer::NumBytesBuffered() const {
288   return buffered_frames_.BytesBuffered();
289 }
290 
NumBytesConsumed() const291 QuicStreamOffset QuicStreamSequencer::NumBytesConsumed() const {
292   return buffered_frames_.BytesConsumed();
293 }
294 
IsAllDataAvailable() const295 bool QuicStreamSequencer::IsAllDataAvailable() const {
296   QUICHE_DCHECK_LE(NumBytesConsumed() + NumBytesBuffered(), close_offset_);
297   return NumBytesConsumed() + NumBytesBuffered() >= close_offset_;
298 }
299 
DebugString() const300 std::string QuicStreamSequencer::DebugString() const {
301   // clang-format off
302   return absl::StrCat(
303       "QuicStreamSequencer:  bytes buffered: ", NumBytesBuffered(),
304       "\n  bytes consumed: ", NumBytesConsumed(),
305       "\n  first missing byte: ", buffered_frames_.FirstMissingByte(),
306       "\n  next expected byte: ", buffered_frames_.NextExpectedByte(),
307       "\n  received frames: ", buffered_frames_.ReceivedFramesDebugString(),
308       "\n  has bytes to read: ", HasBytesToRead() ? "true" : "false",
309       "\n  frames received: ", num_frames_received(),
310       "\n  close offset bytes: ", close_offset_,
311       "\n  is closed: ", IsClosed() ? "true" : "false");
312   // clang-format on
313 }
314 
315 }  // namespace quic
316