xref: /aosp_15_r20/external/cronet/net/third_party/quiche/src/quiche/quic/core/quic_stream.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/quic/core/quic_stream.h"
6 
7 #include <limits>
8 #include <optional>
9 #include <string>
10 
11 #include "absl/strings/str_cat.h"
12 #include "absl/strings/string_view.h"
13 #include "quiche/quic/core/quic_error_codes.h"
14 #include "quiche/quic/core/quic_flow_controller.h"
15 #include "quiche/quic/core/quic_session.h"
16 #include "quiche/quic/core/quic_types.h"
17 #include "quiche/quic/core/quic_utils.h"
18 #include "quiche/quic/core/quic_versions.h"
19 #include "quiche/quic/platform/api/quic_bug_tracker.h"
20 #include "quiche/quic/platform/api/quic_flag_utils.h"
21 #include "quiche/quic/platform/api/quic_flags.h"
22 #include "quiche/quic/platform/api/quic_logging.h"
23 #include "quiche/common/platform/api/quiche_logging.h"
24 #include "quiche/common/platform/api/quiche_mem_slice.h"
25 
26 using spdy::SpdyPriority;
27 
28 namespace quic {
29 
30 #define ENDPOINT \
31   (perspective_ == Perspective::IS_SERVER ? "Server: " : "Client: ")
32 
33 namespace {
34 
DefaultFlowControlWindow(ParsedQuicVersion version)35 QuicByteCount DefaultFlowControlWindow(ParsedQuicVersion version) {
36   if (!version.AllowsLowFlowControlLimits()) {
37     return kDefaultFlowControlSendWindow;
38   }
39   return 0;
40 }
41 
GetInitialStreamFlowControlWindowToSend(QuicSession * session,QuicStreamId stream_id)42 QuicByteCount GetInitialStreamFlowControlWindowToSend(QuicSession* session,
43                                                       QuicStreamId stream_id) {
44   ParsedQuicVersion version = session->connection()->version();
45   if (version.handshake_protocol != PROTOCOL_TLS1_3) {
46     return session->config()->GetInitialStreamFlowControlWindowToSend();
47   }
48 
49   // Unidirectional streams (v99 only).
50   if (VersionHasIetfQuicFrames(version.transport_version) &&
51       !QuicUtils::IsBidirectionalStreamId(stream_id, version)) {
52     return session->config()
53         ->GetInitialMaxStreamDataBytesUnidirectionalToSend();
54   }
55 
56   if (QuicUtils::IsOutgoingStreamId(version, stream_id,
57                                     session->perspective())) {
58     return session->config()
59         ->GetInitialMaxStreamDataBytesOutgoingBidirectionalToSend();
60   }
61 
62   return session->config()
63       ->GetInitialMaxStreamDataBytesIncomingBidirectionalToSend();
64 }
65 
GetReceivedFlowControlWindow(QuicSession * session,QuicStreamId stream_id)66 QuicByteCount GetReceivedFlowControlWindow(QuicSession* session,
67                                            QuicStreamId stream_id) {
68   ParsedQuicVersion version = session->connection()->version();
69   if (version.handshake_protocol != PROTOCOL_TLS1_3) {
70     if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) {
71       return session->config()->ReceivedInitialStreamFlowControlWindowBytes();
72     }
73 
74     return DefaultFlowControlWindow(version);
75   }
76 
77   // Unidirectional streams (v99 only).
78   if (VersionHasIetfQuicFrames(version.transport_version) &&
79       !QuicUtils::IsBidirectionalStreamId(stream_id, version)) {
80     if (session->config()
81             ->HasReceivedInitialMaxStreamDataBytesUnidirectional()) {
82       return session->config()
83           ->ReceivedInitialMaxStreamDataBytesUnidirectional();
84     }
85 
86     return DefaultFlowControlWindow(version);
87   }
88 
89   if (QuicUtils::IsOutgoingStreamId(version, stream_id,
90                                     session->perspective())) {
91     if (session->config()
92             ->HasReceivedInitialMaxStreamDataBytesOutgoingBidirectional()) {
93       return session->config()
94           ->ReceivedInitialMaxStreamDataBytesOutgoingBidirectional();
95     }
96 
97     return DefaultFlowControlWindow(version);
98   }
99 
100   if (session->config()
101           ->HasReceivedInitialMaxStreamDataBytesIncomingBidirectional()) {
102     return session->config()
103         ->ReceivedInitialMaxStreamDataBytesIncomingBidirectional();
104   }
105 
106   return DefaultFlowControlWindow(version);
107 }
108 
109 }  // namespace
110 
PendingStream(QuicStreamId id,QuicSession * session)111 PendingStream::PendingStream(QuicStreamId id, QuicSession* session)
112     : id_(id),
113       version_(session->version()),
114       stream_delegate_(session),
115       stream_bytes_read_(0),
116       fin_received_(false),
117       is_bidirectional_(QuicUtils::GetStreamType(id, session->perspective(),
118                                                  /*peer_initiated = */ true,
119                                                  session->version()) ==
120                         BIDIRECTIONAL),
121       connection_flow_controller_(session->flow_controller()),
122       flow_controller_(session, id,
123                        /*is_connection_flow_controller*/ false,
124                        GetReceivedFlowControlWindow(session, id),
125                        GetInitialStreamFlowControlWindowToSend(session, id),
126                        kStreamReceiveWindowLimit,
127                        session->flow_controller()->auto_tune_receive_window(),
128                        session->flow_controller()),
129       sequencer_(this),
130       creation_time_(session->GetClock()->ApproximateNow()) {
131   if (is_bidirectional_) {
132     QUIC_CODE_COUNT_N(quic_pending_stream, 3, 3);
133   }
134 }
135 
OnDataAvailable()136 void PendingStream::OnDataAvailable() {
137   // Data should be kept in the sequencer so that
138   // QuicSession::ProcessPendingStream() can read it.
139 }
140 
OnFinRead()141 void PendingStream::OnFinRead() { QUICHE_DCHECK(sequencer_.IsClosed()); }
142 
AddBytesConsumed(QuicByteCount bytes)143 void PendingStream::AddBytesConsumed(QuicByteCount bytes) {
144   // It will be called when the metadata of the stream is consumed.
145   flow_controller_.AddBytesConsumed(bytes);
146   connection_flow_controller_->AddBytesConsumed(bytes);
147 }
148 
ResetWithError(QuicResetStreamError)149 void PendingStream::ResetWithError(QuicResetStreamError /*error*/) {
150   // Currently PendingStream is only read-unidirectional. It shouldn't send
151   // Reset.
152   QUICHE_NOTREACHED();
153 }
154 
OnUnrecoverableError(QuicErrorCode error,const std::string & details)155 void PendingStream::OnUnrecoverableError(QuicErrorCode error,
156                                          const std::string& details) {
157   stream_delegate_->OnStreamError(error, details);
158 }
159 
OnUnrecoverableError(QuicErrorCode error,QuicIetfTransportErrorCodes ietf_error,const std::string & details)160 void PendingStream::OnUnrecoverableError(QuicErrorCode error,
161                                          QuicIetfTransportErrorCodes ietf_error,
162                                          const std::string& details) {
163   stream_delegate_->OnStreamError(error, ietf_error, details);
164 }
165 
id() const166 QuicStreamId PendingStream::id() const { return id_; }
167 
version() const168 ParsedQuicVersion PendingStream::version() const { return version_; }
169 
OnStreamFrame(const QuicStreamFrame & frame)170 void PendingStream::OnStreamFrame(const QuicStreamFrame& frame) {
171   QUICHE_DCHECK_EQ(frame.stream_id, id_);
172 
173   bool is_stream_too_long =
174       (frame.offset > kMaxStreamLength) ||
175       (kMaxStreamLength - frame.offset < frame.data_length);
176   if (is_stream_too_long) {
177     // Close connection if stream becomes too long.
178     QUIC_PEER_BUG(quic_peer_bug_12570_1)
179         << "Receive stream frame reaches max stream length. frame offset "
180         << frame.offset << " length " << frame.data_length;
181     OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
182                          "Peer sends more data than allowed on this stream.");
183     return;
184   }
185 
186   if (frame.offset + frame.data_length > sequencer_.close_offset()) {
187     OnUnrecoverableError(
188         QUIC_STREAM_DATA_BEYOND_CLOSE_OFFSET,
189         absl::StrCat(
190             "Stream ", id_,
191             " received data with offset: ", frame.offset + frame.data_length,
192             ", which is beyond close offset: ", sequencer()->close_offset()));
193     return;
194   }
195 
196   if (frame.fin) {
197     fin_received_ = true;
198   }
199 
200   // This count includes duplicate data received.
201   QuicByteCount frame_payload_size = frame.data_length;
202   stream_bytes_read_ += frame_payload_size;
203 
204   // Flow control is interested in tracking highest received offset.
205   // Only interested in received frames that carry data.
206   if (frame_payload_size > 0 &&
207       MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
208     // As the highest received offset has changed, check to see if this is a
209     // violation of flow control.
210     if (flow_controller_.FlowControlViolation() ||
211         connection_flow_controller_->FlowControlViolation()) {
212       OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
213                            "Flow control violation after increasing offset");
214       return;
215     }
216   }
217 
218   sequencer_.OnStreamFrame(frame);
219 }
220 
OnRstStreamFrame(const QuicRstStreamFrame & frame)221 void PendingStream::OnRstStreamFrame(const QuicRstStreamFrame& frame) {
222   QUICHE_DCHECK_EQ(frame.stream_id, id_);
223 
224   if (frame.byte_offset > kMaxStreamLength) {
225     // Peer are not suppose to write bytes more than maxium allowed.
226     OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
227                          "Reset frame stream offset overflow.");
228     return;
229   }
230 
231   const QuicStreamOffset kMaxOffset =
232       std::numeric_limits<QuicStreamOffset>::max();
233   if (sequencer()->close_offset() != kMaxOffset &&
234       frame.byte_offset != sequencer()->close_offset()) {
235     OnUnrecoverableError(
236         QUIC_STREAM_MULTIPLE_OFFSET,
237         absl::StrCat("Stream ", id_,
238                      " received new final offset: ", frame.byte_offset,
239                      ", which is different from close offset: ",
240                      sequencer()->close_offset()));
241     return;
242   }
243 
244   MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
245   if (flow_controller_.FlowControlViolation() ||
246       connection_flow_controller_->FlowControlViolation()) {
247     OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
248                          "Flow control violation after increasing offset");
249     return;
250   }
251 }
252 
OnWindowUpdateFrame(const QuicWindowUpdateFrame & frame)253 void PendingStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
254   QUICHE_DCHECK(is_bidirectional_);
255   flow_controller_.UpdateSendWindowOffset(frame.max_data);
256 }
257 
MaybeIncreaseHighestReceivedOffset(QuicStreamOffset new_offset)258 bool PendingStream::MaybeIncreaseHighestReceivedOffset(
259     QuicStreamOffset new_offset) {
260   uint64_t increment =
261       new_offset - flow_controller_.highest_received_byte_offset();
262   if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) {
263     return false;
264   }
265 
266   // If |new_offset| increased the stream flow controller's highest received
267   // offset, increase the connection flow controller's value by the incremental
268   // difference.
269   connection_flow_controller_->UpdateHighestReceivedOffset(
270       connection_flow_controller_->highest_received_byte_offset() + increment);
271   return true;
272 }
273 
OnStopSending(QuicResetStreamError stop_sending_error_code)274 void PendingStream::OnStopSending(
275     QuicResetStreamError stop_sending_error_code) {
276   if (!stop_sending_error_code_) {
277     stop_sending_error_code_ = stop_sending_error_code;
278   }
279 }
280 
MarkConsumed(QuicByteCount num_bytes)281 void PendingStream::MarkConsumed(QuicByteCount num_bytes) {
282   sequencer_.MarkConsumed(num_bytes);
283 }
284 
StopReading()285 void PendingStream::StopReading() {
286   QUIC_DVLOG(1) << "Stop reading from pending stream " << id();
287   sequencer_.StopReading();
288 }
289 
QuicStream(PendingStream * pending,QuicSession * session,bool is_static)290 QuicStream::QuicStream(PendingStream* pending, QuicSession* session,
291                        bool is_static)
292     : QuicStream(
293           pending->id_, session, std::move(pending->sequencer_), is_static,
294           QuicUtils::GetStreamType(pending->id_, session->perspective(),
295                                    /*peer_initiated = */ true,
296                                    session->version()),
297           pending->stream_bytes_read_, pending->fin_received_,
298           std::move(pending->flow_controller_),
299           pending->connection_flow_controller_,
300           (session->GetClock()->ApproximateNow() - pending->creation_time())) {
301   QUICHE_DCHECK(session->version().HasIetfQuicFrames());
302   sequencer_.set_stream(this);
303 }
304 
305 namespace {
306 
FlowController(QuicStreamId id,QuicSession * session,StreamType type)307 std::optional<QuicFlowController> FlowController(QuicStreamId id,
308                                                  QuicSession* session,
309                                                  StreamType type) {
310   if (type == CRYPTO) {
311     // The only QuicStream with a StreamType of CRYPTO is QuicCryptoStream, when
312     // it is using crypto frames instead of stream frames. The QuicCryptoStream
313     // doesn't have any flow control in that case, so we don't create a
314     // QuicFlowController for it.
315     return std::nullopt;
316   }
317   return QuicFlowController(
318       session, id,
319       /*is_connection_flow_controller*/ false,
320       GetReceivedFlowControlWindow(session, id),
321       GetInitialStreamFlowControlWindowToSend(session, id),
322       kStreamReceiveWindowLimit,
323       session->flow_controller()->auto_tune_receive_window(),
324       session->flow_controller());
325 }
326 
327 }  // namespace
328 
QuicStream(QuicStreamId id,QuicSession * session,bool is_static,StreamType type)329 QuicStream::QuicStream(QuicStreamId id, QuicSession* session, bool is_static,
330                        StreamType type)
331     : QuicStream(id, session, QuicStreamSequencer(this), is_static, type, 0,
332                  false, FlowController(id, session, type),
333                  session->flow_controller(), QuicTime::Delta::Zero()) {}
334 
QuicStream(QuicStreamId id,QuicSession * session,QuicStreamSequencer sequencer,bool is_static,StreamType type,uint64_t stream_bytes_read,bool fin_received,std::optional<QuicFlowController> flow_controller,QuicFlowController * connection_flow_controller,QuicTime::Delta pending_duration)335 QuicStream::QuicStream(QuicStreamId id, QuicSession* session,
336                        QuicStreamSequencer sequencer, bool is_static,
337                        StreamType type, uint64_t stream_bytes_read,
338                        bool fin_received,
339                        std::optional<QuicFlowController> flow_controller,
340                        QuicFlowController* connection_flow_controller,
341                        QuicTime::Delta pending_duration)
342     : sequencer_(std::move(sequencer)),
343       id_(id),
344       session_(session),
345       stream_delegate_(session),
346       priority_(QuicStreamPriority::Default(session->priority_type())),
347       stream_bytes_read_(stream_bytes_read),
348       stream_error_(QuicResetStreamError::NoError()),
349       connection_error_(QUIC_NO_ERROR),
350       read_side_closed_(false),
351       write_side_closed_(false),
352       write_side_data_recvd_state_notified_(false),
353       fin_buffered_(false),
354       fin_sent_(false),
355       fin_outstanding_(false),
356       fin_lost_(false),
357       fin_received_(fin_received),
358       rst_sent_(false),
359       rst_received_(false),
360       stop_sending_sent_(false),
361       flow_controller_(std::move(flow_controller)),
362       connection_flow_controller_(connection_flow_controller),
363       stream_contributes_to_connection_flow_control_(true),
364       busy_counter_(0),
365       add_random_padding_after_fin_(false),
366       send_buffer_(
367           session->connection()->helper()->GetStreamSendBufferAllocator()),
368       buffered_data_threshold_(GetQuicFlag(quic_buffered_data_threshold)),
369       is_static_(is_static),
370       deadline_(QuicTime::Zero()),
371       was_draining_(false),
372       type_(VersionHasIetfQuicFrames(session->transport_version()) &&
373                     type != CRYPTO
374                 ? QuicUtils::GetStreamType(id_, session->perspective(),
375                                            session->IsIncomingStream(id_),
376                                            session->version())
377                 : type),
378       creation_time_(session->connection()->clock()->ApproximateNow()),
379       pending_duration_(pending_duration),
380       perspective_(session->perspective()) {
381   if (type_ == WRITE_UNIDIRECTIONAL) {
382     fin_received_ = true;
383     CloseReadSide();
384   } else if (type_ == READ_UNIDIRECTIONAL) {
385     fin_sent_ = true;
386     CloseWriteSide();
387   }
388   if (type_ != CRYPTO) {
389     stream_delegate_->RegisterStreamPriority(id, is_static_, priority_);
390   }
391 }
392 
~QuicStream()393 QuicStream::~QuicStream() {
394   if (session_ != nullptr && IsWaitingForAcks()) {
395     QUIC_DVLOG(1)
396         << ENDPOINT << "Stream " << id_
397         << " gets destroyed while waiting for acks. stream_bytes_outstanding = "
398         << send_buffer_.stream_bytes_outstanding()
399         << ", fin_outstanding: " << fin_outstanding_;
400   }
401   if (stream_delegate_ != nullptr && type_ != CRYPTO) {
402     stream_delegate_->UnregisterStreamPriority(id());
403   }
404 }
405 
OnStreamFrame(const QuicStreamFrame & frame)406 void QuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
407   QUICHE_DCHECK_EQ(frame.stream_id, id_);
408 
409   QUICHE_DCHECK(!(read_side_closed_ && write_side_closed_));
410 
411   if (frame.fin && is_static_) {
412     OnUnrecoverableError(QUIC_INVALID_STREAM_ID,
413                          "Attempt to close a static stream");
414     return;
415   }
416 
417   if (type_ == WRITE_UNIDIRECTIONAL) {
418     OnUnrecoverableError(QUIC_DATA_RECEIVED_ON_WRITE_UNIDIRECTIONAL_STREAM,
419                          "Data received on write unidirectional stream");
420     return;
421   }
422 
423   bool is_stream_too_long =
424       (frame.offset > kMaxStreamLength) ||
425       (kMaxStreamLength - frame.offset < frame.data_length);
426   if (is_stream_too_long) {
427     // Close connection if stream becomes too long.
428     QUIC_PEER_BUG(quic_peer_bug_10586_1)
429         << "Receive stream frame on stream " << id_
430         << " reaches max stream length. frame offset " << frame.offset
431         << " length " << frame.data_length << ". " << sequencer_.DebugString();
432     OnUnrecoverableError(
433         QUIC_STREAM_LENGTH_OVERFLOW,
434         absl::StrCat("Peer sends more data than allowed on stream ", id_,
435                      ". frame: offset = ", frame.offset, ", length = ",
436                      frame.data_length, ". ", sequencer_.DebugString()));
437     return;
438   }
439 
440   if (frame.offset + frame.data_length > sequencer_.close_offset()) {
441     OnUnrecoverableError(
442         QUIC_STREAM_DATA_BEYOND_CLOSE_OFFSET,
443         absl::StrCat(
444             "Stream ", id_,
445             " received data with offset: ", frame.offset + frame.data_length,
446             ", which is beyond close offset: ", sequencer_.close_offset()));
447     return;
448   }
449 
450   if (frame.fin && !fin_received_) {
451     fin_received_ = true;
452     if (fin_sent_) {
453       QUICHE_DCHECK(!was_draining_);
454       session_->StreamDraining(id_,
455                                /*unidirectional=*/type_ != BIDIRECTIONAL);
456       was_draining_ = true;
457     }
458   }
459 
460   if (read_side_closed_) {
461     QUIC_DLOG(INFO)
462         << ENDPOINT << "Stream " << frame.stream_id
463         << " is closed for reading. Ignoring newly received stream data.";
464     // The subclass does not want to read data:  blackhole the data.
465     return;
466   }
467 
468   // This count includes duplicate data received.
469   QuicByteCount frame_payload_size = frame.data_length;
470   stream_bytes_read_ += frame_payload_size;
471 
472   // Flow control is interested in tracking highest received offset.
473   // Only interested in received frames that carry data.
474   if (frame_payload_size > 0 &&
475       MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
476     // As the highest received offset has changed, check to see if this is a
477     // violation of flow control.
478     QUIC_BUG_IF(quic_bug_12570_2, !flow_controller_.has_value())
479         << ENDPOINT << "OnStreamFrame called on stream without flow control";
480     if ((flow_controller_.has_value() &&
481          flow_controller_->FlowControlViolation()) ||
482         connection_flow_controller_->FlowControlViolation()) {
483       OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
484                            "Flow control violation after increasing offset");
485       return;
486     }
487   }
488 
489   sequencer_.OnStreamFrame(frame);
490 }
491 
OnStopSending(QuicResetStreamError error)492 bool QuicStream::OnStopSending(QuicResetStreamError error) {
493   // Do not reset the stream if all data has been sent and acknowledged.
494   if (write_side_closed() && !IsWaitingForAcks()) {
495     QUIC_DVLOG(1) << ENDPOINT
496                   << "Ignoring STOP_SENDING for a write closed stream, id: "
497                   << id_;
498     return false;
499   }
500 
501   if (is_static_) {
502     QUIC_DVLOG(1) << ENDPOINT
503                   << "Received STOP_SENDING for a static stream, id: " << id_
504                   << " Closing connection";
505     OnUnrecoverableError(QUIC_INVALID_STREAM_ID,
506                          "Received STOP_SENDING for a static stream");
507     return false;
508   }
509 
510   stream_error_ = error;
511   MaybeSendRstStream(error);
512   return true;
513 }
514 
num_frames_received() const515 int QuicStream::num_frames_received() const {
516   return sequencer_.num_frames_received();
517 }
518 
num_duplicate_frames_received() const519 int QuicStream::num_duplicate_frames_received() const {
520   return sequencer_.num_duplicate_frames_received();
521 }
522 
OnStreamReset(const QuicRstStreamFrame & frame)523 void QuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
524   rst_received_ = true;
525   if (frame.byte_offset > kMaxStreamLength) {
526     // Peer are not suppose to write bytes more than maxium allowed.
527     OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
528                          "Reset frame stream offset overflow.");
529     return;
530   }
531 
532   const QuicStreamOffset kMaxOffset =
533       std::numeric_limits<QuicStreamOffset>::max();
534   if (sequencer()->close_offset() != kMaxOffset &&
535       frame.byte_offset != sequencer()->close_offset()) {
536     OnUnrecoverableError(
537         QUIC_STREAM_MULTIPLE_OFFSET,
538         absl::StrCat("Stream ", id_,
539                      " received new final offset: ", frame.byte_offset,
540                      ", which is different from close offset: ",
541                      sequencer_.close_offset()));
542     return;
543   }
544 
545   MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
546   QUIC_BUG_IF(quic_bug_12570_3, !flow_controller_.has_value())
547       << ENDPOINT << "OnStreamReset called on stream without flow control";
548   if ((flow_controller_.has_value() &&
549        flow_controller_->FlowControlViolation()) ||
550       connection_flow_controller_->FlowControlViolation()) {
551     OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
552                          "Flow control violation after increasing offset");
553     return;
554   }
555 
556   stream_error_ = frame.error();
557   // Google QUIC closes both sides of the stream in response to a
558   // RESET_STREAM, IETF QUIC closes only the read side.
559   if (!VersionHasIetfQuicFrames(transport_version())) {
560     CloseWriteSide();
561   }
562   CloseReadSide();
563 }
564 
OnConnectionClosed(QuicErrorCode error,ConnectionCloseSource)565 void QuicStream::OnConnectionClosed(QuicErrorCode error,
566                                     ConnectionCloseSource /*source*/) {
567   if (read_side_closed_ && write_side_closed_) {
568     return;
569   }
570   if (error != QUIC_NO_ERROR) {
571     stream_error_ =
572         QuicResetStreamError::FromInternal(QUIC_STREAM_CONNECTION_ERROR);
573     connection_error_ = error;
574   }
575 
576   CloseWriteSide();
577   CloseReadSide();
578 }
579 
OnFinRead()580 void QuicStream::OnFinRead() {
581   QUICHE_DCHECK(sequencer_.IsClosed());
582   // OnFinRead can be called due to a FIN flag in a headers block, so there may
583   // have been no OnStreamFrame call with a FIN in the frame.
584   fin_received_ = true;
585   // If fin_sent_ is true, then CloseWriteSide has already been called, and the
586   // stream will be destroyed by CloseReadSide, so don't need to call
587   // StreamDraining.
588   CloseReadSide();
589 }
590 
SetFinSent()591 void QuicStream::SetFinSent() {
592   QUICHE_DCHECK(!VersionUsesHttp3(transport_version()));
593   fin_sent_ = true;
594 }
595 
Reset(QuicRstStreamErrorCode error)596 void QuicStream::Reset(QuicRstStreamErrorCode error) {
597   ResetWithError(QuicResetStreamError::FromInternal(error));
598 }
599 
ResetWithError(QuicResetStreamError error)600 void QuicStream::ResetWithError(QuicResetStreamError error) {
601   stream_error_ = error;
602   QuicConnection::ScopedPacketFlusher flusher(session()->connection());
603   MaybeSendStopSending(error);
604   MaybeSendRstStream(error);
605 
606   if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
607     session()->MaybeCloseZombieStream(id_);
608   }
609 }
610 
ResetWriteSide(QuicResetStreamError error)611 void QuicStream::ResetWriteSide(QuicResetStreamError error) {
612   stream_error_ = error;
613   MaybeSendRstStream(error);
614 
615   if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
616     session()->MaybeCloseZombieStream(id_);
617   }
618 }
619 
SendStopSending(QuicResetStreamError error)620 void QuicStream::SendStopSending(QuicResetStreamError error) {
621   stream_error_ = error;
622   MaybeSendStopSending(error);
623 
624   if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
625     session()->MaybeCloseZombieStream(id_);
626   }
627 }
628 
OnUnrecoverableError(QuicErrorCode error,const std::string & details)629 void QuicStream::OnUnrecoverableError(QuicErrorCode error,
630                                       const std::string& details) {
631   stream_delegate_->OnStreamError(error, details);
632 }
633 
OnUnrecoverableError(QuicErrorCode error,QuicIetfTransportErrorCodes ietf_error,const std::string & details)634 void QuicStream::OnUnrecoverableError(QuicErrorCode error,
635                                       QuicIetfTransportErrorCodes ietf_error,
636                                       const std::string& details) {
637   stream_delegate_->OnStreamError(error, ietf_error, details);
638 }
639 
priority() const640 const QuicStreamPriority& QuicStream::priority() const { return priority_; }
641 
SetPriority(const QuicStreamPriority & priority)642 void QuicStream::SetPriority(const QuicStreamPriority& priority) {
643   priority_ = priority;
644 
645   MaybeSendPriorityUpdateFrame();
646 
647   stream_delegate_->UpdateStreamPriority(id(), priority);
648 }
649 
WriteOrBufferData(absl::string_view data,bool fin,quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface> ack_listener)650 void QuicStream::WriteOrBufferData(
651     absl::string_view data, bool fin,
652     quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface>
653         ack_listener) {
654   QUIC_BUG_IF(quic_bug_12570_4,
655               QuicUtils::IsCryptoStreamId(transport_version(), id_))
656       << ENDPOINT
657       << "WriteOrBufferData is used to send application data, use "
658          "WriteOrBufferDataAtLevel to send crypto data.";
659   return WriteOrBufferDataAtLevel(
660       data, fin, session()->GetEncryptionLevelToSendApplicationData(),
661       ack_listener);
662 }
663 
WriteOrBufferDataAtLevel(absl::string_view data,bool fin,EncryptionLevel level,quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface> ack_listener)664 void QuicStream::WriteOrBufferDataAtLevel(
665     absl::string_view data, bool fin, EncryptionLevel level,
666     quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface>
667         ack_listener) {
668   if (data.empty() && !fin) {
669     QUIC_BUG(quic_bug_10586_2) << "data.empty() && !fin";
670     return;
671   }
672 
673   if (fin_buffered_) {
674     QUIC_BUG(quic_bug_10586_3) << "Fin already buffered";
675     return;
676   }
677   if (write_side_closed_) {
678     QUIC_DLOG(ERROR) << ENDPOINT
679                      << "Attempt to write when the write side is closed";
680     if (type_ == READ_UNIDIRECTIONAL) {
681       OnUnrecoverableError(QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
682                            "Try to send data on read unidirectional stream");
683     }
684     return;
685   }
686 
687   fin_buffered_ = fin;
688 
689   bool had_buffered_data = HasBufferedData();
690   // Do not respect buffered data upper limit as WriteOrBufferData guarantees
691   // all data to be consumed.
692   if (data.length() > 0) {
693     QuicStreamOffset offset = send_buffer_.stream_offset();
694     if (kMaxStreamLength - offset < data.length()) {
695       QUIC_BUG(quic_bug_10586_4) << "Write too many data via stream " << id_;
696       OnUnrecoverableError(
697           QUIC_STREAM_LENGTH_OVERFLOW,
698           absl::StrCat("Write too many data via stream ", id_));
699       return;
700     }
701     send_buffer_.SaveStreamData(data);
702     OnDataBuffered(offset, data.length(), ack_listener);
703   }
704   if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
705     // Write data if there is no buffered data before.
706     WriteBufferedData(level);
707   }
708 }
709 
OnCanWrite()710 void QuicStream::OnCanWrite() {
711   if (HasDeadlinePassed()) {
712     OnDeadlinePassed();
713     return;
714   }
715   if (HasPendingRetransmission()) {
716     WritePendingRetransmission();
717     // Exit early to allow other streams to write pending retransmissions if
718     // any.
719     return;
720   }
721 
722   if (write_side_closed_) {
723     QUIC_DLOG(ERROR)
724         << ENDPOINT << "Stream " << id()
725         << " attempting to write new data when the write side is closed";
726     return;
727   }
728   if (HasBufferedData() || (fin_buffered_ && !fin_sent_)) {
729     WriteBufferedData(session()->GetEncryptionLevelToSendApplicationData());
730   }
731   if (!fin_buffered_ && !fin_sent_ && CanWriteNewData()) {
732     // Notify upper layer to write new data when buffered data size is below
733     // low water mark.
734     OnCanWriteNewData();
735   }
736 }
737 
MaybeSendBlocked()738 void QuicStream::MaybeSendBlocked() {
739   if (!flow_controller_.has_value()) {
740     QUIC_BUG(quic_bug_10586_5)
741         << ENDPOINT << "MaybeSendBlocked called on stream without flow control";
742     return;
743   }
744   flow_controller_->MaybeSendBlocked();
745   if (!stream_contributes_to_connection_flow_control_) {
746     return;
747   }
748   connection_flow_controller_->MaybeSendBlocked();
749 
750   // If the stream is blocked by connection-level flow control but not by
751   // stream-level flow control, add the stream to the write blocked list so that
752   // the stream will be given a chance to write when a connection-level
753   // WINDOW_UPDATE arrives.
754   if (!write_side_closed_ && connection_flow_controller_->IsBlocked() &&
755       !flow_controller_->IsBlocked()) {
756     session_->MarkConnectionLevelWriteBlocked(id());
757   }
758 }
759 
WriteMemSlice(quiche::QuicheMemSlice span,bool fin)760 QuicConsumedData QuicStream::WriteMemSlice(quiche::QuicheMemSlice span,
761                                            bool fin) {
762   return WriteMemSlices(absl::MakeSpan(&span, 1), fin);
763 }
764 
WriteMemSlices(absl::Span<quiche::QuicheMemSlice> span,bool fin,bool buffer_unconditionally)765 QuicConsumedData QuicStream::WriteMemSlices(
766     absl::Span<quiche::QuicheMemSlice> span, bool fin,
767     bool buffer_unconditionally) {
768   QuicConsumedData consumed_data(0, false);
769   if (span.empty() && !fin) {
770     QUIC_BUG(quic_bug_10586_6) << "span.empty() && !fin";
771     return consumed_data;
772   }
773 
774   if (fin_buffered_) {
775     QUIC_BUG(quic_bug_10586_7) << "Fin already buffered";
776     return consumed_data;
777   }
778 
779   if (write_side_closed_) {
780     QUIC_DLOG(ERROR) << ENDPOINT << "Stream " << id()
781                      << " attempting to write when the write side is closed";
782     if (type_ == READ_UNIDIRECTIONAL) {
783       OnUnrecoverableError(QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
784                            "Try to send data on read unidirectional stream");
785     }
786     return consumed_data;
787   }
788 
789   bool had_buffered_data = HasBufferedData();
790   if (CanWriteNewData() || span.empty() || buffer_unconditionally) {
791     consumed_data.fin_consumed = fin;
792     if (!span.empty()) {
793       // Buffer all data if buffered data size is below limit.
794       QuicStreamOffset offset = send_buffer_.stream_offset();
795       consumed_data.bytes_consumed = send_buffer_.SaveMemSliceSpan(span);
796       if (offset > send_buffer_.stream_offset() ||
797           kMaxStreamLength < send_buffer_.stream_offset()) {
798         QUIC_BUG(quic_bug_10586_8) << "Write too many data via stream " << id_;
799         OnUnrecoverableError(
800             QUIC_STREAM_LENGTH_OVERFLOW,
801             absl::StrCat("Write too many data via stream ", id_));
802         return consumed_data;
803       }
804       OnDataBuffered(offset, consumed_data.bytes_consumed, nullptr);
805     }
806   }
807   fin_buffered_ = consumed_data.fin_consumed;
808 
809   if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
810     // Write data if there is no buffered data before.
811     WriteBufferedData(session()->GetEncryptionLevelToSendApplicationData());
812   }
813 
814   return consumed_data;
815 }
816 
HasPendingRetransmission() const817 bool QuicStream::HasPendingRetransmission() const {
818   return send_buffer_.HasPendingRetransmission() || fin_lost_;
819 }
820 
IsStreamFrameOutstanding(QuicStreamOffset offset,QuicByteCount data_length,bool fin) const821 bool QuicStream::IsStreamFrameOutstanding(QuicStreamOffset offset,
822                                           QuicByteCount data_length,
823                                           bool fin) const {
824   return send_buffer_.IsStreamDataOutstanding(offset, data_length) ||
825          (fin && fin_outstanding_);
826 }
827 
CloseReadSide()828 void QuicStream::CloseReadSide() {
829   if (read_side_closed_) {
830     return;
831   }
832   QUIC_DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
833 
834   read_side_closed_ = true;
835   sequencer_.ReleaseBuffer();
836 
837   if (write_side_closed_) {
838     QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
839     session_->OnStreamClosed(id());
840     OnClose();
841   }
842 }
843 
CloseWriteSide()844 void QuicStream::CloseWriteSide() {
845   if (write_side_closed_) {
846     return;
847   }
848   QUIC_DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
849 
850   write_side_closed_ = true;
851   if (read_side_closed_) {
852     QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
853     session_->OnStreamClosed(id());
854     OnClose();
855   }
856 }
857 
MaybeSendStopSending(QuicResetStreamError error)858 void QuicStream::MaybeSendStopSending(QuicResetStreamError error) {
859   if (stop_sending_sent_) {
860     return;
861   }
862 
863   if (!session()->version().UsesHttp3() && !error.ok()) {
864     // In gQUIC, RST with error closes both read and write side.
865     return;
866   }
867 
868   if (session()->version().UsesHttp3()) {
869     session()->MaybeSendStopSendingFrame(id(), error);
870   } else {
871     QUICHE_DCHECK_EQ(QUIC_STREAM_NO_ERROR, error.internal_code());
872     session()->MaybeSendRstStreamFrame(id(), QuicResetStreamError::NoError(),
873                                        stream_bytes_written());
874   }
875   stop_sending_sent_ = true;
876   CloseReadSide();
877 }
878 
MaybeSendRstStream(QuicResetStreamError error)879 void QuicStream::MaybeSendRstStream(QuicResetStreamError error) {
880   if (rst_sent_) {
881     return;
882   }
883 
884   if (!session()->version().UsesHttp3()) {
885     QUIC_BUG_IF(quic_bug_12570_5, error.ok());
886     stop_sending_sent_ = true;
887     CloseReadSide();
888   }
889   session()->MaybeSendRstStreamFrame(id(), error, stream_bytes_written());
890   rst_sent_ = true;
891   CloseWriteSide();
892 }
893 
HasBufferedData() const894 bool QuicStream::HasBufferedData() const {
895   QUICHE_DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
896   return send_buffer_.stream_offset() > stream_bytes_written();
897 }
898 
version() const899 ParsedQuicVersion QuicStream::version() const { return session_->version(); }
900 
transport_version() const901 QuicTransportVersion QuicStream::transport_version() const {
902   return session_->transport_version();
903 }
904 
handshake_protocol() const905 HandshakeProtocol QuicStream::handshake_protocol() const {
906   return session_->connection()->version().handshake_protocol;
907 }
908 
StopReading()909 void QuicStream::StopReading() {
910   QUIC_DVLOG(1) << ENDPOINT << "Stop reading from stream " << id();
911   sequencer_.StopReading();
912 }
913 
OnClose()914 void QuicStream::OnClose() {
915   QUICHE_DCHECK(read_side_closed_ && write_side_closed_);
916 
917   if (!fin_sent_ && !rst_sent_) {
918     QUIC_BUG_IF(quic_bug_12570_6, session()->connection()->connected() &&
919                                       session()->version().UsesHttp3())
920         << "The stream should've already sent RST in response to "
921            "STOP_SENDING";
922     // For flow control accounting, tell the peer how many bytes have been
923     // written on this stream before termination. Done here if needed, using a
924     // RST_STREAM frame.
925     MaybeSendRstStream(QUIC_RST_ACKNOWLEDGEMENT);
926     session_->MaybeCloseZombieStream(id_);
927   }
928 
929   if (!flow_controller_.has_value() ||
930       flow_controller_->FlowControlViolation() ||
931       connection_flow_controller_->FlowControlViolation()) {
932     return;
933   }
934   // The stream is being closed and will not process any further incoming bytes.
935   // As there may be more bytes in flight, to ensure that both endpoints have
936   // the same connection level flow control state, mark all unreceived or
937   // buffered bytes as consumed.
938   QuicByteCount bytes_to_consume =
939       flow_controller_->highest_received_byte_offset() -
940       flow_controller_->bytes_consumed();
941   AddBytesConsumed(bytes_to_consume);
942 }
943 
OnWindowUpdateFrame(const QuicWindowUpdateFrame & frame)944 void QuicStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
945   if (type_ == READ_UNIDIRECTIONAL) {
946     OnUnrecoverableError(
947         QUIC_WINDOW_UPDATE_RECEIVED_ON_READ_UNIDIRECTIONAL_STREAM,
948         "WindowUpdateFrame received on READ_UNIDIRECTIONAL stream.");
949     return;
950   }
951 
952   if (!flow_controller_.has_value()) {
953     QUIC_BUG(quic_bug_10586_9)
954         << ENDPOINT
955         << "OnWindowUpdateFrame called on stream without flow control";
956     return;
957   }
958 
959   if (flow_controller_->UpdateSendWindowOffset(frame.max_data)) {
960     // Let session unblock this stream.
961     session_->MarkConnectionLevelWriteBlocked(id_);
962   }
963 }
964 
MaybeIncreaseHighestReceivedOffset(QuicStreamOffset new_offset)965 bool QuicStream::MaybeIncreaseHighestReceivedOffset(
966     QuicStreamOffset new_offset) {
967   if (!flow_controller_.has_value()) {
968     QUIC_BUG(quic_bug_10586_10)
969         << ENDPOINT
970         << "MaybeIncreaseHighestReceivedOffset called on stream without "
971            "flow control";
972     return false;
973   }
974   uint64_t increment =
975       new_offset - flow_controller_->highest_received_byte_offset();
976   if (!flow_controller_->UpdateHighestReceivedOffset(new_offset)) {
977     return false;
978   }
979 
980   // If |new_offset| increased the stream flow controller's highest received
981   // offset, increase the connection flow controller's value by the incremental
982   // difference.
983   if (stream_contributes_to_connection_flow_control_) {
984     connection_flow_controller_->UpdateHighestReceivedOffset(
985         connection_flow_controller_->highest_received_byte_offset() +
986         increment);
987   }
988   return true;
989 }
990 
AddBytesSent(QuicByteCount bytes)991 void QuicStream::AddBytesSent(QuicByteCount bytes) {
992   if (!flow_controller_.has_value()) {
993     QUIC_BUG(quic_bug_10586_11)
994         << ENDPOINT << "AddBytesSent called on stream without flow control";
995     return;
996   }
997   flow_controller_->AddBytesSent(bytes);
998   if (stream_contributes_to_connection_flow_control_) {
999     connection_flow_controller_->AddBytesSent(bytes);
1000   }
1001 }
1002 
AddBytesConsumed(QuicByteCount bytes)1003 void QuicStream::AddBytesConsumed(QuicByteCount bytes) {
1004   if (type_ == CRYPTO) {
1005     // A stream with type CRYPTO has no flow control, so there's nothing this
1006     // function needs to do. This function still gets called by the
1007     // QuicStreamSequencers used by QuicCryptoStream.
1008     return;
1009   }
1010   if (!flow_controller_.has_value()) {
1011     QUIC_BUG(quic_bug_12570_7)
1012         << ENDPOINT
1013         << "AddBytesConsumed called on non-crypto stream without flow control";
1014     return;
1015   }
1016   // Only adjust stream level flow controller if still reading.
1017   if (!read_side_closed_) {
1018     flow_controller_->AddBytesConsumed(bytes);
1019   }
1020 
1021   if (stream_contributes_to_connection_flow_control_) {
1022     connection_flow_controller_->AddBytesConsumed(bytes);
1023   }
1024 }
1025 
MaybeConfigSendWindowOffset(QuicStreamOffset new_offset,bool was_zero_rtt_rejected)1026 bool QuicStream::MaybeConfigSendWindowOffset(QuicStreamOffset new_offset,
1027                                              bool was_zero_rtt_rejected) {
1028   if (!flow_controller_.has_value()) {
1029     QUIC_BUG(quic_bug_10586_12)
1030         << ENDPOINT
1031         << "ConfigSendWindowOffset called on stream without flow control";
1032     return false;
1033   }
1034 
1035   // The validation code below is for QUIC with TLS only.
1036   if (new_offset < flow_controller_->send_window_offset()) {
1037     QUICHE_DCHECK(session()->version().UsesTls());
1038     if (was_zero_rtt_rejected && new_offset < flow_controller_->bytes_sent()) {
1039       // The client is given flow control window lower than what's written in
1040       // 0-RTT. This QUIC implementation is unable to retransmit them.
1041       QUIC_BUG_IF(quic_bug_12570_8, perspective_ == Perspective::IS_SERVER)
1042           << "Server streams' flow control should never be configured twice.";
1043       OnUnrecoverableError(
1044           QUIC_ZERO_RTT_UNRETRANSMITTABLE,
1045           absl::StrCat(
1046               "Server rejected 0-RTT, aborting because new stream max data ",
1047               new_offset, " for stream ", id_, " is less than currently used: ",
1048               flow_controller_->bytes_sent()));
1049       return false;
1050     } else if (session()->version().AllowsLowFlowControlLimits()) {
1051       // In IETF QUIC, if the client receives flow control limit lower than what
1052       // was resumed from 0-RTT, depending on 0-RTT status, it's either the
1053       // peer's fault or our implementation's fault.
1054       QUIC_BUG_IF(quic_bug_12570_9, perspective_ == Perspective::IS_SERVER)
1055           << "Server streams' flow control should never be configured twice.";
1056       OnUnrecoverableError(
1057           was_zero_rtt_rejected ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
1058                                 : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED,
1059           absl::StrCat(
1060               was_zero_rtt_rejected ? "Server rejected 0-RTT, aborting because "
1061                                     : "",
1062               "new stream max data ", new_offset, " decreases current limit: ",
1063               flow_controller_->send_window_offset()));
1064       return false;
1065     }
1066   }
1067 
1068   if (flow_controller_->UpdateSendWindowOffset(new_offset)) {
1069     // Let session unblock this stream.
1070     session_->MarkConnectionLevelWriteBlocked(id_);
1071   }
1072   return true;
1073 }
1074 
AddRandomPaddingAfterFin()1075 void QuicStream::AddRandomPaddingAfterFin() {
1076   add_random_padding_after_fin_ = true;
1077 }
1078 
OnStreamFrameAcked(QuicStreamOffset offset,QuicByteCount data_length,bool fin_acked,QuicTime::Delta,QuicTime,QuicByteCount * newly_acked_length)1079 bool QuicStream::OnStreamFrameAcked(QuicStreamOffset offset,
1080                                     QuicByteCount data_length, bool fin_acked,
1081                                     QuicTime::Delta /*ack_delay_time*/,
1082                                     QuicTime /*receive_timestamp*/,
1083                                     QuicByteCount* newly_acked_length) {
1084   QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Acking "
1085                 << "[" << offset << ", " << offset + data_length << "]"
1086                 << " fin = " << fin_acked;
1087   *newly_acked_length = 0;
1088   if (!send_buffer_.OnStreamDataAcked(offset, data_length,
1089                                       newly_acked_length)) {
1090     OnUnrecoverableError(QUIC_INTERNAL_ERROR, "Trying to ack unsent data.");
1091     return false;
1092   }
1093   if (!fin_sent_ && fin_acked) {
1094     OnUnrecoverableError(QUIC_INTERNAL_ERROR, "Trying to ack unsent fin.");
1095     return false;
1096   }
1097   // Indicates whether ack listener's OnPacketAcked should be called.
1098   const bool new_data_acked =
1099       *newly_acked_length > 0 || (fin_acked && fin_outstanding_);
1100   if (fin_acked) {
1101     fin_outstanding_ = false;
1102     fin_lost_ = false;
1103   }
1104   if (!IsWaitingForAcks() && write_side_closed_ &&
1105       !write_side_data_recvd_state_notified_) {
1106     OnWriteSideInDataRecvdState();
1107     write_side_data_recvd_state_notified_ = true;
1108   }
1109   if (!IsWaitingForAcks() && read_side_closed_ && write_side_closed_) {
1110     session_->MaybeCloseZombieStream(id_);
1111   }
1112   return new_data_acked;
1113 }
1114 
OnStreamFrameRetransmitted(QuicStreamOffset offset,QuicByteCount data_length,bool fin_retransmitted)1115 void QuicStream::OnStreamFrameRetransmitted(QuicStreamOffset offset,
1116                                             QuicByteCount data_length,
1117                                             bool fin_retransmitted) {
1118   send_buffer_.OnStreamDataRetransmitted(offset, data_length);
1119   if (fin_retransmitted) {
1120     fin_lost_ = false;
1121   }
1122 }
1123 
OnStreamFrameLost(QuicStreamOffset offset,QuicByteCount data_length,bool fin_lost)1124 void QuicStream::OnStreamFrameLost(QuicStreamOffset offset,
1125                                    QuicByteCount data_length, bool fin_lost) {
1126   QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Losting "
1127                 << "[" << offset << ", " << offset + data_length << "]"
1128                 << " fin = " << fin_lost;
1129   if (data_length > 0) {
1130     send_buffer_.OnStreamDataLost(offset, data_length);
1131   }
1132   if (fin_lost && fin_outstanding_) {
1133     fin_lost_ = true;
1134   }
1135 }
1136 
// Retransmits the part of [offset, offset + data_length) that has not been
// acked yet and, if |fin| is set and the FIN is still outstanding, the FIN.
// Returns true if everything that needed retransmitting was consumed by the
// delegate (including the cases where nothing needed sending or where the
// deadline had passed and OnDeadlinePassed() was invoked instead). Returns
// false as soon as the delegate consumes less than was offered.
bool QuicStream::RetransmitStreamData(QuicStreamOffset offset,
                                      QuicByteCount data_length, bool fin,
                                      TransmissionType type) {
  QUICHE_DCHECK(type == PTO_RETRANSMISSION);
  if (HasDeadlinePassed()) {
    OnDeadlinePassed();
    return true;
  }
  // Start from the requested range and remove everything already acked.
  QuicIntervalSet<QuicStreamOffset> retransmission(offset,
                                                   offset + data_length);
  retransmission.Difference(bytes_acked());
  // Only retransmit the FIN if it was requested and is still outstanding.
  bool retransmit_fin = fin && fin_outstanding_;
  if (retransmission.Empty() && !retransmit_fin) {
    return true;
  }
  QuicConsumedData consumed(0, false);
  for (const auto& interval : retransmission) {
    QuicStreamOffset retransmission_offset = interval.min();
    QuicByteCount retransmission_length = interval.max() - interval.min();
    // The FIN can ride along only with the interval that ends exactly at the
    // end of the written data.
    const bool can_bundle_fin =
        retransmit_fin && (retransmission_offset + retransmission_length ==
                           stream_bytes_written());
    consumed = stream_delegate_->WritevData(
        id_, retransmission_length, retransmission_offset,
        can_bundle_fin ? FIN : NO_FIN, type,
        session()->GetEncryptionLevelToSendApplicationData());
    QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
                  << " is forced to retransmit stream data ["
                  << retransmission_offset << ", "
                  << retransmission_offset + retransmission_length
                  << ") and fin: " << can_bundle_fin
                  << ", consumed: " << consumed;
    OnStreamFrameRetransmitted(retransmission_offset, consumed.bytes_consumed,
                               consumed.fin_consumed);
    if (can_bundle_fin) {
      // If the bundled FIN was consumed, no separate FIN-only frame is needed
      // after the loop.
      retransmit_fin = !consumed.fin_consumed;
    }
    if (consumed.bytes_consumed < retransmission_length ||
        (can_bundle_fin && !consumed.fin_consumed)) {
      // Connection is write blocked.
      return false;
    }
  }
  if (retransmit_fin) {
    // The FIN could not be bundled with any data interval; send it in a
    // zero-length frame at the end of the written data.
    QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
                  << " retransmits fin only frame.";
    consumed = stream_delegate_->WritevData(
        id_, 0, stream_bytes_written(), FIN, type,
        session()->GetEncryptionLevelToSendApplicationData());
    if (!consumed.fin_consumed) {
      return false;
    }
  }
  return true;
}
1192 
IsWaitingForAcks() const1193 bool QuicStream::IsWaitingForAcks() const {
1194   return (!rst_sent_ || stream_error_.ok()) &&
1195          (send_buffer_.stream_bytes_outstanding() || fin_outstanding_);
1196 }
1197 
WriteStreamData(QuicStreamOffset offset,QuicByteCount data_length,QuicDataWriter * writer)1198 bool QuicStream::WriteStreamData(QuicStreamOffset offset,
1199                                  QuicByteCount data_length,
1200                                  QuicDataWriter* writer) {
1201   QUICHE_DCHECK_LT(0u, data_length);
1202   QUIC_DVLOG(2) << ENDPOINT << "Write stream " << id_ << " data from offset "
1203                 << offset << " length " << data_length;
1204   return send_buffer_.WriteStreamData(offset, data_length, writer);
1205 }
1206 
// Attempts to write the currently buffered data (and the buffered FIN, if any)
// through |stream_delegate_| at encryption level |level|. The amount offered
// is capped by CalculateSendWindowSize(). Whenever the stream yields or the
// delegate consumes less than was offered, the stream is registered with the
// session as connection-level write blocked. Once the FIN is consumed the
// write side is closed.
void QuicStream::WriteBufferedData(EncryptionLevel level) {
  QUICHE_DCHECK(!write_side_closed_ && (HasBufferedData() || fin_buffered_));

  // If the session says this stream should yield, register it as write
  // blocked and do not write anything now.
  if (session_->ShouldYield(id())) {
    session_->MarkConnectionLevelWriteBlocked(id());
    return;
  }

  // Size of buffered data.
  QuicByteCount write_length = BufferedDataBytes();

  // A FIN with zero data payload should not be flow control blocked.
  bool fin_with_zero_data = (fin_buffered_ && write_length == 0);

  bool fin = fin_buffered_;

  QUIC_BUG_IF(quic_bug_10586_13, !flow_controller_.has_value())
      << ENDPOINT << "WriteBufferedData called on stream without flow control";

  // How much data flow control permits to be written.
  QuicByteCount send_window = CalculateSendWindowSize();

  if (send_window == 0 && !fin_with_zero_data) {
    // Quick return if nothing can be sent.
    MaybeSendBlocked();
    return;
  }

  if (write_length > send_window) {
    // Don't send the FIN unless all the data will be sent.
    fin = false;

    // Writing more data would be a violation of flow control.
    write_length = send_window;
    QUIC_DVLOG(1) << "stream " << id() << " shortens write length to "
                  << write_length << " due to flow control";
  }

  // Pick the sending state: no FIN, FIN, or FIN followed by padding when
  // AddRandomPaddingAfterFin() was requested.
  StreamSendingState state = fin ? FIN : NO_FIN;
  if (fin && add_random_padding_after_fin_) {
    state = FIN_AND_PADDING;
  }
  QuicConsumedData consumed_data =
      stream_delegate_->WritevData(id(), write_length, stream_bytes_written(),
                                   state, NOT_RETRANSMISSION, level);

  // Account for what the delegate consumed, both in the send buffer and in
  // flow control.
  OnStreamDataConsumed(consumed_data.bytes_consumed);

  AddBytesSent(consumed_data.bytes_consumed);
  QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " sends "
                << stream_bytes_written() << " bytes "
                << " and has buffered data " << BufferedDataBytes() << " bytes."
                << " fin is sent: " << consumed_data.fin_consumed
                << " fin is buffered: " << fin_buffered_;

  // The write may have generated a write error causing this stream to be
  // closed. If so, simply return without marking the stream write blocked.
  if (write_side_closed_) {
    return;
  }

  if (consumed_data.bytes_consumed == write_length) {
    // Everything offered was consumed.
    if (!fin_with_zero_data) {
      MaybeSendBlocked();
    }
    if (fin && consumed_data.fin_consumed) {
      // The FIN went out: record it as sent and outstanding, then close the
      // write side.
      QUICHE_DCHECK(!fin_sent_);
      fin_sent_ = true;
      fin_outstanding_ = true;
      if (fin_received_) {
        // A FIN has been both received and sent; report the stream as
        // draining to the session.
        QUICHE_DCHECK(!was_draining_);
        session_->StreamDraining(id_,
                                 /*unidirectional=*/type_ != BIDIRECTIONAL);
        was_draining_ = true;
      }
      CloseWriteSide();
    } else if (fin && !consumed_data.fin_consumed && !write_side_closed_) {
      // All data was consumed but the FIN was not; register as write blocked
      // so the FIN can be retried.
      session_->MarkConnectionLevelWriteBlocked(id());
    }
  } else {
    // Only part of the offered data was consumed; register as write blocked.
    session_->MarkConnectionLevelWriteBlocked(id());
  }
  // Any progress (data or FIN consumed) resets the busy counter.
  if (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed) {
    busy_counter_ = 0;
  }
}
1293 
BufferedDataBytes() const1294 uint64_t QuicStream::BufferedDataBytes() const {
1295   QUICHE_DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
1296   return send_buffer_.stream_offset() - stream_bytes_written();
1297 }
1298 
CanWriteNewData() const1299 bool QuicStream::CanWriteNewData() const {
1300   return BufferedDataBytes() < buffered_data_threshold_;
1301 }
1302 
CanWriteNewDataAfterData(QuicByteCount length) const1303 bool QuicStream::CanWriteNewDataAfterData(QuicByteCount length) const {
1304   return (BufferedDataBytes() + length) < buffered_data_threshold_;
1305 }
1306 
stream_bytes_written() const1307 uint64_t QuicStream::stream_bytes_written() const {
1308   return send_buffer_.stream_bytes_written();
1309 }
1310 
bytes_acked() const1311 const QuicIntervalSet<QuicStreamOffset>& QuicStream::bytes_acked() const {
1312   return send_buffer_.bytes_acked();
1313 }
1314 
OnStreamDataConsumed(QuicByteCount bytes_consumed)1315 void QuicStream::OnStreamDataConsumed(QuicByteCount bytes_consumed) {
1316   send_buffer_.OnStreamDataConsumed(bytes_consumed);
1317 }
1318 
// Retransmits lost stream data and/or a lost FIN until nothing is pending or
// the delegate stops consuming (the connection is write blocked).
void QuicStream::WritePendingRetransmission() {
  while (HasPendingRetransmission()) {
    QuicConsumedData consumed(0, false);
    if (!send_buffer_.HasPendingRetransmission()) {
      // No lost data is left, so the only pending item is the lost FIN: send
      // it in a zero-length frame at the end of the written data.
      QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
                    << " retransmits fin only frame.";
      consumed = stream_delegate_->WritevData(
          id_, 0, stream_bytes_written(), FIN, LOSS_RETRANSMISSION,
          session()->GetEncryptionLevelToSendApplicationData());
      // The FIN stays marked lost unless the delegate consumed it.
      fin_lost_ = !consumed.fin_consumed;
      if (fin_lost_) {
        // Connection is write blocked.
        return;
      }
    } else {
      StreamPendingRetransmission pending =
          send_buffer_.NextPendingRetransmission();
      // Determine whether the lost fin can be bundled with the data.
      const bool can_bundle_fin =
          fin_lost_ &&
          (pending.offset + pending.length == stream_bytes_written());
      consumed = stream_delegate_->WritevData(
          id_, pending.length, pending.offset, can_bundle_fin ? FIN : NO_FIN,
          LOSS_RETRANSMISSION,
          session()->GetEncryptionLevelToSendApplicationData());
      QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
                    << " tries to retransmit stream data [" << pending.offset
                    << ", " << pending.offset + pending.length
                    << ") and fin: " << can_bundle_fin
                    << ", consumed: " << consumed;
      // Report what was actually consumed; this also clears |fin_lost_| when
      // the bundled FIN was consumed.
      OnStreamFrameRetransmitted(pending.offset, consumed.bytes_consumed,
                                 consumed.fin_consumed);
      if (consumed.bytes_consumed < pending.length ||
          (can_bundle_fin && !consumed.fin_consumed)) {
        // Connection is write blocked.
        return;
      }
    }
  }
}
1359 
MaybeSetTtl(QuicTime::Delta ttl)1360 bool QuicStream::MaybeSetTtl(QuicTime::Delta ttl) {
1361   if (is_static_) {
1362     QUIC_BUG(quic_bug_10586_14) << "Cannot set TTL of a static stream.";
1363     return false;
1364   }
1365   if (deadline_.IsInitialized()) {
1366     QUIC_DLOG(WARNING) << "Deadline has already been set.";
1367     return false;
1368   }
1369   QuicTime now = session()->connection()->clock()->ApproximateNow();
1370   deadline_ = now + ttl;
1371   return true;
1372 }
1373 
HasDeadlinePassed() const1374 bool QuicStream::HasDeadlinePassed() const {
1375   if (!deadline_.IsInitialized()) {
1376     // No deadline has been set.
1377     return false;
1378   }
1379   QuicTime now = session()->connection()->clock()->ApproximateNow();
1380   if (now < deadline_) {
1381     return false;
1382   }
1383   // TTL expired.
1384   QUIC_DVLOG(1) << "stream " << id() << " deadline has passed";
1385   return true;
1386 }
1387 
OnDeadlinePassed()1388 void QuicStream::OnDeadlinePassed() { Reset(QUIC_STREAM_TTL_EXPIRED); }
1389 
IsFlowControlBlocked() const1390 bool QuicStream::IsFlowControlBlocked() const {
1391   if (!flow_controller_.has_value()) {
1392     QUIC_BUG(quic_bug_10586_15)
1393         << "Trying to access non-existent flow controller.";
1394     return false;
1395   }
1396   return flow_controller_->IsBlocked();
1397 }
1398 
highest_received_byte_offset() const1399 QuicStreamOffset QuicStream::highest_received_byte_offset() const {
1400   if (!flow_controller_.has_value()) {
1401     QUIC_BUG(quic_bug_10586_16)
1402         << "Trying to access non-existent flow controller.";
1403     return 0;
1404   }
1405   return flow_controller_->highest_received_byte_offset();
1406 }
1407 
UpdateReceiveWindowSize(QuicStreamOffset size)1408 void QuicStream::UpdateReceiveWindowSize(QuicStreamOffset size) {
1409   if (!flow_controller_.has_value()) {
1410     QUIC_BUG(quic_bug_10586_17)
1411         << "Trying to access non-existent flow controller.";
1412     return;
1413   }
1414   flow_controller_->UpdateReceiveWindowSize(size);
1415 }
1416 
GetSendWindow() const1417 std::optional<QuicByteCount> QuicStream::GetSendWindow() const {
1418   return flow_controller_.has_value()
1419              ? std::optional<QuicByteCount>(flow_controller_->SendWindowSize())
1420              : std::nullopt;
1421 }
1422 
GetReceiveWindow() const1423 std::optional<QuicByteCount> QuicStream::GetReceiveWindow() const {
1424   return flow_controller_.has_value()
1425              ? std::optional<QuicByteCount>(
1426                    flow_controller_->receive_window_size())
1427              : std::nullopt;
1428 }
1429 
OnStreamCreatedFromPendingStream()1430 void QuicStream::OnStreamCreatedFromPendingStream() {
1431   sequencer()->SetUnblocked();
1432 }
1433 
CalculateSendWindowSize() const1434 QuicByteCount QuicStream::CalculateSendWindowSize() const {
1435   QuicByteCount send_window;
1436   if (flow_controller_.has_value()) {
1437     send_window = flow_controller_->SendWindowSize();
1438   } else {
1439     send_window = std::numeric_limits<QuicByteCount>::max();
1440   }
1441   if (stream_contributes_to_connection_flow_control_) {
1442     send_window =
1443         std::min(send_window, connection_flow_controller_->SendWindowSize());
1444   }
1445   return send_window;
1446 }
1447 
1448 }  // namespace quic
1449