xref: /aosp_15_r20/external/cronet/net/third_party/quiche/src/quiche/quic/core/quic_stream_sequencer_buffer.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright (c) 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/quic/core/quic_stream_sequencer_buffer.h"
6 
7 #include <algorithm>
8 #include <cstddef>
9 #include <memory>
10 #include <string>
11 
12 #include "absl/strings/str_cat.h"
13 #include "absl/strings/string_view.h"
14 #include "quiche/quic/core/quic_constants.h"
15 #include "quiche/quic/core/quic_interval.h"
16 #include "quiche/quic/platform/api/quic_bug_tracker.h"
17 #include "quiche/quic/platform/api/quic_flag_utils.h"
18 #include "quiche/quic/platform/api/quic_flags.h"
19 #include "quiche/quic/platform/api/quic_logging.h"
20 
21 namespace quic {
22 namespace {
23 
CalculateBlockCount(size_t max_capacity_bytes)24 size_t CalculateBlockCount(size_t max_capacity_bytes) {
25   return (max_capacity_bytes + QuicStreamSequencerBuffer::kBlockSizeBytes - 1) /
26          QuicStreamSequencerBuffer::kBlockSizeBytes;
27 }
28 
29 // Upper limit of how many gaps allowed in buffer, which ensures a reasonable
30 // number of iterations needed to find the right gap to fill when a frame
31 // arrives.
32 const size_t kMaxNumDataIntervalsAllowed = 2 * kMaxPacketGap;
33 
34 // Number of blocks allocated initially.
35 constexpr size_t kInitialBlockCount = 8u;
36 
37 // How fast block pointers container grow in size.
38 // Choose 4 to reduce the amount of reallocation.
39 constexpr int kBlocksGrowthFactor = 4;
40 
41 }  // namespace
42 
QuicStreamSequencerBuffer(size_t max_capacity_bytes)43 QuicStreamSequencerBuffer::QuicStreamSequencerBuffer(size_t max_capacity_bytes)
44     : max_buffer_capacity_bytes_(max_capacity_bytes),
45       max_blocks_count_(CalculateBlockCount(max_capacity_bytes)),
46       current_blocks_count_(0u),
47       total_bytes_read_(0),
48       blocks_(nullptr) {
49   QUICHE_DCHECK_GE(max_blocks_count_, kInitialBlockCount);
50   Clear();
51 }
52 
~QuicStreamSequencerBuffer()53 QuicStreamSequencerBuffer::~QuicStreamSequencerBuffer() { Clear(); }
54 
Clear()55 void QuicStreamSequencerBuffer::Clear() {
56   if (blocks_ != nullptr) {
57     for (size_t i = 0; i < current_blocks_count_; ++i) {
58       if (blocks_[i] != nullptr) {
59         RetireBlock(i);
60       }
61     }
62   }
63   num_bytes_buffered_ = 0;
64   bytes_received_.Clear();
65   bytes_received_.Add(0, total_bytes_read_);
66 }
67 
RetireBlock(size_t index)68 bool QuicStreamSequencerBuffer::RetireBlock(size_t index) {
69   if (blocks_[index] == nullptr) {
70     QUIC_BUG(quic_bug_10610_1) << "Try to retire block twice";
71     return false;
72   }
73   delete blocks_[index];
74   blocks_[index] = nullptr;
75   QUIC_DVLOG(1) << "Retired block with index: " << index;
76   return true;
77 }
78 
MaybeAddMoreBlocks(QuicStreamOffset next_expected_byte)79 void QuicStreamSequencerBuffer::MaybeAddMoreBlocks(
80     QuicStreamOffset next_expected_byte) {
81   if (current_blocks_count_ == max_blocks_count_) {
82     return;
83   }
84   QuicStreamOffset last_byte = next_expected_byte - 1;
85   size_t num_of_blocks_needed;
86   // As long as last_byte does not wrap around, its index plus one blocks are
87   // needed. Otherwise, block_count_ blocks are needed.
88   if (last_byte < max_buffer_capacity_bytes_) {
89     num_of_blocks_needed =
90         std::max(GetBlockIndex(last_byte) + 1, kInitialBlockCount);
91   } else {
92     num_of_blocks_needed = max_blocks_count_;
93   }
94   if (current_blocks_count_ >= num_of_blocks_needed) {
95     return;
96   }
97   size_t new_block_count = kBlocksGrowthFactor * current_blocks_count_;
98   new_block_count = std::min(std::max(new_block_count, num_of_blocks_needed),
99                              max_blocks_count_);
100   auto new_blocks = std::make_unique<BufferBlock*[]>(new_block_count);
101   if (blocks_ != nullptr) {
102     memcpy(new_blocks.get(), blocks_.get(),
103            current_blocks_count_ * sizeof(BufferBlock*));
104   }
105   blocks_ = std::move(new_blocks);
106   current_blocks_count_ = new_block_count;
107 }
108 
OnStreamData(QuicStreamOffset starting_offset,absl::string_view data,size_t * const bytes_buffered,std::string * error_details)109 QuicErrorCode QuicStreamSequencerBuffer::OnStreamData(
110     QuicStreamOffset starting_offset, absl::string_view data,
111     size_t* const bytes_buffered, std::string* error_details) {
112   *bytes_buffered = 0;
113   size_t size = data.size();
114   if (size == 0) {
115     *error_details = "Received empty stream frame without FIN.";
116     return QUIC_EMPTY_STREAM_FRAME_NO_FIN;
117   }
118   // Write beyond the current range this buffer is covering.
119   if (starting_offset + size > total_bytes_read_ + max_buffer_capacity_bytes_ ||
120       starting_offset + size < starting_offset) {
121     *error_details = "Received data beyond available range.";
122     return QUIC_INTERNAL_ERROR;
123   }
124 
125   if (bytes_received_.Empty() ||
126       starting_offset >= bytes_received_.rbegin()->max() ||
127       bytes_received_.IsDisjoint(QuicInterval<QuicStreamOffset>(
128           starting_offset, starting_offset + size))) {
129     // Optimization for the typical case, when all data is newly received.
130     bytes_received_.AddOptimizedForAppend(starting_offset,
131                                           starting_offset + size);
132     if (bytes_received_.Size() >= kMaxNumDataIntervalsAllowed) {
133       // This frame is going to create more intervals than allowed. Stop
134       // processing.
135       *error_details = "Too many data intervals received for this stream.";
136       return QUIC_TOO_MANY_STREAM_DATA_INTERVALS;
137     }
138     MaybeAddMoreBlocks(starting_offset + size);
139 
140     size_t bytes_copy = 0;
141     if (!CopyStreamData(starting_offset, data, &bytes_copy, error_details)) {
142       return QUIC_STREAM_SEQUENCER_INVALID_STATE;
143     }
144     *bytes_buffered += bytes_copy;
145     num_bytes_buffered_ += *bytes_buffered;
146     return QUIC_NO_ERROR;
147   }
148   // Slow path, received data overlaps with received data.
149   QuicIntervalSet<QuicStreamOffset> newly_received(starting_offset,
150                                                    starting_offset + size);
151   newly_received.Difference(bytes_received_);
152   if (newly_received.Empty()) {
153     return QUIC_NO_ERROR;
154   }
155   bytes_received_.Add(starting_offset, starting_offset + size);
156   if (bytes_received_.Size() >= kMaxNumDataIntervalsAllowed) {
157     // This frame is going to create more intervals than allowed. Stop
158     // processing.
159     *error_details = "Too many data intervals received for this stream.";
160     return QUIC_TOO_MANY_STREAM_DATA_INTERVALS;
161   }
162   MaybeAddMoreBlocks(starting_offset + size);
163   for (const auto& interval : newly_received) {
164     const QuicStreamOffset copy_offset = interval.min();
165     const QuicByteCount copy_length = interval.max() - interval.min();
166     size_t bytes_copy = 0;
167     if (!CopyStreamData(copy_offset,
168                         data.substr(copy_offset - starting_offset, copy_length),
169                         &bytes_copy, error_details)) {
170       return QUIC_STREAM_SEQUENCER_INVALID_STATE;
171     }
172     *bytes_buffered += bytes_copy;
173   }
174   num_bytes_buffered_ += *bytes_buffered;
175   return QUIC_NO_ERROR;
176 }
177 
CopyStreamData(QuicStreamOffset offset,absl::string_view data,size_t * bytes_copy,std::string * error_details)178 bool QuicStreamSequencerBuffer::CopyStreamData(QuicStreamOffset offset,
179                                                absl::string_view data,
180                                                size_t* bytes_copy,
181                                                std::string* error_details) {
182   *bytes_copy = 0;
183   size_t source_remaining = data.size();
184   if (source_remaining == 0) {
185     return true;
186   }
187   const char* source = data.data();
188   // Write data block by block. If corresponding block has not created yet,
189   // create it first.
190   // Stop when all data are written or reaches the logical end of the buffer.
191   while (source_remaining > 0) {
192     const size_t write_block_num = GetBlockIndex(offset);
193     const size_t write_block_offset = GetInBlockOffset(offset);
194     size_t current_blocks_count = current_blocks_count_;
195     QUICHE_DCHECK_GT(current_blocks_count, write_block_num);
196 
197     size_t block_capacity = GetBlockCapacity(write_block_num);
198     size_t bytes_avail = block_capacity - write_block_offset;
199 
200     // If this write meets the upper boundary of the buffer,
201     // reduce the available free bytes.
202     if (offset + bytes_avail > total_bytes_read_ + max_buffer_capacity_bytes_) {
203       bytes_avail = total_bytes_read_ + max_buffer_capacity_bytes_ - offset;
204     }
205 
206     if (write_block_num >= current_blocks_count) {
207       *error_details = absl::StrCat(
208           "QuicStreamSequencerBuffer error: OnStreamData() exceed array bounds."
209           "write offset = ",
210           offset, " write_block_num = ", write_block_num,
211           " current_blocks_count_ = ", current_blocks_count);
212       return false;
213     }
214     if (blocks_ == nullptr) {
215       *error_details =
216           "QuicStreamSequencerBuffer error: OnStreamData() blocks_ is null";
217       return false;
218     }
219     if (blocks_[write_block_num] == nullptr) {
220       // TODO(danzh): Investigate if using a freelist would improve performance.
221       // Same as RetireBlock().
222       blocks_[write_block_num] = new BufferBlock();
223     }
224 
225     const size_t bytes_to_copy =
226         std::min<size_t>(bytes_avail, source_remaining);
227     char* dest = blocks_[write_block_num]->buffer + write_block_offset;
228     QUIC_DVLOG(1) << "Write at offset: " << offset
229                   << " length: " << bytes_to_copy;
230 
231     if (dest == nullptr || source == nullptr) {
232       *error_details = absl::StrCat(
233           "QuicStreamSequencerBuffer error: OnStreamData()"
234           " dest == nullptr: ",
235           (dest == nullptr), " source == nullptr: ", (source == nullptr),
236           " Writing at offset ", offset,
237           " Received frames: ", ReceivedFramesDebugString(),
238           " total_bytes_read_ = ", total_bytes_read_);
239       return false;
240     }
241     memcpy(dest, source, bytes_to_copy);
242     source += bytes_to_copy;
243     source_remaining -= bytes_to_copy;
244     offset += bytes_to_copy;
245     *bytes_copy += bytes_to_copy;
246   }
247   return true;
248 }
249 
Readv(const iovec * dest_iov,size_t dest_count,size_t * bytes_read,std::string * error_details)250 QuicErrorCode QuicStreamSequencerBuffer::Readv(const iovec* dest_iov,
251                                                size_t dest_count,
252                                                size_t* bytes_read,
253                                                std::string* error_details) {
254   *bytes_read = 0;
255   for (size_t i = 0; i < dest_count && ReadableBytes() > 0; ++i) {
256     char* dest = reinterpret_cast<char*>(dest_iov[i].iov_base);
257     QUICHE_DCHECK(dest != nullptr);
258     size_t dest_remaining = dest_iov[i].iov_len;
259     while (dest_remaining > 0 && ReadableBytes() > 0) {
260       size_t block_idx = NextBlockToRead();
261       size_t start_offset_in_block = ReadOffset();
262       size_t block_capacity = GetBlockCapacity(block_idx);
263       size_t bytes_available_in_block = std::min<size_t>(
264           ReadableBytes(), block_capacity - start_offset_in_block);
265       size_t bytes_to_copy =
266           std::min<size_t>(bytes_available_in_block, dest_remaining);
267       QUICHE_DCHECK_GT(bytes_to_copy, 0u);
268       if (blocks_[block_idx] == nullptr || dest == nullptr) {
269         *error_details = absl::StrCat(
270             "QuicStreamSequencerBuffer error:"
271             " Readv() dest == nullptr: ",
272             (dest == nullptr), " blocks_[", block_idx,
273             "] == nullptr: ", (blocks_[block_idx] == nullptr),
274             " Received frames: ", ReceivedFramesDebugString(),
275             " total_bytes_read_ = ", total_bytes_read_);
276         return QUIC_STREAM_SEQUENCER_INVALID_STATE;
277       }
278       memcpy(dest, blocks_[block_idx]->buffer + start_offset_in_block,
279              bytes_to_copy);
280       dest += bytes_to_copy;
281       dest_remaining -= bytes_to_copy;
282       num_bytes_buffered_ -= bytes_to_copy;
283       total_bytes_read_ += bytes_to_copy;
284       *bytes_read += bytes_to_copy;
285 
286       // Retire the block if all the data is read out and no other data is
287       // stored in this block.
288       // In case of failing to retire a block which is ready to retire, return
289       // immediately.
290       if (bytes_to_copy == bytes_available_in_block) {
291         bool retire_successfully = RetireBlockIfEmpty(block_idx);
292         if (!retire_successfully) {
293           *error_details = absl::StrCat(
294               "QuicStreamSequencerBuffer error: fail to retire block ",
295               block_idx,
296               " as the block is already released, total_bytes_read_ = ",
297               total_bytes_read_,
298               " Received frames: ", ReceivedFramesDebugString());
299           return QUIC_STREAM_SEQUENCER_INVALID_STATE;
300         }
301       }
302     }
303   }
304 
305   return QUIC_NO_ERROR;
306 }
307 
GetReadableRegions(struct iovec * iov,int iov_len) const308 int QuicStreamSequencerBuffer::GetReadableRegions(struct iovec* iov,
309                                                   int iov_len) const {
310   QUICHE_DCHECK(iov != nullptr);
311   QUICHE_DCHECK_GT(iov_len, 0);
312 
313   if (ReadableBytes() == 0) {
314     iov[0].iov_base = nullptr;
315     iov[0].iov_len = 0;
316     return 0;
317   }
318 
319   size_t start_block_idx = NextBlockToRead();
320   QuicStreamOffset readable_offset_end = FirstMissingByte() - 1;
321   QUICHE_DCHECK_GE(readable_offset_end + 1, total_bytes_read_);
322   size_t end_block_offset = GetInBlockOffset(readable_offset_end);
323   size_t end_block_idx = GetBlockIndex(readable_offset_end);
324 
325   // If readable region is within one block, deal with it seperately.
326   if (start_block_idx == end_block_idx && ReadOffset() <= end_block_offset) {
327     iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset();
328     iov[0].iov_len = ReadableBytes();
329     QUIC_DVLOG(1) << "Got only a single block with index: " << start_block_idx;
330     return 1;
331   }
332 
333   // Get first block
334   iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset();
335   iov[0].iov_len = GetBlockCapacity(start_block_idx) - ReadOffset();
336   QUIC_DVLOG(1) << "Got first block " << start_block_idx << " with len "
337                 << iov[0].iov_len;
338   QUICHE_DCHECK_GT(readable_offset_end + 1, total_bytes_read_ + iov[0].iov_len)
339       << "there should be more available data";
340 
341   // Get readable regions of the rest blocks till either 2nd to last block
342   // before gap is met or |iov| is filled. For these blocks, one whole block is
343   // a region.
344   int iov_used = 1;
345   size_t block_idx = (start_block_idx + iov_used) % max_blocks_count_;
346   while (block_idx != end_block_idx && iov_used < iov_len) {
347     QUICHE_DCHECK(nullptr != blocks_[block_idx]);
348     iov[iov_used].iov_base = blocks_[block_idx]->buffer;
349     iov[iov_used].iov_len = GetBlockCapacity(block_idx);
350     QUIC_DVLOG(1) << "Got block with index: " << block_idx;
351     ++iov_used;
352     block_idx = (start_block_idx + iov_used) % max_blocks_count_;
353   }
354 
355   // Deal with last block if |iov| can hold more.
356   if (iov_used < iov_len) {
357     QUICHE_DCHECK(nullptr != blocks_[block_idx]);
358     iov[iov_used].iov_base = blocks_[end_block_idx]->buffer;
359     iov[iov_used].iov_len = end_block_offset + 1;
360     QUIC_DVLOG(1) << "Got last block with index: " << end_block_idx;
361     ++iov_used;
362   }
363   return iov_used;
364 }
365 
GetReadableRegion(iovec * iov) const366 bool QuicStreamSequencerBuffer::GetReadableRegion(iovec* iov) const {
367   return GetReadableRegions(iov, 1) == 1;
368 }
369 
PeekRegion(QuicStreamOffset offset,iovec * iov) const370 bool QuicStreamSequencerBuffer::PeekRegion(QuicStreamOffset offset,
371                                            iovec* iov) const {
372   QUICHE_DCHECK(iov);
373 
374   if (offset < total_bytes_read_) {
375     // Data at |offset| has already been consumed.
376     return false;
377   }
378 
379   if (offset >= FirstMissingByte()) {
380     // Data at |offset| has not been received yet.
381     return false;
382   }
383 
384   // Beginning of region.
385   size_t block_idx = GetBlockIndex(offset);
386   size_t block_offset = GetInBlockOffset(offset);
387   iov->iov_base = blocks_[block_idx]->buffer + block_offset;
388 
389   // Determine if entire block has been received.
390   size_t end_block_idx = GetBlockIndex(FirstMissingByte());
391   if (block_idx == end_block_idx) {
392     // Only read part of block before FirstMissingByte().
393     iov->iov_len = GetInBlockOffset(FirstMissingByte()) - block_offset;
394   } else {
395     // Read entire block.
396     iov->iov_len = GetBlockCapacity(block_idx) - block_offset;
397   }
398 
399   return true;
400 }
401 
MarkConsumed(size_t bytes_consumed)402 bool QuicStreamSequencerBuffer::MarkConsumed(size_t bytes_consumed) {
403   if (bytes_consumed > ReadableBytes()) {
404     return false;
405   }
406   size_t bytes_to_consume = bytes_consumed;
407   while (bytes_to_consume > 0) {
408     size_t block_idx = NextBlockToRead();
409     size_t offset_in_block = ReadOffset();
410     size_t bytes_available = std::min<size_t>(
411         ReadableBytes(), GetBlockCapacity(block_idx) - offset_in_block);
412     size_t bytes_read = std::min<size_t>(bytes_to_consume, bytes_available);
413     total_bytes_read_ += bytes_read;
414     num_bytes_buffered_ -= bytes_read;
415     bytes_to_consume -= bytes_read;
416     // If advanced to the end of current block and end of buffer hasn't wrapped
417     // to this block yet.
418     if (bytes_available == bytes_read) {
419       RetireBlockIfEmpty(block_idx);
420     }
421   }
422 
423   return true;
424 }
425 
FlushBufferedFrames()426 size_t QuicStreamSequencerBuffer::FlushBufferedFrames() {
427   size_t prev_total_bytes_read = total_bytes_read_;
428   total_bytes_read_ = NextExpectedByte();
429   Clear();
430   return total_bytes_read_ - prev_total_bytes_read;
431 }
432 
ReleaseWholeBuffer()433 void QuicStreamSequencerBuffer::ReleaseWholeBuffer() {
434   Clear();
435   current_blocks_count_ = 0;
436   blocks_.reset(nullptr);
437 }
438 
ReadableBytes() const439 size_t QuicStreamSequencerBuffer::ReadableBytes() const {
440   return FirstMissingByte() - total_bytes_read_;
441 }
442 
HasBytesToRead() const443 bool QuicStreamSequencerBuffer::HasBytesToRead() const {
444   return ReadableBytes() > 0;
445 }
446 
BytesConsumed() const447 QuicStreamOffset QuicStreamSequencerBuffer::BytesConsumed() const {
448   return total_bytes_read_;
449 }
450 
BytesBuffered() const451 size_t QuicStreamSequencerBuffer::BytesBuffered() const {
452   return num_bytes_buffered_;
453 }
454 
GetBlockIndex(QuicStreamOffset offset) const455 size_t QuicStreamSequencerBuffer::GetBlockIndex(QuicStreamOffset offset) const {
456   return (offset % max_buffer_capacity_bytes_) / kBlockSizeBytes;
457 }
458 
GetInBlockOffset(QuicStreamOffset offset) const459 size_t QuicStreamSequencerBuffer::GetInBlockOffset(
460     QuicStreamOffset offset) const {
461   return (offset % max_buffer_capacity_bytes_) % kBlockSizeBytes;
462 }
463 
ReadOffset() const464 size_t QuicStreamSequencerBuffer::ReadOffset() const {
465   return GetInBlockOffset(total_bytes_read_);
466 }
467 
NextBlockToRead() const468 size_t QuicStreamSequencerBuffer::NextBlockToRead() const {
469   return GetBlockIndex(total_bytes_read_);
470 }
471 
RetireBlockIfEmpty(size_t block_index)472 bool QuicStreamSequencerBuffer::RetireBlockIfEmpty(size_t block_index) {
473   QUICHE_DCHECK(ReadableBytes() == 0 ||
474                 GetInBlockOffset(total_bytes_read_) == 0)
475       << "RetireBlockIfEmpty() should only be called when advancing to next "
476       << "block or a gap has been reached.";
477   // If the whole buffer becomes empty, the last piece of data has been read.
478   if (Empty()) {
479     return RetireBlock(block_index);
480   }
481 
482   // Check where the logical end of this buffer is.
483   // Not empty if the end of circular buffer has been wrapped to this block.
484   if (GetBlockIndex(NextExpectedByte() - 1) == block_index) {
485     return true;
486   }
487 
488   // Read index remains in this block, which means a gap has been reached.
489   if (NextBlockToRead() == block_index) {
490     if (bytes_received_.Size() > 1) {
491       auto it = bytes_received_.begin();
492       ++it;
493       if (GetBlockIndex(it->min()) == block_index) {
494         // Do not retire the block if next data interval is in this block.
495         return true;
496       }
497     } else {
498       QUIC_BUG(quic_bug_10610_2) << "Read stopped at where it shouldn't.";
499       return false;
500     }
501   }
502   return RetireBlock(block_index);
503 }
504 
Empty() const505 bool QuicStreamSequencerBuffer::Empty() const {
506   return bytes_received_.Empty() ||
507          (bytes_received_.Size() == 1 && total_bytes_read_ > 0 &&
508           bytes_received_.begin()->max() == total_bytes_read_);
509 }
510 
GetBlockCapacity(size_t block_index) const511 size_t QuicStreamSequencerBuffer::GetBlockCapacity(size_t block_index) const {
512   if ((block_index + 1) == max_blocks_count_) {
513     size_t result = max_buffer_capacity_bytes_ % kBlockSizeBytes;
514     if (result == 0) {  // whole block
515       result = kBlockSizeBytes;
516     }
517     return result;
518   } else {
519     return kBlockSizeBytes;
520   }
521 }
522 
ReceivedFramesDebugString() const523 std::string QuicStreamSequencerBuffer::ReceivedFramesDebugString() const {
524   return bytes_received_.ToString();
525 }
526 
FirstMissingByte() const527 QuicStreamOffset QuicStreamSequencerBuffer::FirstMissingByte() const {
528   if (bytes_received_.Empty() || bytes_received_.begin()->min() > 0) {
529     // Offset 0 is not received yet.
530     return 0;
531   }
532   return bytes_received_.begin()->max();
533 }
534 
NextExpectedByte() const535 QuicStreamOffset QuicStreamSequencerBuffer::NextExpectedByte() const {
536   if (bytes_received_.Empty()) {
537     return 0;
538   }
539   return bytes_received_.rbegin()->max();
540 }
541 
542 }  //  namespace quic
543