1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "quiche/quic/core/quic_stream.h"
6
7 #include <limits>
8 #include <optional>
9 #include <string>
10
11 #include "absl/strings/str_cat.h"
12 #include "absl/strings/string_view.h"
13 #include "quiche/quic/core/quic_error_codes.h"
14 #include "quiche/quic/core/quic_flow_controller.h"
15 #include "quiche/quic/core/quic_session.h"
16 #include "quiche/quic/core/quic_types.h"
17 #include "quiche/quic/core/quic_utils.h"
18 #include "quiche/quic/core/quic_versions.h"
19 #include "quiche/quic/platform/api/quic_bug_tracker.h"
20 #include "quiche/quic/platform/api/quic_flag_utils.h"
21 #include "quiche/quic/platform/api/quic_flags.h"
22 #include "quiche/quic/platform/api/quic_logging.h"
23 #include "quiche/common/platform/api/quiche_logging.h"
24 #include "quiche/common/platform/api/quiche_mem_slice.h"
25
26 using spdy::SpdyPriority;
27
28 namespace quic {
29
30 #define ENDPOINT \
31 (perspective_ == Perspective::IS_SERVER ? "Server: " : "Client: ")
32
33 namespace {
34
DefaultFlowControlWindow(ParsedQuicVersion version)35 QuicByteCount DefaultFlowControlWindow(ParsedQuicVersion version) {
36 if (!version.AllowsLowFlowControlLimits()) {
37 return kDefaultFlowControlSendWindow;
38 }
39 return 0;
40 }
41
GetInitialStreamFlowControlWindowToSend(QuicSession * session,QuicStreamId stream_id)42 QuicByteCount GetInitialStreamFlowControlWindowToSend(QuicSession* session,
43 QuicStreamId stream_id) {
44 ParsedQuicVersion version = session->connection()->version();
45 if (version.handshake_protocol != PROTOCOL_TLS1_3) {
46 return session->config()->GetInitialStreamFlowControlWindowToSend();
47 }
48
49 // Unidirectional streams (v99 only).
50 if (VersionHasIetfQuicFrames(version.transport_version) &&
51 !QuicUtils::IsBidirectionalStreamId(stream_id, version)) {
52 return session->config()
53 ->GetInitialMaxStreamDataBytesUnidirectionalToSend();
54 }
55
56 if (QuicUtils::IsOutgoingStreamId(version, stream_id,
57 session->perspective())) {
58 return session->config()
59 ->GetInitialMaxStreamDataBytesOutgoingBidirectionalToSend();
60 }
61
62 return session->config()
63 ->GetInitialMaxStreamDataBytesIncomingBidirectionalToSend();
64 }
65
GetReceivedFlowControlWindow(QuicSession * session,QuicStreamId stream_id)66 QuicByteCount GetReceivedFlowControlWindow(QuicSession* session,
67 QuicStreamId stream_id) {
68 ParsedQuicVersion version = session->connection()->version();
69 if (version.handshake_protocol != PROTOCOL_TLS1_3) {
70 if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) {
71 return session->config()->ReceivedInitialStreamFlowControlWindowBytes();
72 }
73
74 return DefaultFlowControlWindow(version);
75 }
76
77 // Unidirectional streams (v99 only).
78 if (VersionHasIetfQuicFrames(version.transport_version) &&
79 !QuicUtils::IsBidirectionalStreamId(stream_id, version)) {
80 if (session->config()
81 ->HasReceivedInitialMaxStreamDataBytesUnidirectional()) {
82 return session->config()
83 ->ReceivedInitialMaxStreamDataBytesUnidirectional();
84 }
85
86 return DefaultFlowControlWindow(version);
87 }
88
89 if (QuicUtils::IsOutgoingStreamId(version, stream_id,
90 session->perspective())) {
91 if (session->config()
92 ->HasReceivedInitialMaxStreamDataBytesOutgoingBidirectional()) {
93 return session->config()
94 ->ReceivedInitialMaxStreamDataBytesOutgoingBidirectional();
95 }
96
97 return DefaultFlowControlWindow(version);
98 }
99
100 if (session->config()
101 ->HasReceivedInitialMaxStreamDataBytesIncomingBidirectional()) {
102 return session->config()
103 ->ReceivedInitialMaxStreamDataBytesIncomingBidirectional();
104 }
105
106 return DefaultFlowControlWindow(version);
107 }
108
109 } // namespace
110
PendingStream(QuicStreamId id,QuicSession * session)111 PendingStream::PendingStream(QuicStreamId id, QuicSession* session)
112 : id_(id),
113 version_(session->version()),
114 stream_delegate_(session),
115 stream_bytes_read_(0),
116 fin_received_(false),
117 is_bidirectional_(QuicUtils::GetStreamType(id, session->perspective(),
118 /*peer_initiated = */ true,
119 session->version()) ==
120 BIDIRECTIONAL),
121 connection_flow_controller_(session->flow_controller()),
122 flow_controller_(session, id,
123 /*is_connection_flow_controller*/ false,
124 GetReceivedFlowControlWindow(session, id),
125 GetInitialStreamFlowControlWindowToSend(session, id),
126 kStreamReceiveWindowLimit,
127 session->flow_controller()->auto_tune_receive_window(),
128 session->flow_controller()),
129 sequencer_(this),
130 creation_time_(session->GetClock()->ApproximateNow()) {
131 if (is_bidirectional_) {
132 QUIC_CODE_COUNT_N(quic_pending_stream, 3, 3);
133 }
134 }
135
OnDataAvailable()136 void PendingStream::OnDataAvailable() {
137 // Data should be kept in the sequencer so that
138 // QuicSession::ProcessPendingStream() can read it.
139 }
140
OnFinRead()141 void PendingStream::OnFinRead() { QUICHE_DCHECK(sequencer_.IsClosed()); }
142
AddBytesConsumed(QuicByteCount bytes)143 void PendingStream::AddBytesConsumed(QuicByteCount bytes) {
144 // It will be called when the metadata of the stream is consumed.
145 flow_controller_.AddBytesConsumed(bytes);
146 connection_flow_controller_->AddBytesConsumed(bytes);
147 }
148
ResetWithError(QuicResetStreamError)149 void PendingStream::ResetWithError(QuicResetStreamError /*error*/) {
150 // Currently PendingStream is only read-unidirectional. It shouldn't send
151 // Reset.
152 QUICHE_NOTREACHED();
153 }
154
OnUnrecoverableError(QuicErrorCode error,const std::string & details)155 void PendingStream::OnUnrecoverableError(QuicErrorCode error,
156 const std::string& details) {
157 stream_delegate_->OnStreamError(error, details);
158 }
159
OnUnrecoverableError(QuicErrorCode error,QuicIetfTransportErrorCodes ietf_error,const std::string & details)160 void PendingStream::OnUnrecoverableError(QuicErrorCode error,
161 QuicIetfTransportErrorCodes ietf_error,
162 const std::string& details) {
163 stream_delegate_->OnStreamError(error, ietf_error, details);
164 }
165
id() const166 QuicStreamId PendingStream::id() const { return id_; }
167
version() const168 ParsedQuicVersion PendingStream::version() const { return version_; }
169
OnStreamFrame(const QuicStreamFrame & frame)170 void PendingStream::OnStreamFrame(const QuicStreamFrame& frame) {
171 QUICHE_DCHECK_EQ(frame.stream_id, id_);
172
173 bool is_stream_too_long =
174 (frame.offset > kMaxStreamLength) ||
175 (kMaxStreamLength - frame.offset < frame.data_length);
176 if (is_stream_too_long) {
177 // Close connection if stream becomes too long.
178 QUIC_PEER_BUG(quic_peer_bug_12570_1)
179 << "Receive stream frame reaches max stream length. frame offset "
180 << frame.offset << " length " << frame.data_length;
181 OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
182 "Peer sends more data than allowed on this stream.");
183 return;
184 }
185
186 if (frame.offset + frame.data_length > sequencer_.close_offset()) {
187 OnUnrecoverableError(
188 QUIC_STREAM_DATA_BEYOND_CLOSE_OFFSET,
189 absl::StrCat(
190 "Stream ", id_,
191 " received data with offset: ", frame.offset + frame.data_length,
192 ", which is beyond close offset: ", sequencer()->close_offset()));
193 return;
194 }
195
196 if (frame.fin) {
197 fin_received_ = true;
198 }
199
200 // This count includes duplicate data received.
201 QuicByteCount frame_payload_size = frame.data_length;
202 stream_bytes_read_ += frame_payload_size;
203
204 // Flow control is interested in tracking highest received offset.
205 // Only interested in received frames that carry data.
206 if (frame_payload_size > 0 &&
207 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
208 // As the highest received offset has changed, check to see if this is a
209 // violation of flow control.
210 if (flow_controller_.FlowControlViolation() ||
211 connection_flow_controller_->FlowControlViolation()) {
212 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
213 "Flow control violation after increasing offset");
214 return;
215 }
216 }
217
218 sequencer_.OnStreamFrame(frame);
219 }
220
OnRstStreamFrame(const QuicRstStreamFrame & frame)221 void PendingStream::OnRstStreamFrame(const QuicRstStreamFrame& frame) {
222 QUICHE_DCHECK_EQ(frame.stream_id, id_);
223
224 if (frame.byte_offset > kMaxStreamLength) {
225 // Peer are not suppose to write bytes more than maxium allowed.
226 OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
227 "Reset frame stream offset overflow.");
228 return;
229 }
230
231 const QuicStreamOffset kMaxOffset =
232 std::numeric_limits<QuicStreamOffset>::max();
233 if (sequencer()->close_offset() != kMaxOffset &&
234 frame.byte_offset != sequencer()->close_offset()) {
235 OnUnrecoverableError(
236 QUIC_STREAM_MULTIPLE_OFFSET,
237 absl::StrCat("Stream ", id_,
238 " received new final offset: ", frame.byte_offset,
239 ", which is different from close offset: ",
240 sequencer()->close_offset()));
241 return;
242 }
243
244 MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
245 if (flow_controller_.FlowControlViolation() ||
246 connection_flow_controller_->FlowControlViolation()) {
247 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
248 "Flow control violation after increasing offset");
249 return;
250 }
251 }
252
OnWindowUpdateFrame(const QuicWindowUpdateFrame & frame)253 void PendingStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
254 QUICHE_DCHECK(is_bidirectional_);
255 flow_controller_.UpdateSendWindowOffset(frame.max_data);
256 }
257
MaybeIncreaseHighestReceivedOffset(QuicStreamOffset new_offset)258 bool PendingStream::MaybeIncreaseHighestReceivedOffset(
259 QuicStreamOffset new_offset) {
260 uint64_t increment =
261 new_offset - flow_controller_.highest_received_byte_offset();
262 if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) {
263 return false;
264 }
265
266 // If |new_offset| increased the stream flow controller's highest received
267 // offset, increase the connection flow controller's value by the incremental
268 // difference.
269 connection_flow_controller_->UpdateHighestReceivedOffset(
270 connection_flow_controller_->highest_received_byte_offset() + increment);
271 return true;
272 }
273
OnStopSending(QuicResetStreamError stop_sending_error_code)274 void PendingStream::OnStopSending(
275 QuicResetStreamError stop_sending_error_code) {
276 if (!stop_sending_error_code_) {
277 stop_sending_error_code_ = stop_sending_error_code;
278 }
279 }
280
MarkConsumed(QuicByteCount num_bytes)281 void PendingStream::MarkConsumed(QuicByteCount num_bytes) {
282 sequencer_.MarkConsumed(num_bytes);
283 }
284
StopReading()285 void PendingStream::StopReading() {
286 QUIC_DVLOG(1) << "Stop reading from pending stream " << id();
287 sequencer_.StopReading();
288 }
289
QuicStream(PendingStream * pending,QuicSession * session,bool is_static)290 QuicStream::QuicStream(PendingStream* pending, QuicSession* session,
291 bool is_static)
292 : QuicStream(
293 pending->id_, session, std::move(pending->sequencer_), is_static,
294 QuicUtils::GetStreamType(pending->id_, session->perspective(),
295 /*peer_initiated = */ true,
296 session->version()),
297 pending->stream_bytes_read_, pending->fin_received_,
298 std::move(pending->flow_controller_),
299 pending->connection_flow_controller_,
300 (session->GetClock()->ApproximateNow() - pending->creation_time())) {
301 QUICHE_DCHECK(session->version().HasIetfQuicFrames());
302 sequencer_.set_stream(this);
303 }
304
305 namespace {
306
FlowController(QuicStreamId id,QuicSession * session,StreamType type)307 std::optional<QuicFlowController> FlowController(QuicStreamId id,
308 QuicSession* session,
309 StreamType type) {
310 if (type == CRYPTO) {
311 // The only QuicStream with a StreamType of CRYPTO is QuicCryptoStream, when
312 // it is using crypto frames instead of stream frames. The QuicCryptoStream
313 // doesn't have any flow control in that case, so we don't create a
314 // QuicFlowController for it.
315 return std::nullopt;
316 }
317 return QuicFlowController(
318 session, id,
319 /*is_connection_flow_controller*/ false,
320 GetReceivedFlowControlWindow(session, id),
321 GetInitialStreamFlowControlWindowToSend(session, id),
322 kStreamReceiveWindowLimit,
323 session->flow_controller()->auto_tune_receive_window(),
324 session->flow_controller());
325 }
326
327 } // namespace
328
QuicStream(QuicStreamId id,QuicSession * session,bool is_static,StreamType type)329 QuicStream::QuicStream(QuicStreamId id, QuicSession* session, bool is_static,
330 StreamType type)
331 : QuicStream(id, session, QuicStreamSequencer(this), is_static, type, 0,
332 false, FlowController(id, session, type),
333 session->flow_controller(), QuicTime::Delta::Zero()) {}
334
QuicStream(QuicStreamId id,QuicSession * session,QuicStreamSequencer sequencer,bool is_static,StreamType type,uint64_t stream_bytes_read,bool fin_received,std::optional<QuicFlowController> flow_controller,QuicFlowController * connection_flow_controller,QuicTime::Delta pending_duration)335 QuicStream::QuicStream(QuicStreamId id, QuicSession* session,
336 QuicStreamSequencer sequencer, bool is_static,
337 StreamType type, uint64_t stream_bytes_read,
338 bool fin_received,
339 std::optional<QuicFlowController> flow_controller,
340 QuicFlowController* connection_flow_controller,
341 QuicTime::Delta pending_duration)
342 : sequencer_(std::move(sequencer)),
343 id_(id),
344 session_(session),
345 stream_delegate_(session),
346 priority_(QuicStreamPriority::Default(session->priority_type())),
347 stream_bytes_read_(stream_bytes_read),
348 stream_error_(QuicResetStreamError::NoError()),
349 connection_error_(QUIC_NO_ERROR),
350 read_side_closed_(false),
351 write_side_closed_(false),
352 write_side_data_recvd_state_notified_(false),
353 fin_buffered_(false),
354 fin_sent_(false),
355 fin_outstanding_(false),
356 fin_lost_(false),
357 fin_received_(fin_received),
358 rst_sent_(false),
359 rst_received_(false),
360 stop_sending_sent_(false),
361 flow_controller_(std::move(flow_controller)),
362 connection_flow_controller_(connection_flow_controller),
363 stream_contributes_to_connection_flow_control_(true),
364 busy_counter_(0),
365 add_random_padding_after_fin_(false),
366 send_buffer_(
367 session->connection()->helper()->GetStreamSendBufferAllocator()),
368 buffered_data_threshold_(GetQuicFlag(quic_buffered_data_threshold)),
369 is_static_(is_static),
370 deadline_(QuicTime::Zero()),
371 was_draining_(false),
372 type_(VersionHasIetfQuicFrames(session->transport_version()) &&
373 type != CRYPTO
374 ? QuicUtils::GetStreamType(id_, session->perspective(),
375 session->IsIncomingStream(id_),
376 session->version())
377 : type),
378 creation_time_(session->connection()->clock()->ApproximateNow()),
379 pending_duration_(pending_duration),
380 perspective_(session->perspective()) {
381 if (type_ == WRITE_UNIDIRECTIONAL) {
382 fin_received_ = true;
383 CloseReadSide();
384 } else if (type_ == READ_UNIDIRECTIONAL) {
385 fin_sent_ = true;
386 CloseWriteSide();
387 }
388 if (type_ != CRYPTO) {
389 stream_delegate_->RegisterStreamPriority(id, is_static_, priority_);
390 }
391 }
392
~QuicStream()393 QuicStream::~QuicStream() {
394 if (session_ != nullptr && IsWaitingForAcks()) {
395 QUIC_DVLOG(1)
396 << ENDPOINT << "Stream " << id_
397 << " gets destroyed while waiting for acks. stream_bytes_outstanding = "
398 << send_buffer_.stream_bytes_outstanding()
399 << ", fin_outstanding: " << fin_outstanding_;
400 }
401 if (stream_delegate_ != nullptr && type_ != CRYPTO) {
402 stream_delegate_->UnregisterStreamPriority(id());
403 }
404 }
405
OnStreamFrame(const QuicStreamFrame & frame)406 void QuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
407 QUICHE_DCHECK_EQ(frame.stream_id, id_);
408
409 QUICHE_DCHECK(!(read_side_closed_ && write_side_closed_));
410
411 if (frame.fin && is_static_) {
412 OnUnrecoverableError(QUIC_INVALID_STREAM_ID,
413 "Attempt to close a static stream");
414 return;
415 }
416
417 if (type_ == WRITE_UNIDIRECTIONAL) {
418 OnUnrecoverableError(QUIC_DATA_RECEIVED_ON_WRITE_UNIDIRECTIONAL_STREAM,
419 "Data received on write unidirectional stream");
420 return;
421 }
422
423 bool is_stream_too_long =
424 (frame.offset > kMaxStreamLength) ||
425 (kMaxStreamLength - frame.offset < frame.data_length);
426 if (is_stream_too_long) {
427 // Close connection if stream becomes too long.
428 QUIC_PEER_BUG(quic_peer_bug_10586_1)
429 << "Receive stream frame on stream " << id_
430 << " reaches max stream length. frame offset " << frame.offset
431 << " length " << frame.data_length << ". " << sequencer_.DebugString();
432 OnUnrecoverableError(
433 QUIC_STREAM_LENGTH_OVERFLOW,
434 absl::StrCat("Peer sends more data than allowed on stream ", id_,
435 ". frame: offset = ", frame.offset, ", length = ",
436 frame.data_length, ". ", sequencer_.DebugString()));
437 return;
438 }
439
440 if (frame.offset + frame.data_length > sequencer_.close_offset()) {
441 OnUnrecoverableError(
442 QUIC_STREAM_DATA_BEYOND_CLOSE_OFFSET,
443 absl::StrCat(
444 "Stream ", id_,
445 " received data with offset: ", frame.offset + frame.data_length,
446 ", which is beyond close offset: ", sequencer_.close_offset()));
447 return;
448 }
449
450 if (frame.fin && !fin_received_) {
451 fin_received_ = true;
452 if (fin_sent_) {
453 QUICHE_DCHECK(!was_draining_);
454 session_->StreamDraining(id_,
455 /*unidirectional=*/type_ != BIDIRECTIONAL);
456 was_draining_ = true;
457 }
458 }
459
460 if (read_side_closed_) {
461 QUIC_DLOG(INFO)
462 << ENDPOINT << "Stream " << frame.stream_id
463 << " is closed for reading. Ignoring newly received stream data.";
464 // The subclass does not want to read data: blackhole the data.
465 return;
466 }
467
468 // This count includes duplicate data received.
469 QuicByteCount frame_payload_size = frame.data_length;
470 stream_bytes_read_ += frame_payload_size;
471
472 // Flow control is interested in tracking highest received offset.
473 // Only interested in received frames that carry data.
474 if (frame_payload_size > 0 &&
475 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
476 // As the highest received offset has changed, check to see if this is a
477 // violation of flow control.
478 QUIC_BUG_IF(quic_bug_12570_2, !flow_controller_.has_value())
479 << ENDPOINT << "OnStreamFrame called on stream without flow control";
480 if ((flow_controller_.has_value() &&
481 flow_controller_->FlowControlViolation()) ||
482 connection_flow_controller_->FlowControlViolation()) {
483 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
484 "Flow control violation after increasing offset");
485 return;
486 }
487 }
488
489 sequencer_.OnStreamFrame(frame);
490 }
491
OnStopSending(QuicResetStreamError error)492 bool QuicStream::OnStopSending(QuicResetStreamError error) {
493 // Do not reset the stream if all data has been sent and acknowledged.
494 if (write_side_closed() && !IsWaitingForAcks()) {
495 QUIC_DVLOG(1) << ENDPOINT
496 << "Ignoring STOP_SENDING for a write closed stream, id: "
497 << id_;
498 return false;
499 }
500
501 if (is_static_) {
502 QUIC_DVLOG(1) << ENDPOINT
503 << "Received STOP_SENDING for a static stream, id: " << id_
504 << " Closing connection";
505 OnUnrecoverableError(QUIC_INVALID_STREAM_ID,
506 "Received STOP_SENDING for a static stream");
507 return false;
508 }
509
510 stream_error_ = error;
511 MaybeSendRstStream(error);
512 return true;
513 }
514
num_frames_received() const515 int QuicStream::num_frames_received() const {
516 return sequencer_.num_frames_received();
517 }
518
num_duplicate_frames_received() const519 int QuicStream::num_duplicate_frames_received() const {
520 return sequencer_.num_duplicate_frames_received();
521 }
522
OnStreamReset(const QuicRstStreamFrame & frame)523 void QuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
524 rst_received_ = true;
525 if (frame.byte_offset > kMaxStreamLength) {
526 // Peer are not suppose to write bytes more than maxium allowed.
527 OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
528 "Reset frame stream offset overflow.");
529 return;
530 }
531
532 const QuicStreamOffset kMaxOffset =
533 std::numeric_limits<QuicStreamOffset>::max();
534 if (sequencer()->close_offset() != kMaxOffset &&
535 frame.byte_offset != sequencer()->close_offset()) {
536 OnUnrecoverableError(
537 QUIC_STREAM_MULTIPLE_OFFSET,
538 absl::StrCat("Stream ", id_,
539 " received new final offset: ", frame.byte_offset,
540 ", which is different from close offset: ",
541 sequencer_.close_offset()));
542 return;
543 }
544
545 MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
546 QUIC_BUG_IF(quic_bug_12570_3, !flow_controller_.has_value())
547 << ENDPOINT << "OnStreamReset called on stream without flow control";
548 if ((flow_controller_.has_value() &&
549 flow_controller_->FlowControlViolation()) ||
550 connection_flow_controller_->FlowControlViolation()) {
551 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
552 "Flow control violation after increasing offset");
553 return;
554 }
555
556 stream_error_ = frame.error();
557 // Google QUIC closes both sides of the stream in response to a
558 // RESET_STREAM, IETF QUIC closes only the read side.
559 if (!VersionHasIetfQuicFrames(transport_version())) {
560 CloseWriteSide();
561 }
562 CloseReadSide();
563 }
564
OnConnectionClosed(QuicErrorCode error,ConnectionCloseSource)565 void QuicStream::OnConnectionClosed(QuicErrorCode error,
566 ConnectionCloseSource /*source*/) {
567 if (read_side_closed_ && write_side_closed_) {
568 return;
569 }
570 if (error != QUIC_NO_ERROR) {
571 stream_error_ =
572 QuicResetStreamError::FromInternal(QUIC_STREAM_CONNECTION_ERROR);
573 connection_error_ = error;
574 }
575
576 CloseWriteSide();
577 CloseReadSide();
578 }
579
OnFinRead()580 void QuicStream::OnFinRead() {
581 QUICHE_DCHECK(sequencer_.IsClosed());
582 // OnFinRead can be called due to a FIN flag in a headers block, so there may
583 // have been no OnStreamFrame call with a FIN in the frame.
584 fin_received_ = true;
585 // If fin_sent_ is true, then CloseWriteSide has already been called, and the
586 // stream will be destroyed by CloseReadSide, so don't need to call
587 // StreamDraining.
588 CloseReadSide();
589 }
590
SetFinSent()591 void QuicStream::SetFinSent() {
592 QUICHE_DCHECK(!VersionUsesHttp3(transport_version()));
593 fin_sent_ = true;
594 }
595
Reset(QuicRstStreamErrorCode error)596 void QuicStream::Reset(QuicRstStreamErrorCode error) {
597 ResetWithError(QuicResetStreamError::FromInternal(error));
598 }
599
ResetWithError(QuicResetStreamError error)600 void QuicStream::ResetWithError(QuicResetStreamError error) {
601 stream_error_ = error;
602 QuicConnection::ScopedPacketFlusher flusher(session()->connection());
603 MaybeSendStopSending(error);
604 MaybeSendRstStream(error);
605
606 if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
607 session()->MaybeCloseZombieStream(id_);
608 }
609 }
610
ResetWriteSide(QuicResetStreamError error)611 void QuicStream::ResetWriteSide(QuicResetStreamError error) {
612 stream_error_ = error;
613 MaybeSendRstStream(error);
614
615 if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
616 session()->MaybeCloseZombieStream(id_);
617 }
618 }
619
SendStopSending(QuicResetStreamError error)620 void QuicStream::SendStopSending(QuicResetStreamError error) {
621 stream_error_ = error;
622 MaybeSendStopSending(error);
623
624 if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
625 session()->MaybeCloseZombieStream(id_);
626 }
627 }
628
OnUnrecoverableError(QuicErrorCode error,const std::string & details)629 void QuicStream::OnUnrecoverableError(QuicErrorCode error,
630 const std::string& details) {
631 stream_delegate_->OnStreamError(error, details);
632 }
633
OnUnrecoverableError(QuicErrorCode error,QuicIetfTransportErrorCodes ietf_error,const std::string & details)634 void QuicStream::OnUnrecoverableError(QuicErrorCode error,
635 QuicIetfTransportErrorCodes ietf_error,
636 const std::string& details) {
637 stream_delegate_->OnStreamError(error, ietf_error, details);
638 }
639
priority() const640 const QuicStreamPriority& QuicStream::priority() const { return priority_; }
641
SetPriority(const QuicStreamPriority & priority)642 void QuicStream::SetPriority(const QuicStreamPriority& priority) {
643 priority_ = priority;
644
645 MaybeSendPriorityUpdateFrame();
646
647 stream_delegate_->UpdateStreamPriority(id(), priority);
648 }
649
WriteOrBufferData(absl::string_view data,bool fin,quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface> ack_listener)650 void QuicStream::WriteOrBufferData(
651 absl::string_view data, bool fin,
652 quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface>
653 ack_listener) {
654 QUIC_BUG_IF(quic_bug_12570_4,
655 QuicUtils::IsCryptoStreamId(transport_version(), id_))
656 << ENDPOINT
657 << "WriteOrBufferData is used to send application data, use "
658 "WriteOrBufferDataAtLevel to send crypto data.";
659 return WriteOrBufferDataAtLevel(
660 data, fin, session()->GetEncryptionLevelToSendApplicationData(),
661 ack_listener);
662 }
663
WriteOrBufferDataAtLevel(absl::string_view data,bool fin,EncryptionLevel level,quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface> ack_listener)664 void QuicStream::WriteOrBufferDataAtLevel(
665 absl::string_view data, bool fin, EncryptionLevel level,
666 quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface>
667 ack_listener) {
668 if (data.empty() && !fin) {
669 QUIC_BUG(quic_bug_10586_2) << "data.empty() && !fin";
670 return;
671 }
672
673 if (fin_buffered_) {
674 QUIC_BUG(quic_bug_10586_3) << "Fin already buffered";
675 return;
676 }
677 if (write_side_closed_) {
678 QUIC_DLOG(ERROR) << ENDPOINT
679 << "Attempt to write when the write side is closed";
680 if (type_ == READ_UNIDIRECTIONAL) {
681 OnUnrecoverableError(QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
682 "Try to send data on read unidirectional stream");
683 }
684 return;
685 }
686
687 fin_buffered_ = fin;
688
689 bool had_buffered_data = HasBufferedData();
690 // Do not respect buffered data upper limit as WriteOrBufferData guarantees
691 // all data to be consumed.
692 if (data.length() > 0) {
693 QuicStreamOffset offset = send_buffer_.stream_offset();
694 if (kMaxStreamLength - offset < data.length()) {
695 QUIC_BUG(quic_bug_10586_4) << "Write too many data via stream " << id_;
696 OnUnrecoverableError(
697 QUIC_STREAM_LENGTH_OVERFLOW,
698 absl::StrCat("Write too many data via stream ", id_));
699 return;
700 }
701 send_buffer_.SaveStreamData(data);
702 OnDataBuffered(offset, data.length(), ack_listener);
703 }
704 if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
705 // Write data if there is no buffered data before.
706 WriteBufferedData(level);
707 }
708 }
709
OnCanWrite()710 void QuicStream::OnCanWrite() {
711 if (HasDeadlinePassed()) {
712 OnDeadlinePassed();
713 return;
714 }
715 if (HasPendingRetransmission()) {
716 WritePendingRetransmission();
717 // Exit early to allow other streams to write pending retransmissions if
718 // any.
719 return;
720 }
721
722 if (write_side_closed_) {
723 QUIC_DLOG(ERROR)
724 << ENDPOINT << "Stream " << id()
725 << " attempting to write new data when the write side is closed";
726 return;
727 }
728 if (HasBufferedData() || (fin_buffered_ && !fin_sent_)) {
729 WriteBufferedData(session()->GetEncryptionLevelToSendApplicationData());
730 }
731 if (!fin_buffered_ && !fin_sent_ && CanWriteNewData()) {
732 // Notify upper layer to write new data when buffered data size is below
733 // low water mark.
734 OnCanWriteNewData();
735 }
736 }
737
MaybeSendBlocked()738 void QuicStream::MaybeSendBlocked() {
739 if (!flow_controller_.has_value()) {
740 QUIC_BUG(quic_bug_10586_5)
741 << ENDPOINT << "MaybeSendBlocked called on stream without flow control";
742 return;
743 }
744 flow_controller_->MaybeSendBlocked();
745 if (!stream_contributes_to_connection_flow_control_) {
746 return;
747 }
748 connection_flow_controller_->MaybeSendBlocked();
749
750 // If the stream is blocked by connection-level flow control but not by
751 // stream-level flow control, add the stream to the write blocked list so that
752 // the stream will be given a chance to write when a connection-level
753 // WINDOW_UPDATE arrives.
754 if (!write_side_closed_ && connection_flow_controller_->IsBlocked() &&
755 !flow_controller_->IsBlocked()) {
756 session_->MarkConnectionLevelWriteBlocked(id());
757 }
758 }
759
WriteMemSlice(quiche::QuicheMemSlice span,bool fin)760 QuicConsumedData QuicStream::WriteMemSlice(quiche::QuicheMemSlice span,
761 bool fin) {
762 return WriteMemSlices(absl::MakeSpan(&span, 1), fin);
763 }
764
WriteMemSlices(absl::Span<quiche::QuicheMemSlice> span,bool fin,bool buffer_unconditionally)765 QuicConsumedData QuicStream::WriteMemSlices(
766 absl::Span<quiche::QuicheMemSlice> span, bool fin,
767 bool buffer_unconditionally) {
768 QuicConsumedData consumed_data(0, false);
769 if (span.empty() && !fin) {
770 QUIC_BUG(quic_bug_10586_6) << "span.empty() && !fin";
771 return consumed_data;
772 }
773
774 if (fin_buffered_) {
775 QUIC_BUG(quic_bug_10586_7) << "Fin already buffered";
776 return consumed_data;
777 }
778
779 if (write_side_closed_) {
780 QUIC_DLOG(ERROR) << ENDPOINT << "Stream " << id()
781 << " attempting to write when the write side is closed";
782 if (type_ == READ_UNIDIRECTIONAL) {
783 OnUnrecoverableError(QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
784 "Try to send data on read unidirectional stream");
785 }
786 return consumed_data;
787 }
788
789 bool had_buffered_data = HasBufferedData();
790 if (CanWriteNewData() || span.empty() || buffer_unconditionally) {
791 consumed_data.fin_consumed = fin;
792 if (!span.empty()) {
793 // Buffer all data if buffered data size is below limit.
794 QuicStreamOffset offset = send_buffer_.stream_offset();
795 consumed_data.bytes_consumed = send_buffer_.SaveMemSliceSpan(span);
796 if (offset > send_buffer_.stream_offset() ||
797 kMaxStreamLength < send_buffer_.stream_offset()) {
798 QUIC_BUG(quic_bug_10586_8) << "Write too many data via stream " << id_;
799 OnUnrecoverableError(
800 QUIC_STREAM_LENGTH_OVERFLOW,
801 absl::StrCat("Write too many data via stream ", id_));
802 return consumed_data;
803 }
804 OnDataBuffered(offset, consumed_data.bytes_consumed, nullptr);
805 }
806 }
807 fin_buffered_ = consumed_data.fin_consumed;
808
809 if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
810 // Write data if there is no buffered data before.
811 WriteBufferedData(session()->GetEncryptionLevelToSendApplicationData());
812 }
813
814 return consumed_data;
815 }
816
HasPendingRetransmission() const817 bool QuicStream::HasPendingRetransmission() const {
818 return send_buffer_.HasPendingRetransmission() || fin_lost_;
819 }
820
IsStreamFrameOutstanding(QuicStreamOffset offset,QuicByteCount data_length,bool fin) const821 bool QuicStream::IsStreamFrameOutstanding(QuicStreamOffset offset,
822 QuicByteCount data_length,
823 bool fin) const {
824 return send_buffer_.IsStreamDataOutstanding(offset, data_length) ||
825 (fin && fin_outstanding_);
826 }
827
CloseReadSide()828 void QuicStream::CloseReadSide() {
829 if (read_side_closed_) {
830 return;
831 }
832 QUIC_DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
833
834 read_side_closed_ = true;
835 sequencer_.ReleaseBuffer();
836
837 if (write_side_closed_) {
838 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
839 session_->OnStreamClosed(id());
840 OnClose();
841 }
842 }
843
CloseWriteSide()844 void QuicStream::CloseWriteSide() {
845 if (write_side_closed_) {
846 return;
847 }
848 QUIC_DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
849
850 write_side_closed_ = true;
851 if (read_side_closed_) {
852 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
853 session_->OnStreamClosed(id());
854 OnClose();
855 }
856 }
857
MaybeSendStopSending(QuicResetStreamError error)858 void QuicStream::MaybeSendStopSending(QuicResetStreamError error) {
859 if (stop_sending_sent_) {
860 return;
861 }
862
863 if (!session()->version().UsesHttp3() && !error.ok()) {
864 // In gQUIC, RST with error closes both read and write side.
865 return;
866 }
867
868 if (session()->version().UsesHttp3()) {
869 session()->MaybeSendStopSendingFrame(id(), error);
870 } else {
871 QUICHE_DCHECK_EQ(QUIC_STREAM_NO_ERROR, error.internal_code());
872 session()->MaybeSendRstStreamFrame(id(), QuicResetStreamError::NoError(),
873 stream_bytes_written());
874 }
875 stop_sending_sent_ = true;
876 CloseReadSide();
877 }
878
MaybeSendRstStream(QuicResetStreamError error)879 void QuicStream::MaybeSendRstStream(QuicResetStreamError error) {
880 if (rst_sent_) {
881 return;
882 }
883
884 if (!session()->version().UsesHttp3()) {
885 QUIC_BUG_IF(quic_bug_12570_5, error.ok());
886 stop_sending_sent_ = true;
887 CloseReadSide();
888 }
889 session()->MaybeSendRstStreamFrame(id(), error, stream_bytes_written());
890 rst_sent_ = true;
891 CloseWriteSide();
892 }
893
HasBufferedData() const894 bool QuicStream::HasBufferedData() const {
895 QUICHE_DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
896 return send_buffer_.stream_offset() > stream_bytes_written();
897 }
898
version() const899 ParsedQuicVersion QuicStream::version() const { return session_->version(); }
900
transport_version() const901 QuicTransportVersion QuicStream::transport_version() const {
902 return session_->transport_version();
903 }
904
handshake_protocol() const905 HandshakeProtocol QuicStream::handshake_protocol() const {
906 return session_->connection()->version().handshake_protocol;
907 }
908
StopReading()909 void QuicStream::StopReading() {
910 QUIC_DVLOG(1) << ENDPOINT << "Stop reading from stream " << id();
911 sequencer_.StopReading();
912 }
913
OnClose()914 void QuicStream::OnClose() {
915 QUICHE_DCHECK(read_side_closed_ && write_side_closed_);
916
917 if (!fin_sent_ && !rst_sent_) {
918 QUIC_BUG_IF(quic_bug_12570_6, session()->connection()->connected() &&
919 session()->version().UsesHttp3())
920 << "The stream should've already sent RST in response to "
921 "STOP_SENDING";
922 // For flow control accounting, tell the peer how many bytes have been
923 // written on this stream before termination. Done here if needed, using a
924 // RST_STREAM frame.
925 MaybeSendRstStream(QUIC_RST_ACKNOWLEDGEMENT);
926 session_->MaybeCloseZombieStream(id_);
927 }
928
929 if (!flow_controller_.has_value() ||
930 flow_controller_->FlowControlViolation() ||
931 connection_flow_controller_->FlowControlViolation()) {
932 return;
933 }
934 // The stream is being closed and will not process any further incoming bytes.
935 // As there may be more bytes in flight, to ensure that both endpoints have
936 // the same connection level flow control state, mark all unreceived or
937 // buffered bytes as consumed.
938 QuicByteCount bytes_to_consume =
939 flow_controller_->highest_received_byte_offset() -
940 flow_controller_->bytes_consumed();
941 AddBytesConsumed(bytes_to_consume);
942 }
943
OnWindowUpdateFrame(const QuicWindowUpdateFrame & frame)944 void QuicStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
945 if (type_ == READ_UNIDIRECTIONAL) {
946 OnUnrecoverableError(
947 QUIC_WINDOW_UPDATE_RECEIVED_ON_READ_UNIDIRECTIONAL_STREAM,
948 "WindowUpdateFrame received on READ_UNIDIRECTIONAL stream.");
949 return;
950 }
951
952 if (!flow_controller_.has_value()) {
953 QUIC_BUG(quic_bug_10586_9)
954 << ENDPOINT
955 << "OnWindowUpdateFrame called on stream without flow control";
956 return;
957 }
958
959 if (flow_controller_->UpdateSendWindowOffset(frame.max_data)) {
960 // Let session unblock this stream.
961 session_->MarkConnectionLevelWriteBlocked(id_);
962 }
963 }
964
MaybeIncreaseHighestReceivedOffset(QuicStreamOffset new_offset)965 bool QuicStream::MaybeIncreaseHighestReceivedOffset(
966 QuicStreamOffset new_offset) {
967 if (!flow_controller_.has_value()) {
968 QUIC_BUG(quic_bug_10586_10)
969 << ENDPOINT
970 << "MaybeIncreaseHighestReceivedOffset called on stream without "
971 "flow control";
972 return false;
973 }
974 uint64_t increment =
975 new_offset - flow_controller_->highest_received_byte_offset();
976 if (!flow_controller_->UpdateHighestReceivedOffset(new_offset)) {
977 return false;
978 }
979
980 // If |new_offset| increased the stream flow controller's highest received
981 // offset, increase the connection flow controller's value by the incremental
982 // difference.
983 if (stream_contributes_to_connection_flow_control_) {
984 connection_flow_controller_->UpdateHighestReceivedOffset(
985 connection_flow_controller_->highest_received_byte_offset() +
986 increment);
987 }
988 return true;
989 }
990
AddBytesSent(QuicByteCount bytes)991 void QuicStream::AddBytesSent(QuicByteCount bytes) {
992 if (!flow_controller_.has_value()) {
993 QUIC_BUG(quic_bug_10586_11)
994 << ENDPOINT << "AddBytesSent called on stream without flow control";
995 return;
996 }
997 flow_controller_->AddBytesSent(bytes);
998 if (stream_contributes_to_connection_flow_control_) {
999 connection_flow_controller_->AddBytesSent(bytes);
1000 }
1001 }
1002
AddBytesConsumed(QuicByteCount bytes)1003 void QuicStream::AddBytesConsumed(QuicByteCount bytes) {
1004 if (type_ == CRYPTO) {
1005 // A stream with type CRYPTO has no flow control, so there's nothing this
1006 // function needs to do. This function still gets called by the
1007 // QuicStreamSequencers used by QuicCryptoStream.
1008 return;
1009 }
1010 if (!flow_controller_.has_value()) {
1011 QUIC_BUG(quic_bug_12570_7)
1012 << ENDPOINT
1013 << "AddBytesConsumed called on non-crypto stream without flow control";
1014 return;
1015 }
1016 // Only adjust stream level flow controller if still reading.
1017 if (!read_side_closed_) {
1018 flow_controller_->AddBytesConsumed(bytes);
1019 }
1020
1021 if (stream_contributes_to_connection_flow_control_) {
1022 connection_flow_controller_->AddBytesConsumed(bytes);
1023 }
1024 }
1025
MaybeConfigSendWindowOffset(QuicStreamOffset new_offset,bool was_zero_rtt_rejected)1026 bool QuicStream::MaybeConfigSendWindowOffset(QuicStreamOffset new_offset,
1027 bool was_zero_rtt_rejected) {
1028 if (!flow_controller_.has_value()) {
1029 QUIC_BUG(quic_bug_10586_12)
1030 << ENDPOINT
1031 << "ConfigSendWindowOffset called on stream without flow control";
1032 return false;
1033 }
1034
1035 // The validation code below is for QUIC with TLS only.
1036 if (new_offset < flow_controller_->send_window_offset()) {
1037 QUICHE_DCHECK(session()->version().UsesTls());
1038 if (was_zero_rtt_rejected && new_offset < flow_controller_->bytes_sent()) {
1039 // The client is given flow control window lower than what's written in
1040 // 0-RTT. This QUIC implementation is unable to retransmit them.
1041 QUIC_BUG_IF(quic_bug_12570_8, perspective_ == Perspective::IS_SERVER)
1042 << "Server streams' flow control should never be configured twice.";
1043 OnUnrecoverableError(
1044 QUIC_ZERO_RTT_UNRETRANSMITTABLE,
1045 absl::StrCat(
1046 "Server rejected 0-RTT, aborting because new stream max data ",
1047 new_offset, " for stream ", id_, " is less than currently used: ",
1048 flow_controller_->bytes_sent()));
1049 return false;
1050 } else if (session()->version().AllowsLowFlowControlLimits()) {
1051 // In IETF QUIC, if the client receives flow control limit lower than what
1052 // was resumed from 0-RTT, depending on 0-RTT status, it's either the
1053 // peer's fault or our implementation's fault.
1054 QUIC_BUG_IF(quic_bug_12570_9, perspective_ == Perspective::IS_SERVER)
1055 << "Server streams' flow control should never be configured twice.";
1056 OnUnrecoverableError(
1057 was_zero_rtt_rejected ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
1058 : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED,
1059 absl::StrCat(
1060 was_zero_rtt_rejected ? "Server rejected 0-RTT, aborting because "
1061 : "",
1062 "new stream max data ", new_offset, " decreases current limit: ",
1063 flow_controller_->send_window_offset()));
1064 return false;
1065 }
1066 }
1067
1068 if (flow_controller_->UpdateSendWindowOffset(new_offset)) {
1069 // Let session unblock this stream.
1070 session_->MarkConnectionLevelWriteBlocked(id_);
1071 }
1072 return true;
1073 }
1074
AddRandomPaddingAfterFin()1075 void QuicStream::AddRandomPaddingAfterFin() {
1076 add_random_padding_after_fin_ = true;
1077 }
1078
OnStreamFrameAcked(QuicStreamOffset offset,QuicByteCount data_length,bool fin_acked,QuicTime::Delta,QuicTime,QuicByteCount * newly_acked_length)1079 bool QuicStream::OnStreamFrameAcked(QuicStreamOffset offset,
1080 QuicByteCount data_length, bool fin_acked,
1081 QuicTime::Delta /*ack_delay_time*/,
1082 QuicTime /*receive_timestamp*/,
1083 QuicByteCount* newly_acked_length) {
1084 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Acking "
1085 << "[" << offset << ", " << offset + data_length << "]"
1086 << " fin = " << fin_acked;
1087 *newly_acked_length = 0;
1088 if (!send_buffer_.OnStreamDataAcked(offset, data_length,
1089 newly_acked_length)) {
1090 OnUnrecoverableError(QUIC_INTERNAL_ERROR, "Trying to ack unsent data.");
1091 return false;
1092 }
1093 if (!fin_sent_ && fin_acked) {
1094 OnUnrecoverableError(QUIC_INTERNAL_ERROR, "Trying to ack unsent fin.");
1095 return false;
1096 }
1097 // Indicates whether ack listener's OnPacketAcked should be called.
1098 const bool new_data_acked =
1099 *newly_acked_length > 0 || (fin_acked && fin_outstanding_);
1100 if (fin_acked) {
1101 fin_outstanding_ = false;
1102 fin_lost_ = false;
1103 }
1104 if (!IsWaitingForAcks() && write_side_closed_ &&
1105 !write_side_data_recvd_state_notified_) {
1106 OnWriteSideInDataRecvdState();
1107 write_side_data_recvd_state_notified_ = true;
1108 }
1109 if (!IsWaitingForAcks() && read_side_closed_ && write_side_closed_) {
1110 session_->MaybeCloseZombieStream(id_);
1111 }
1112 return new_data_acked;
1113 }
1114
OnStreamFrameRetransmitted(QuicStreamOffset offset,QuicByteCount data_length,bool fin_retransmitted)1115 void QuicStream::OnStreamFrameRetransmitted(QuicStreamOffset offset,
1116 QuicByteCount data_length,
1117 bool fin_retransmitted) {
1118 send_buffer_.OnStreamDataRetransmitted(offset, data_length);
1119 if (fin_retransmitted) {
1120 fin_lost_ = false;
1121 }
1122 }
1123
OnStreamFrameLost(QuicStreamOffset offset,QuicByteCount data_length,bool fin_lost)1124 void QuicStream::OnStreamFrameLost(QuicStreamOffset offset,
1125 QuicByteCount data_length, bool fin_lost) {
1126 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Losting "
1127 << "[" << offset << ", " << offset + data_length << "]"
1128 << " fin = " << fin_lost;
1129 if (data_length > 0) {
1130 send_buffer_.OnStreamDataLost(offset, data_length);
1131 }
1132 if (fin_lost && fin_outstanding_) {
1133 fin_lost_ = true;
1134 }
1135 }
1136
RetransmitStreamData(QuicStreamOffset offset,QuicByteCount data_length,bool fin,TransmissionType type)1137 bool QuicStream::RetransmitStreamData(QuicStreamOffset offset,
1138 QuicByteCount data_length, bool fin,
1139 TransmissionType type) {
1140 QUICHE_DCHECK(type == PTO_RETRANSMISSION);
1141 if (HasDeadlinePassed()) {
1142 OnDeadlinePassed();
1143 return true;
1144 }
1145 QuicIntervalSet<QuicStreamOffset> retransmission(offset,
1146 offset + data_length);
1147 retransmission.Difference(bytes_acked());
1148 bool retransmit_fin = fin && fin_outstanding_;
1149 if (retransmission.Empty() && !retransmit_fin) {
1150 return true;
1151 }
1152 QuicConsumedData consumed(0, false);
1153 for (const auto& interval : retransmission) {
1154 QuicStreamOffset retransmission_offset = interval.min();
1155 QuicByteCount retransmission_length = interval.max() - interval.min();
1156 const bool can_bundle_fin =
1157 retransmit_fin && (retransmission_offset + retransmission_length ==
1158 stream_bytes_written());
1159 consumed = stream_delegate_->WritevData(
1160 id_, retransmission_length, retransmission_offset,
1161 can_bundle_fin ? FIN : NO_FIN, type,
1162 session()->GetEncryptionLevelToSendApplicationData());
1163 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1164 << " is forced to retransmit stream data ["
1165 << retransmission_offset << ", "
1166 << retransmission_offset + retransmission_length
1167 << ") and fin: " << can_bundle_fin
1168 << ", consumed: " << consumed;
1169 OnStreamFrameRetransmitted(retransmission_offset, consumed.bytes_consumed,
1170 consumed.fin_consumed);
1171 if (can_bundle_fin) {
1172 retransmit_fin = !consumed.fin_consumed;
1173 }
1174 if (consumed.bytes_consumed < retransmission_length ||
1175 (can_bundle_fin && !consumed.fin_consumed)) {
1176 // Connection is write blocked.
1177 return false;
1178 }
1179 }
1180 if (retransmit_fin) {
1181 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1182 << " retransmits fin only frame.";
1183 consumed = stream_delegate_->WritevData(
1184 id_, 0, stream_bytes_written(), FIN, type,
1185 session()->GetEncryptionLevelToSendApplicationData());
1186 if (!consumed.fin_consumed) {
1187 return false;
1188 }
1189 }
1190 return true;
1191 }
1192
IsWaitingForAcks() const1193 bool QuicStream::IsWaitingForAcks() const {
1194 return (!rst_sent_ || stream_error_.ok()) &&
1195 (send_buffer_.stream_bytes_outstanding() || fin_outstanding_);
1196 }
1197
WriteStreamData(QuicStreamOffset offset,QuicByteCount data_length,QuicDataWriter * writer)1198 bool QuicStream::WriteStreamData(QuicStreamOffset offset,
1199 QuicByteCount data_length,
1200 QuicDataWriter* writer) {
1201 QUICHE_DCHECK_LT(0u, data_length);
1202 QUIC_DVLOG(2) << ENDPOINT << "Write stream " << id_ << " data from offset "
1203 << offset << " length " << data_length;
1204 return send_buffer_.WriteStreamData(offset, data_length, writer);
1205 }
1206
WriteBufferedData(EncryptionLevel level)1207 void QuicStream::WriteBufferedData(EncryptionLevel level) {
1208 QUICHE_DCHECK(!write_side_closed_ && (HasBufferedData() || fin_buffered_));
1209
1210 if (session_->ShouldYield(id())) {
1211 session_->MarkConnectionLevelWriteBlocked(id());
1212 return;
1213 }
1214
1215 // Size of buffered data.
1216 QuicByteCount write_length = BufferedDataBytes();
1217
1218 // A FIN with zero data payload should not be flow control blocked.
1219 bool fin_with_zero_data = (fin_buffered_ && write_length == 0);
1220
1221 bool fin = fin_buffered_;
1222
1223 QUIC_BUG_IF(quic_bug_10586_13, !flow_controller_.has_value())
1224 << ENDPOINT << "WriteBufferedData called on stream without flow control";
1225
1226 // How much data flow control permits to be written.
1227 QuicByteCount send_window = CalculateSendWindowSize();
1228
1229 if (send_window == 0 && !fin_with_zero_data) {
1230 // Quick return if nothing can be sent.
1231 MaybeSendBlocked();
1232 return;
1233 }
1234
1235 if (write_length > send_window) {
1236 // Don't send the FIN unless all the data will be sent.
1237 fin = false;
1238
1239 // Writing more data would be a violation of flow control.
1240 write_length = send_window;
1241 QUIC_DVLOG(1) << "stream " << id() << " shortens write length to "
1242 << write_length << " due to flow control";
1243 }
1244
1245 StreamSendingState state = fin ? FIN : NO_FIN;
1246 if (fin && add_random_padding_after_fin_) {
1247 state = FIN_AND_PADDING;
1248 }
1249 QuicConsumedData consumed_data =
1250 stream_delegate_->WritevData(id(), write_length, stream_bytes_written(),
1251 state, NOT_RETRANSMISSION, level);
1252
1253 OnStreamDataConsumed(consumed_data.bytes_consumed);
1254
1255 AddBytesSent(consumed_data.bytes_consumed);
1256 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " sends "
1257 << stream_bytes_written() << " bytes "
1258 << " and has buffered data " << BufferedDataBytes() << " bytes."
1259 << " fin is sent: " << consumed_data.fin_consumed
1260 << " fin is buffered: " << fin_buffered_;
1261
1262 // The write may have generated a write error causing this stream to be
1263 // closed. If so, simply return without marking the stream write blocked.
1264 if (write_side_closed_) {
1265 return;
1266 }
1267
1268 if (consumed_data.bytes_consumed == write_length) {
1269 if (!fin_with_zero_data) {
1270 MaybeSendBlocked();
1271 }
1272 if (fin && consumed_data.fin_consumed) {
1273 QUICHE_DCHECK(!fin_sent_);
1274 fin_sent_ = true;
1275 fin_outstanding_ = true;
1276 if (fin_received_) {
1277 QUICHE_DCHECK(!was_draining_);
1278 session_->StreamDraining(id_,
1279 /*unidirectional=*/type_ != BIDIRECTIONAL);
1280 was_draining_ = true;
1281 }
1282 CloseWriteSide();
1283 } else if (fin && !consumed_data.fin_consumed && !write_side_closed_) {
1284 session_->MarkConnectionLevelWriteBlocked(id());
1285 }
1286 } else {
1287 session_->MarkConnectionLevelWriteBlocked(id());
1288 }
1289 if (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed) {
1290 busy_counter_ = 0;
1291 }
1292 }
1293
BufferedDataBytes() const1294 uint64_t QuicStream::BufferedDataBytes() const {
1295 QUICHE_DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
1296 return send_buffer_.stream_offset() - stream_bytes_written();
1297 }
1298
CanWriteNewData() const1299 bool QuicStream::CanWriteNewData() const {
1300 return BufferedDataBytes() < buffered_data_threshold_;
1301 }
1302
CanWriteNewDataAfterData(QuicByteCount length) const1303 bool QuicStream::CanWriteNewDataAfterData(QuicByteCount length) const {
1304 return (BufferedDataBytes() + length) < buffered_data_threshold_;
1305 }
1306
stream_bytes_written() const1307 uint64_t QuicStream::stream_bytes_written() const {
1308 return send_buffer_.stream_bytes_written();
1309 }
1310
bytes_acked() const1311 const QuicIntervalSet<QuicStreamOffset>& QuicStream::bytes_acked() const {
1312 return send_buffer_.bytes_acked();
1313 }
1314
OnStreamDataConsumed(QuicByteCount bytes_consumed)1315 void QuicStream::OnStreamDataConsumed(QuicByteCount bytes_consumed) {
1316 send_buffer_.OnStreamDataConsumed(bytes_consumed);
1317 }
1318
WritePendingRetransmission()1319 void QuicStream::WritePendingRetransmission() {
1320 while (HasPendingRetransmission()) {
1321 QuicConsumedData consumed(0, false);
1322 if (!send_buffer_.HasPendingRetransmission()) {
1323 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1324 << " retransmits fin only frame.";
1325 consumed = stream_delegate_->WritevData(
1326 id_, 0, stream_bytes_written(), FIN, LOSS_RETRANSMISSION,
1327 session()->GetEncryptionLevelToSendApplicationData());
1328 fin_lost_ = !consumed.fin_consumed;
1329 if (fin_lost_) {
1330 // Connection is write blocked.
1331 return;
1332 }
1333 } else {
1334 StreamPendingRetransmission pending =
1335 send_buffer_.NextPendingRetransmission();
1336 // Determine whether the lost fin can be bundled with the data.
1337 const bool can_bundle_fin =
1338 fin_lost_ &&
1339 (pending.offset + pending.length == stream_bytes_written());
1340 consumed = stream_delegate_->WritevData(
1341 id_, pending.length, pending.offset, can_bundle_fin ? FIN : NO_FIN,
1342 LOSS_RETRANSMISSION,
1343 session()->GetEncryptionLevelToSendApplicationData());
1344 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1345 << " tries to retransmit stream data [" << pending.offset
1346 << ", " << pending.offset + pending.length
1347 << ") and fin: " << can_bundle_fin
1348 << ", consumed: " << consumed;
1349 OnStreamFrameRetransmitted(pending.offset, consumed.bytes_consumed,
1350 consumed.fin_consumed);
1351 if (consumed.bytes_consumed < pending.length ||
1352 (can_bundle_fin && !consumed.fin_consumed)) {
1353 // Connection is write blocked.
1354 return;
1355 }
1356 }
1357 }
1358 }
1359
MaybeSetTtl(QuicTime::Delta ttl)1360 bool QuicStream::MaybeSetTtl(QuicTime::Delta ttl) {
1361 if (is_static_) {
1362 QUIC_BUG(quic_bug_10586_14) << "Cannot set TTL of a static stream.";
1363 return false;
1364 }
1365 if (deadline_.IsInitialized()) {
1366 QUIC_DLOG(WARNING) << "Deadline has already been set.";
1367 return false;
1368 }
1369 QuicTime now = session()->connection()->clock()->ApproximateNow();
1370 deadline_ = now + ttl;
1371 return true;
1372 }
1373
HasDeadlinePassed() const1374 bool QuicStream::HasDeadlinePassed() const {
1375 if (!deadline_.IsInitialized()) {
1376 // No deadline has been set.
1377 return false;
1378 }
1379 QuicTime now = session()->connection()->clock()->ApproximateNow();
1380 if (now < deadline_) {
1381 return false;
1382 }
1383 // TTL expired.
1384 QUIC_DVLOG(1) << "stream " << id() << " deadline has passed";
1385 return true;
1386 }
1387
OnDeadlinePassed()1388 void QuicStream::OnDeadlinePassed() { Reset(QUIC_STREAM_TTL_EXPIRED); }
1389
IsFlowControlBlocked() const1390 bool QuicStream::IsFlowControlBlocked() const {
1391 if (!flow_controller_.has_value()) {
1392 QUIC_BUG(quic_bug_10586_15)
1393 << "Trying to access non-existent flow controller.";
1394 return false;
1395 }
1396 return flow_controller_->IsBlocked();
1397 }
1398
highest_received_byte_offset() const1399 QuicStreamOffset QuicStream::highest_received_byte_offset() const {
1400 if (!flow_controller_.has_value()) {
1401 QUIC_BUG(quic_bug_10586_16)
1402 << "Trying to access non-existent flow controller.";
1403 return 0;
1404 }
1405 return flow_controller_->highest_received_byte_offset();
1406 }
1407
UpdateReceiveWindowSize(QuicStreamOffset size)1408 void QuicStream::UpdateReceiveWindowSize(QuicStreamOffset size) {
1409 if (!flow_controller_.has_value()) {
1410 QUIC_BUG(quic_bug_10586_17)
1411 << "Trying to access non-existent flow controller.";
1412 return;
1413 }
1414 flow_controller_->UpdateReceiveWindowSize(size);
1415 }
1416
GetSendWindow() const1417 std::optional<QuicByteCount> QuicStream::GetSendWindow() const {
1418 return flow_controller_.has_value()
1419 ? std::optional<QuicByteCount>(flow_controller_->SendWindowSize())
1420 : std::nullopt;
1421 }
1422
GetReceiveWindow() const1423 std::optional<QuicByteCount> QuicStream::GetReceiveWindow() const {
1424 return flow_controller_.has_value()
1425 ? std::optional<QuicByteCount>(
1426 flow_controller_->receive_window_size())
1427 : std::nullopt;
1428 }
1429
OnStreamCreatedFromPendingStream()1430 void QuicStream::OnStreamCreatedFromPendingStream() {
1431 sequencer()->SetUnblocked();
1432 }
1433
CalculateSendWindowSize() const1434 QuicByteCount QuicStream::CalculateSendWindowSize() const {
1435 QuicByteCount send_window;
1436 if (flow_controller_.has_value()) {
1437 send_window = flow_controller_->SendWindowSize();
1438 } else {
1439 send_window = std::numeric_limits<QuicByteCount>::max();
1440 }
1441 if (stream_contributes_to_connection_flow_control_) {
1442 send_window =
1443 std::min(send_window, connection_flow_controller_->SendWindowSize());
1444 }
1445 return send_window;
1446 }
1447
1448 } // namespace quic
1449