1 // Copyright (c) 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "quiche/quic/core/quic_stream_sequencer_buffer.h"
6
7 #include <algorithm>
8 #include <cstddef>
9 #include <memory>
10 #include <string>
11
12 #include "absl/strings/str_cat.h"
13 #include "absl/strings/string_view.h"
14 #include "quiche/quic/core/quic_constants.h"
15 #include "quiche/quic/core/quic_interval.h"
16 #include "quiche/quic/platform/api/quic_bug_tracker.h"
17 #include "quiche/quic/platform/api/quic_flag_utils.h"
18 #include "quiche/quic/platform/api/quic_flags.h"
19 #include "quiche/quic/platform/api/quic_logging.h"
20
21 namespace quic {
22 namespace {
23
CalculateBlockCount(size_t max_capacity_bytes)24 size_t CalculateBlockCount(size_t max_capacity_bytes) {
25 return (max_capacity_bytes + QuicStreamSequencerBuffer::kBlockSizeBytes - 1) /
26 QuicStreamSequencerBuffer::kBlockSizeBytes;
27 }
28
29 // Upper limit of how many gaps allowed in buffer, which ensures a reasonable
30 // number of iterations needed to find the right gap to fill when a frame
31 // arrives.
32 const size_t kMaxNumDataIntervalsAllowed = 2 * kMaxPacketGap;
33
34 // Number of blocks allocated initially.
35 constexpr size_t kInitialBlockCount = 8u;
36
37 // How fast block pointers container grow in size.
38 // Choose 4 to reduce the amount of reallocation.
39 constexpr int kBlocksGrowthFactor = 4;
40
41 } // namespace
42
QuicStreamSequencerBuffer(size_t max_capacity_bytes)43 QuicStreamSequencerBuffer::QuicStreamSequencerBuffer(size_t max_capacity_bytes)
44 : max_buffer_capacity_bytes_(max_capacity_bytes),
45 max_blocks_count_(CalculateBlockCount(max_capacity_bytes)),
46 current_blocks_count_(0u),
47 total_bytes_read_(0),
48 blocks_(nullptr) {
49 QUICHE_DCHECK_GE(max_blocks_count_, kInitialBlockCount);
50 Clear();
51 }
52
~QuicStreamSequencerBuffer()53 QuicStreamSequencerBuffer::~QuicStreamSequencerBuffer() { Clear(); }
54
Clear()55 void QuicStreamSequencerBuffer::Clear() {
56 if (blocks_ != nullptr) {
57 for (size_t i = 0; i < current_blocks_count_; ++i) {
58 if (blocks_[i] != nullptr) {
59 RetireBlock(i);
60 }
61 }
62 }
63 num_bytes_buffered_ = 0;
64 bytes_received_.Clear();
65 bytes_received_.Add(0, total_bytes_read_);
66 }
67
RetireBlock(size_t index)68 bool QuicStreamSequencerBuffer::RetireBlock(size_t index) {
69 if (blocks_[index] == nullptr) {
70 QUIC_BUG(quic_bug_10610_1) << "Try to retire block twice";
71 return false;
72 }
73 delete blocks_[index];
74 blocks_[index] = nullptr;
75 QUIC_DVLOG(1) << "Retired block with index: " << index;
76 return true;
77 }
78
MaybeAddMoreBlocks(QuicStreamOffset next_expected_byte)79 void QuicStreamSequencerBuffer::MaybeAddMoreBlocks(
80 QuicStreamOffset next_expected_byte) {
81 if (current_blocks_count_ == max_blocks_count_) {
82 return;
83 }
84 QuicStreamOffset last_byte = next_expected_byte - 1;
85 size_t num_of_blocks_needed;
86 // As long as last_byte does not wrap around, its index plus one blocks are
87 // needed. Otherwise, block_count_ blocks are needed.
88 if (last_byte < max_buffer_capacity_bytes_) {
89 num_of_blocks_needed =
90 std::max(GetBlockIndex(last_byte) + 1, kInitialBlockCount);
91 } else {
92 num_of_blocks_needed = max_blocks_count_;
93 }
94 if (current_blocks_count_ >= num_of_blocks_needed) {
95 return;
96 }
97 size_t new_block_count = kBlocksGrowthFactor * current_blocks_count_;
98 new_block_count = std::min(std::max(new_block_count, num_of_blocks_needed),
99 max_blocks_count_);
100 auto new_blocks = std::make_unique<BufferBlock*[]>(new_block_count);
101 if (blocks_ != nullptr) {
102 memcpy(new_blocks.get(), blocks_.get(),
103 current_blocks_count_ * sizeof(BufferBlock*));
104 }
105 blocks_ = std::move(new_blocks);
106 current_blocks_count_ = new_block_count;
107 }
108
OnStreamData(QuicStreamOffset starting_offset,absl::string_view data,size_t * const bytes_buffered,std::string * error_details)109 QuicErrorCode QuicStreamSequencerBuffer::OnStreamData(
110 QuicStreamOffset starting_offset, absl::string_view data,
111 size_t* const bytes_buffered, std::string* error_details) {
112 *bytes_buffered = 0;
113 size_t size = data.size();
114 if (size == 0) {
115 *error_details = "Received empty stream frame without FIN.";
116 return QUIC_EMPTY_STREAM_FRAME_NO_FIN;
117 }
118 // Write beyond the current range this buffer is covering.
119 if (starting_offset + size > total_bytes_read_ + max_buffer_capacity_bytes_ ||
120 starting_offset + size < starting_offset) {
121 *error_details = "Received data beyond available range.";
122 return QUIC_INTERNAL_ERROR;
123 }
124
125 if (bytes_received_.Empty() ||
126 starting_offset >= bytes_received_.rbegin()->max() ||
127 bytes_received_.IsDisjoint(QuicInterval<QuicStreamOffset>(
128 starting_offset, starting_offset + size))) {
129 // Optimization for the typical case, when all data is newly received.
130 bytes_received_.AddOptimizedForAppend(starting_offset,
131 starting_offset + size);
132 if (bytes_received_.Size() >= kMaxNumDataIntervalsAllowed) {
133 // This frame is going to create more intervals than allowed. Stop
134 // processing.
135 *error_details = "Too many data intervals received for this stream.";
136 return QUIC_TOO_MANY_STREAM_DATA_INTERVALS;
137 }
138 MaybeAddMoreBlocks(starting_offset + size);
139
140 size_t bytes_copy = 0;
141 if (!CopyStreamData(starting_offset, data, &bytes_copy, error_details)) {
142 return QUIC_STREAM_SEQUENCER_INVALID_STATE;
143 }
144 *bytes_buffered += bytes_copy;
145 num_bytes_buffered_ += *bytes_buffered;
146 return QUIC_NO_ERROR;
147 }
148 // Slow path, received data overlaps with received data.
149 QuicIntervalSet<QuicStreamOffset> newly_received(starting_offset,
150 starting_offset + size);
151 newly_received.Difference(bytes_received_);
152 if (newly_received.Empty()) {
153 return QUIC_NO_ERROR;
154 }
155 bytes_received_.Add(starting_offset, starting_offset + size);
156 if (bytes_received_.Size() >= kMaxNumDataIntervalsAllowed) {
157 // This frame is going to create more intervals than allowed. Stop
158 // processing.
159 *error_details = "Too many data intervals received for this stream.";
160 return QUIC_TOO_MANY_STREAM_DATA_INTERVALS;
161 }
162 MaybeAddMoreBlocks(starting_offset + size);
163 for (const auto& interval : newly_received) {
164 const QuicStreamOffset copy_offset = interval.min();
165 const QuicByteCount copy_length = interval.max() - interval.min();
166 size_t bytes_copy = 0;
167 if (!CopyStreamData(copy_offset,
168 data.substr(copy_offset - starting_offset, copy_length),
169 &bytes_copy, error_details)) {
170 return QUIC_STREAM_SEQUENCER_INVALID_STATE;
171 }
172 *bytes_buffered += bytes_copy;
173 }
174 num_bytes_buffered_ += *bytes_buffered;
175 return QUIC_NO_ERROR;
176 }
177
CopyStreamData(QuicStreamOffset offset,absl::string_view data,size_t * bytes_copy,std::string * error_details)178 bool QuicStreamSequencerBuffer::CopyStreamData(QuicStreamOffset offset,
179 absl::string_view data,
180 size_t* bytes_copy,
181 std::string* error_details) {
182 *bytes_copy = 0;
183 size_t source_remaining = data.size();
184 if (source_remaining == 0) {
185 return true;
186 }
187 const char* source = data.data();
188 // Write data block by block. If corresponding block has not created yet,
189 // create it first.
190 // Stop when all data are written or reaches the logical end of the buffer.
191 while (source_remaining > 0) {
192 const size_t write_block_num = GetBlockIndex(offset);
193 const size_t write_block_offset = GetInBlockOffset(offset);
194 size_t current_blocks_count = current_blocks_count_;
195 QUICHE_DCHECK_GT(current_blocks_count, write_block_num);
196
197 size_t block_capacity = GetBlockCapacity(write_block_num);
198 size_t bytes_avail = block_capacity - write_block_offset;
199
200 // If this write meets the upper boundary of the buffer,
201 // reduce the available free bytes.
202 if (offset + bytes_avail > total_bytes_read_ + max_buffer_capacity_bytes_) {
203 bytes_avail = total_bytes_read_ + max_buffer_capacity_bytes_ - offset;
204 }
205
206 if (write_block_num >= current_blocks_count) {
207 *error_details = absl::StrCat(
208 "QuicStreamSequencerBuffer error: OnStreamData() exceed array bounds."
209 "write offset = ",
210 offset, " write_block_num = ", write_block_num,
211 " current_blocks_count_ = ", current_blocks_count);
212 return false;
213 }
214 if (blocks_ == nullptr) {
215 *error_details =
216 "QuicStreamSequencerBuffer error: OnStreamData() blocks_ is null";
217 return false;
218 }
219 if (blocks_[write_block_num] == nullptr) {
220 // TODO(danzh): Investigate if using a freelist would improve performance.
221 // Same as RetireBlock().
222 blocks_[write_block_num] = new BufferBlock();
223 }
224
225 const size_t bytes_to_copy =
226 std::min<size_t>(bytes_avail, source_remaining);
227 char* dest = blocks_[write_block_num]->buffer + write_block_offset;
228 QUIC_DVLOG(1) << "Write at offset: " << offset
229 << " length: " << bytes_to_copy;
230
231 if (dest == nullptr || source == nullptr) {
232 *error_details = absl::StrCat(
233 "QuicStreamSequencerBuffer error: OnStreamData()"
234 " dest == nullptr: ",
235 (dest == nullptr), " source == nullptr: ", (source == nullptr),
236 " Writing at offset ", offset,
237 " Received frames: ", ReceivedFramesDebugString(),
238 " total_bytes_read_ = ", total_bytes_read_);
239 return false;
240 }
241 memcpy(dest, source, bytes_to_copy);
242 source += bytes_to_copy;
243 source_remaining -= bytes_to_copy;
244 offset += bytes_to_copy;
245 *bytes_copy += bytes_to_copy;
246 }
247 return true;
248 }
249
Readv(const iovec * dest_iov,size_t dest_count,size_t * bytes_read,std::string * error_details)250 QuicErrorCode QuicStreamSequencerBuffer::Readv(const iovec* dest_iov,
251 size_t dest_count,
252 size_t* bytes_read,
253 std::string* error_details) {
254 *bytes_read = 0;
255 for (size_t i = 0; i < dest_count && ReadableBytes() > 0; ++i) {
256 char* dest = reinterpret_cast<char*>(dest_iov[i].iov_base);
257 QUICHE_DCHECK(dest != nullptr);
258 size_t dest_remaining = dest_iov[i].iov_len;
259 while (dest_remaining > 0 && ReadableBytes() > 0) {
260 size_t block_idx = NextBlockToRead();
261 size_t start_offset_in_block = ReadOffset();
262 size_t block_capacity = GetBlockCapacity(block_idx);
263 size_t bytes_available_in_block = std::min<size_t>(
264 ReadableBytes(), block_capacity - start_offset_in_block);
265 size_t bytes_to_copy =
266 std::min<size_t>(bytes_available_in_block, dest_remaining);
267 QUICHE_DCHECK_GT(bytes_to_copy, 0u);
268 if (blocks_[block_idx] == nullptr || dest == nullptr) {
269 *error_details = absl::StrCat(
270 "QuicStreamSequencerBuffer error:"
271 " Readv() dest == nullptr: ",
272 (dest == nullptr), " blocks_[", block_idx,
273 "] == nullptr: ", (blocks_[block_idx] == nullptr),
274 " Received frames: ", ReceivedFramesDebugString(),
275 " total_bytes_read_ = ", total_bytes_read_);
276 return QUIC_STREAM_SEQUENCER_INVALID_STATE;
277 }
278 memcpy(dest, blocks_[block_idx]->buffer + start_offset_in_block,
279 bytes_to_copy);
280 dest += bytes_to_copy;
281 dest_remaining -= bytes_to_copy;
282 num_bytes_buffered_ -= bytes_to_copy;
283 total_bytes_read_ += bytes_to_copy;
284 *bytes_read += bytes_to_copy;
285
286 // Retire the block if all the data is read out and no other data is
287 // stored in this block.
288 // In case of failing to retire a block which is ready to retire, return
289 // immediately.
290 if (bytes_to_copy == bytes_available_in_block) {
291 bool retire_successfully = RetireBlockIfEmpty(block_idx);
292 if (!retire_successfully) {
293 *error_details = absl::StrCat(
294 "QuicStreamSequencerBuffer error: fail to retire block ",
295 block_idx,
296 " as the block is already released, total_bytes_read_ = ",
297 total_bytes_read_,
298 " Received frames: ", ReceivedFramesDebugString());
299 return QUIC_STREAM_SEQUENCER_INVALID_STATE;
300 }
301 }
302 }
303 }
304
305 return QUIC_NO_ERROR;
306 }
307
GetReadableRegions(struct iovec * iov,int iov_len) const308 int QuicStreamSequencerBuffer::GetReadableRegions(struct iovec* iov,
309 int iov_len) const {
310 QUICHE_DCHECK(iov != nullptr);
311 QUICHE_DCHECK_GT(iov_len, 0);
312
313 if (ReadableBytes() == 0) {
314 iov[0].iov_base = nullptr;
315 iov[0].iov_len = 0;
316 return 0;
317 }
318
319 size_t start_block_idx = NextBlockToRead();
320 QuicStreamOffset readable_offset_end = FirstMissingByte() - 1;
321 QUICHE_DCHECK_GE(readable_offset_end + 1, total_bytes_read_);
322 size_t end_block_offset = GetInBlockOffset(readable_offset_end);
323 size_t end_block_idx = GetBlockIndex(readable_offset_end);
324
325 // If readable region is within one block, deal with it seperately.
326 if (start_block_idx == end_block_idx && ReadOffset() <= end_block_offset) {
327 iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset();
328 iov[0].iov_len = ReadableBytes();
329 QUIC_DVLOG(1) << "Got only a single block with index: " << start_block_idx;
330 return 1;
331 }
332
333 // Get first block
334 iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset();
335 iov[0].iov_len = GetBlockCapacity(start_block_idx) - ReadOffset();
336 QUIC_DVLOG(1) << "Got first block " << start_block_idx << " with len "
337 << iov[0].iov_len;
338 QUICHE_DCHECK_GT(readable_offset_end + 1, total_bytes_read_ + iov[0].iov_len)
339 << "there should be more available data";
340
341 // Get readable regions of the rest blocks till either 2nd to last block
342 // before gap is met or |iov| is filled. For these blocks, one whole block is
343 // a region.
344 int iov_used = 1;
345 size_t block_idx = (start_block_idx + iov_used) % max_blocks_count_;
346 while (block_idx != end_block_idx && iov_used < iov_len) {
347 QUICHE_DCHECK(nullptr != blocks_[block_idx]);
348 iov[iov_used].iov_base = blocks_[block_idx]->buffer;
349 iov[iov_used].iov_len = GetBlockCapacity(block_idx);
350 QUIC_DVLOG(1) << "Got block with index: " << block_idx;
351 ++iov_used;
352 block_idx = (start_block_idx + iov_used) % max_blocks_count_;
353 }
354
355 // Deal with last block if |iov| can hold more.
356 if (iov_used < iov_len) {
357 QUICHE_DCHECK(nullptr != blocks_[block_idx]);
358 iov[iov_used].iov_base = blocks_[end_block_idx]->buffer;
359 iov[iov_used].iov_len = end_block_offset + 1;
360 QUIC_DVLOG(1) << "Got last block with index: " << end_block_idx;
361 ++iov_used;
362 }
363 return iov_used;
364 }
365
GetReadableRegion(iovec * iov) const366 bool QuicStreamSequencerBuffer::GetReadableRegion(iovec* iov) const {
367 return GetReadableRegions(iov, 1) == 1;
368 }
369
PeekRegion(QuicStreamOffset offset,iovec * iov) const370 bool QuicStreamSequencerBuffer::PeekRegion(QuicStreamOffset offset,
371 iovec* iov) const {
372 QUICHE_DCHECK(iov);
373
374 if (offset < total_bytes_read_) {
375 // Data at |offset| has already been consumed.
376 return false;
377 }
378
379 if (offset >= FirstMissingByte()) {
380 // Data at |offset| has not been received yet.
381 return false;
382 }
383
384 // Beginning of region.
385 size_t block_idx = GetBlockIndex(offset);
386 size_t block_offset = GetInBlockOffset(offset);
387 iov->iov_base = blocks_[block_idx]->buffer + block_offset;
388
389 // Determine if entire block has been received.
390 size_t end_block_idx = GetBlockIndex(FirstMissingByte());
391 if (block_idx == end_block_idx) {
392 // Only read part of block before FirstMissingByte().
393 iov->iov_len = GetInBlockOffset(FirstMissingByte()) - block_offset;
394 } else {
395 // Read entire block.
396 iov->iov_len = GetBlockCapacity(block_idx) - block_offset;
397 }
398
399 return true;
400 }
401
MarkConsumed(size_t bytes_consumed)402 bool QuicStreamSequencerBuffer::MarkConsumed(size_t bytes_consumed) {
403 if (bytes_consumed > ReadableBytes()) {
404 return false;
405 }
406 size_t bytes_to_consume = bytes_consumed;
407 while (bytes_to_consume > 0) {
408 size_t block_idx = NextBlockToRead();
409 size_t offset_in_block = ReadOffset();
410 size_t bytes_available = std::min<size_t>(
411 ReadableBytes(), GetBlockCapacity(block_idx) - offset_in_block);
412 size_t bytes_read = std::min<size_t>(bytes_to_consume, bytes_available);
413 total_bytes_read_ += bytes_read;
414 num_bytes_buffered_ -= bytes_read;
415 bytes_to_consume -= bytes_read;
416 // If advanced to the end of current block and end of buffer hasn't wrapped
417 // to this block yet.
418 if (bytes_available == bytes_read) {
419 RetireBlockIfEmpty(block_idx);
420 }
421 }
422
423 return true;
424 }
425
FlushBufferedFrames()426 size_t QuicStreamSequencerBuffer::FlushBufferedFrames() {
427 size_t prev_total_bytes_read = total_bytes_read_;
428 total_bytes_read_ = NextExpectedByte();
429 Clear();
430 return total_bytes_read_ - prev_total_bytes_read;
431 }
432
ReleaseWholeBuffer()433 void QuicStreamSequencerBuffer::ReleaseWholeBuffer() {
434 Clear();
435 current_blocks_count_ = 0;
436 blocks_.reset(nullptr);
437 }
438
ReadableBytes() const439 size_t QuicStreamSequencerBuffer::ReadableBytes() const {
440 return FirstMissingByte() - total_bytes_read_;
441 }
442
HasBytesToRead() const443 bool QuicStreamSequencerBuffer::HasBytesToRead() const {
444 return ReadableBytes() > 0;
445 }
446
BytesConsumed() const447 QuicStreamOffset QuicStreamSequencerBuffer::BytesConsumed() const {
448 return total_bytes_read_;
449 }
450
BytesBuffered() const451 size_t QuicStreamSequencerBuffer::BytesBuffered() const {
452 return num_bytes_buffered_;
453 }
454
GetBlockIndex(QuicStreamOffset offset) const455 size_t QuicStreamSequencerBuffer::GetBlockIndex(QuicStreamOffset offset) const {
456 return (offset % max_buffer_capacity_bytes_) / kBlockSizeBytes;
457 }
458
GetInBlockOffset(QuicStreamOffset offset) const459 size_t QuicStreamSequencerBuffer::GetInBlockOffset(
460 QuicStreamOffset offset) const {
461 return (offset % max_buffer_capacity_bytes_) % kBlockSizeBytes;
462 }
463
ReadOffset() const464 size_t QuicStreamSequencerBuffer::ReadOffset() const {
465 return GetInBlockOffset(total_bytes_read_);
466 }
467
NextBlockToRead() const468 size_t QuicStreamSequencerBuffer::NextBlockToRead() const {
469 return GetBlockIndex(total_bytes_read_);
470 }
471
RetireBlockIfEmpty(size_t block_index)472 bool QuicStreamSequencerBuffer::RetireBlockIfEmpty(size_t block_index) {
473 QUICHE_DCHECK(ReadableBytes() == 0 ||
474 GetInBlockOffset(total_bytes_read_) == 0)
475 << "RetireBlockIfEmpty() should only be called when advancing to next "
476 << "block or a gap has been reached.";
477 // If the whole buffer becomes empty, the last piece of data has been read.
478 if (Empty()) {
479 return RetireBlock(block_index);
480 }
481
482 // Check where the logical end of this buffer is.
483 // Not empty if the end of circular buffer has been wrapped to this block.
484 if (GetBlockIndex(NextExpectedByte() - 1) == block_index) {
485 return true;
486 }
487
488 // Read index remains in this block, which means a gap has been reached.
489 if (NextBlockToRead() == block_index) {
490 if (bytes_received_.Size() > 1) {
491 auto it = bytes_received_.begin();
492 ++it;
493 if (GetBlockIndex(it->min()) == block_index) {
494 // Do not retire the block if next data interval is in this block.
495 return true;
496 }
497 } else {
498 QUIC_BUG(quic_bug_10610_2) << "Read stopped at where it shouldn't.";
499 return false;
500 }
501 }
502 return RetireBlock(block_index);
503 }
504
Empty() const505 bool QuicStreamSequencerBuffer::Empty() const {
506 return bytes_received_.Empty() ||
507 (bytes_received_.Size() == 1 && total_bytes_read_ > 0 &&
508 bytes_received_.begin()->max() == total_bytes_read_);
509 }
510
GetBlockCapacity(size_t block_index) const511 size_t QuicStreamSequencerBuffer::GetBlockCapacity(size_t block_index) const {
512 if ((block_index + 1) == max_blocks_count_) {
513 size_t result = max_buffer_capacity_bytes_ % kBlockSizeBytes;
514 if (result == 0) { // whole block
515 result = kBlockSizeBytes;
516 }
517 return result;
518 } else {
519 return kBlockSizeBytes;
520 }
521 }
522
ReceivedFramesDebugString() const523 std::string QuicStreamSequencerBuffer::ReceivedFramesDebugString() const {
524 return bytes_received_.ToString();
525 }
526
FirstMissingByte() const527 QuicStreamOffset QuicStreamSequencerBuffer::FirstMissingByte() const {
528 if (bytes_received_.Empty() || bytes_received_.begin()->min() > 0) {
529 // Offset 0 is not received yet.
530 return 0;
531 }
532 return bytes_received_.begin()->max();
533 }
534
NextExpectedByte() const535 QuicStreamOffset QuicStreamSequencerBuffer::NextExpectedByte() const {
536 if (bytes_received_.Empty()) {
537 return 0;
538 }
539 return bytes_received_.rbegin()->max();
540 }
541
542 } // namespace quic
543