1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "quiche/quic/core/quic_stream.h"
6
7 #include <memory>
8 #include <optional>
9 #include <string>
10 #include <utility>
11
12 #include "absl/base/macros.h"
13 #include "absl/memory/memory.h"
14 #include "absl/strings/string_view.h"
15 #include "quiche/quic/core/crypto/null_encrypter.h"
16 #include "quiche/quic/core/frames/quic_rst_stream_frame.h"
17 #include "quiche/quic/core/quic_connection.h"
18 #include "quiche/quic/core/quic_constants.h"
19 #include "quiche/quic/core/quic_error_codes.h"
20 #include "quiche/quic/core/quic_types.h"
21 #include "quiche/quic/core/quic_utils.h"
22 #include "quiche/quic/core/quic_versions.h"
23 #include "quiche/quic/core/quic_write_blocked_list.h"
24 #include "quiche/quic/platform/api/quic_expect_bug.h"
25 #include "quiche/quic/platform/api/quic_flags.h"
26 #include "quiche/quic/platform/api/quic_logging.h"
27 #include "quiche/quic/platform/api/quic_test.h"
28 #include "quiche/quic/test_tools/quic_config_peer.h"
29 #include "quiche/quic/test_tools/quic_connection_peer.h"
30 #include "quiche/quic/test_tools/quic_flow_controller_peer.h"
31 #include "quiche/quic/test_tools/quic_session_peer.h"
32 #include "quiche/quic/test_tools/quic_stream_peer.h"
33 #include "quiche/quic/test_tools/quic_stream_sequencer_peer.h"
34 #include "quiche/quic/test_tools/quic_test_utils.h"
35 #include "quiche/common/quiche_mem_slice_storage.h"
36
37 using testing::_;
38 using testing::AnyNumber;
39 using testing::AtLeast;
40 using testing::InSequence;
41 using testing::Invoke;
42 using testing::InvokeWithoutArgs;
43 using testing::Return;
44 using testing::StrictMock;
45
46 namespace quic {
47 namespace test {
48 namespace {
49
50 const char kData1[] = "FooAndBar";
51 const char kData2[] = "EepAndBaz";
52 const QuicByteCount kDataLen = 9;
53 const uint8_t kPacket0ByteConnectionId = 0;
54 const uint8_t kPacket8ByteConnectionId = 8;
55
56 class TestStream : public QuicStream {
57 public:
TestStream(QuicStreamId id,QuicSession * session,StreamType type)58 TestStream(QuicStreamId id, QuicSession* session, StreamType type)
59 : QuicStream(id, session, /*is_static=*/false, type) {
60 sequencer()->set_level_triggered(true);
61 }
62
TestStream(PendingStream * pending,QuicSession * session,bool is_static)63 TestStream(PendingStream* pending, QuicSession* session, bool is_static)
64 : QuicStream(pending, session, is_static) {}
65
66 MOCK_METHOD(void, OnDataAvailable, (), (override));
67
68 MOCK_METHOD(void, OnCanWriteNewData, (), (override));
69
70 MOCK_METHOD(void, OnWriteSideInDataRecvdState, (), (override));
71
72 using QuicStream::CanWriteNewData;
73 using QuicStream::CanWriteNewDataAfterData;
74 using QuicStream::CloseWriteSide;
75 using QuicStream::fin_buffered;
76 using QuicStream::MaybeSendStopSending;
77 using QuicStream::OnClose;
78 using QuicStream::WriteMemSlices;
79 using QuicStream::WriteOrBufferData;
80
81 private:
82 std::string data_;
83 };
84
85 class QuicStreamTest : public QuicTestWithParam<ParsedQuicVersion> {
86 public:
QuicStreamTest()87 QuicStreamTest()
88 : zero_(QuicTime::Delta::Zero()),
89 supported_versions_(AllSupportedVersions()) {}
90
Initialize(Perspective perspective=Perspective::IS_SERVER)91 void Initialize(Perspective perspective = Perspective::IS_SERVER) {
92 ParsedQuicVersionVector version_vector;
93 version_vector.push_back(GetParam());
94 connection_ = new StrictMock<MockQuicConnection>(
95 &helper_, &alarm_factory_, perspective, version_vector);
96 connection_->AdvanceTime(QuicTime::Delta::FromSeconds(1));
97 session_ = std::make_unique<StrictMock<MockQuicSession>>(connection_);
98 session_->Initialize();
99 connection_->SetEncrypter(
100 ENCRYPTION_FORWARD_SECURE,
101 std::make_unique<NullEncrypter>(connection_->perspective()));
102 QuicConfigPeer::SetReceivedInitialSessionFlowControlWindow(
103 session_->config(), kMinimumFlowControlSendWindow);
104 QuicConfigPeer::SetReceivedInitialMaxStreamDataBytesUnidirectional(
105 session_->config(), kMinimumFlowControlSendWindow);
106 QuicConfigPeer::SetReceivedInitialMaxStreamDataBytesIncomingBidirectional(
107 session_->config(), kMinimumFlowControlSendWindow);
108 QuicConfigPeer::SetReceivedInitialMaxStreamDataBytesOutgoingBidirectional(
109 session_->config(), kMinimumFlowControlSendWindow);
110 QuicConfigPeer::SetReceivedMaxUnidirectionalStreams(session_->config(), 10);
111 session_->OnConfigNegotiated();
112
113 stream_ = new StrictMock<TestStream>(kTestStreamId, session_.get(),
114 BIDIRECTIONAL);
115 EXPECT_NE(nullptr, stream_);
116 EXPECT_CALL(*session_, ShouldKeepConnectionAlive())
117 .WillRepeatedly(Return(true));
118 // session_ now owns stream_.
119 session_->ActivateStream(absl::WrapUnique(stream_));
120 // Ignore resetting when session_ is terminated.
121 EXPECT_CALL(*session_, MaybeSendStopSendingFrame(kTestStreamId, _))
122 .Times(AnyNumber());
123 EXPECT_CALL(*session_, MaybeSendRstStreamFrame(kTestStreamId, _, _))
124 .Times(AnyNumber());
125 write_blocked_list_ =
126 QuicSessionPeer::GetWriteBlockedStreams(session_.get());
127 }
128
fin_sent()129 bool fin_sent() { return stream_->fin_sent(); }
rst_sent()130 bool rst_sent() { return stream_->rst_sent(); }
131
HasWriteBlockedStreams()132 bool HasWriteBlockedStreams() {
133 return write_blocked_list_->HasWriteBlockedSpecialStream() ||
134 write_blocked_list_->HasWriteBlockedDataStreams();
135 }
136
CloseStreamOnWriteError(QuicStreamId id,QuicByteCount,QuicStreamOffset,StreamSendingState,TransmissionType,std::optional<EncryptionLevel>)137 QuicConsumedData CloseStreamOnWriteError(
138 QuicStreamId id, QuicByteCount /*write_length*/,
139 QuicStreamOffset /*offset*/, StreamSendingState /*state*/,
140 TransmissionType /*type*/, std::optional<EncryptionLevel> /*level*/) {
141 session_->ResetStream(id, QUIC_STREAM_CANCELLED);
142 return QuicConsumedData(1, false);
143 }
144
ClearResetStreamFrame(const QuicFrame & frame)145 bool ClearResetStreamFrame(const QuicFrame& frame) {
146 EXPECT_EQ(RST_STREAM_FRAME, frame.type);
147 DeleteFrame(&const_cast<QuicFrame&>(frame));
148 return true;
149 }
150
ClearStopSendingFrame(const QuicFrame & frame)151 bool ClearStopSendingFrame(const QuicFrame& frame) {
152 EXPECT_EQ(STOP_SENDING_FRAME, frame.type);
153 DeleteFrame(&const_cast<QuicFrame&>(frame));
154 return true;
155 }
156
157 protected:
158 MockQuicConnectionHelper helper_;
159 MockAlarmFactory alarm_factory_;
160 MockQuicConnection* connection_;
161 std::unique_ptr<MockQuicSession> session_;
162 StrictMock<TestStream>* stream_;
163 QuicWriteBlockedListInterface* write_blocked_list_;
164 QuicTime::Delta zero_;
165 ParsedQuicVersionVector supported_versions_;
166 QuicStreamId kTestStreamId = GetNthClientInitiatedBidirectionalStreamId(
167 GetParam().transport_version, 1);
168 const QuicStreamId kTestPendingStreamId =
169 GetNthClientInitiatedUnidirectionalStreamId(GetParam().transport_version,
170 1);
171 };
172
173 INSTANTIATE_TEST_SUITE_P(QuicStreamTests, QuicStreamTest,
174 ::testing::ValuesIn(AllSupportedVersions()),
175 ::testing::PrintToStringParamName());
176
177 using PendingStreamTest = QuicStreamTest;
178
179 INSTANTIATE_TEST_SUITE_P(PendingStreamTests, PendingStreamTest,
180 ::testing::ValuesIn(CurrentSupportedHttp3Versions()),
181 ::testing::PrintToStringParamName());
182
TEST_P(PendingStreamTest,PendingStreamStaticness)183 TEST_P(PendingStreamTest, PendingStreamStaticness) {
184 Initialize();
185
186 PendingStream pending(kTestPendingStreamId, session_.get());
187 TestStream stream(&pending, session_.get(), false);
188 EXPECT_FALSE(stream.is_static());
189
190 PendingStream pending2(kTestPendingStreamId + 4, session_.get());
191 TestStream stream2(&pending2, session_.get(), true);
192 EXPECT_TRUE(stream2.is_static());
193 }
194
TEST_P(PendingStreamTest,PendingStreamType)195 TEST_P(PendingStreamTest, PendingStreamType) {
196 Initialize();
197
198 PendingStream pending(kTestPendingStreamId, session_.get());
199 TestStream stream(&pending, session_.get(), false);
200 EXPECT_EQ(stream.type(), READ_UNIDIRECTIONAL);
201 }
202
TEST_P(PendingStreamTest,PendingStreamTypeOnClient)203 TEST_P(PendingStreamTest, PendingStreamTypeOnClient) {
204 Initialize(Perspective::IS_CLIENT);
205
206 QuicStreamId server_initiated_pending_stream_id =
207 GetNthServerInitiatedUnidirectionalStreamId(session_->transport_version(),
208 1);
209 PendingStream pending(server_initiated_pending_stream_id, session_.get());
210 TestStream stream(&pending, session_.get(), false);
211 EXPECT_EQ(stream.type(), READ_UNIDIRECTIONAL);
212 }
213
TEST_P(PendingStreamTest,PendingStreamTooMuchData)214 TEST_P(PendingStreamTest, PendingStreamTooMuchData) {
215 Initialize();
216
217 PendingStream pending(kTestPendingStreamId, session_.get());
218 // Receive a stream frame that violates flow control: the byte offset is
219 // higher than the receive window offset.
220 QuicStreamFrame frame(kTestPendingStreamId, false,
221 kInitialSessionFlowControlWindowForTest + 1, ".");
222
223 // Stream should not accept the frame, and the connection should be closed.
224 EXPECT_CALL(*connection_,
225 CloseConnection(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, _, _));
226 pending.OnStreamFrame(frame);
227 }
228
TEST_P(PendingStreamTest,PendingStreamTooMuchDataInRstStream)229 TEST_P(PendingStreamTest, PendingStreamTooMuchDataInRstStream) {
230 Initialize();
231
232 PendingStream pending1(kTestPendingStreamId, session_.get());
233 // Receive a rst stream frame that violates flow control: the byte offset is
234 // higher than the receive window offset.
235 QuicRstStreamFrame frame1(kInvalidControlFrameId, kTestPendingStreamId,
236 QUIC_STREAM_CANCELLED,
237 kInitialSessionFlowControlWindowForTest + 1);
238
239 // Pending stream should not accept the frame, and the connection should be
240 // closed.
241 EXPECT_CALL(*connection_,
242 CloseConnection(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, _, _));
243 pending1.OnRstStreamFrame(frame1);
244
245 QuicStreamId bidirection_stream_id = QuicUtils::GetFirstBidirectionalStreamId(
246 session_->transport_version(), Perspective::IS_CLIENT);
247 PendingStream pending2(bidirection_stream_id, session_.get());
248 // Receive a rst stream frame that violates flow control: the byte offset is
249 // higher than the receive window offset.
250 QuicRstStreamFrame frame2(kInvalidControlFrameId, bidirection_stream_id,
251 QUIC_STREAM_CANCELLED,
252 kInitialSessionFlowControlWindowForTest + 1);
253 // Bidirectional Pending stream should not accept the frame, and the
254 // connection should be closed.
255 EXPECT_CALL(*connection_,
256 CloseConnection(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, _, _));
257 pending2.OnRstStreamFrame(frame2);
258 }
259
TEST_P(PendingStreamTest,PendingStreamRstStream)260 TEST_P(PendingStreamTest, PendingStreamRstStream) {
261 Initialize();
262
263 PendingStream pending(kTestPendingStreamId, session_.get());
264 QuicStreamOffset final_byte_offset = 7;
265 QuicRstStreamFrame frame(kInvalidControlFrameId, kTestPendingStreamId,
266 QUIC_STREAM_CANCELLED, final_byte_offset);
267
268 // Pending stream should accept the frame and not close the connection.
269 EXPECT_CALL(*connection_, CloseConnection(_, _, _)).Times(0);
270 pending.OnRstStreamFrame(frame);
271 }
272
TEST_P(PendingStreamTest,PendingStreamWindowUpdate)273 TEST_P(PendingStreamTest, PendingStreamWindowUpdate) {
274 Initialize();
275
276 QuicStreamId bidirection_stream_id = QuicUtils::GetFirstBidirectionalStreamId(
277 session_->transport_version(), Perspective::IS_CLIENT);
278 PendingStream pending(bidirection_stream_id, session_.get());
279 QuicWindowUpdateFrame frame(kInvalidControlFrameId, bidirection_stream_id,
280 kDefaultFlowControlSendWindow * 2);
281 pending.OnWindowUpdateFrame(frame);
282 TestStream stream(&pending, session_.get(), false);
283
284 EXPECT_EQ(QuicStreamPeer::SendWindowSize(&stream),
285 kDefaultFlowControlSendWindow * 2);
286 }
287
TEST_P(PendingStreamTest,PendingStreamStopSending)288 TEST_P(PendingStreamTest, PendingStreamStopSending) {
289 Initialize();
290
291 QuicStreamId bidirection_stream_id = QuicUtils::GetFirstBidirectionalStreamId(
292 session_->transport_version(), Perspective::IS_CLIENT);
293 PendingStream pending(bidirection_stream_id, session_.get());
294 QuicResetStreamError error =
295 QuicResetStreamError::FromInternal(QUIC_STREAM_INTERNAL_ERROR);
296 pending.OnStopSending(error);
297 EXPECT_TRUE(pending.GetStopSendingErrorCode());
298 auto actual_error = *pending.GetStopSendingErrorCode();
299 EXPECT_EQ(actual_error, error);
300 }
301
TEST_P(PendingStreamTest,FromPendingStream)302 TEST_P(PendingStreamTest, FromPendingStream) {
303 Initialize();
304
305 PendingStream pending(kTestPendingStreamId, session_.get());
306
307 QuicStreamFrame frame(kTestPendingStreamId, false, 2, ".");
308 pending.OnStreamFrame(frame);
309 pending.OnStreamFrame(frame);
310 QuicStreamFrame frame2(kTestPendingStreamId, true, 3, ".");
311 pending.OnStreamFrame(frame2);
312
313 TestStream stream(&pending, session_.get(), false);
314 EXPECT_EQ(3, stream.num_frames_received());
315 EXPECT_EQ(3u, stream.stream_bytes_read());
316 EXPECT_EQ(1, stream.num_duplicate_frames_received());
317 EXPECT_EQ(true, stream.fin_received());
318 EXPECT_EQ(frame2.offset + 1, stream.highest_received_byte_offset());
319 EXPECT_EQ(frame2.offset + 1,
320 session_->flow_controller()->highest_received_byte_offset());
321 }
322
TEST_P(PendingStreamTest,FromPendingStreamThenData)323 TEST_P(PendingStreamTest, FromPendingStreamThenData) {
324 Initialize();
325
326 PendingStream pending(kTestPendingStreamId, session_.get());
327
328 QuicStreamFrame frame(kTestPendingStreamId, false, 2, ".");
329 pending.OnStreamFrame(frame);
330
331 auto stream = new TestStream(&pending, session_.get(), false);
332 session_->ActivateStream(absl::WrapUnique(stream));
333
334 QuicStreamFrame frame2(kTestPendingStreamId, true, 3, ".");
335 stream->OnStreamFrame(frame2);
336
337 EXPECT_EQ(2, stream->num_frames_received());
338 EXPECT_EQ(2u, stream->stream_bytes_read());
339 EXPECT_EQ(true, stream->fin_received());
340 EXPECT_EQ(frame2.offset + 1, stream->highest_received_byte_offset());
341 EXPECT_EQ(frame2.offset + 1,
342 session_->flow_controller()->highest_received_byte_offset());
343 }
344
TEST_P(QuicStreamTest,WriteAllData)345 TEST_P(QuicStreamTest, WriteAllData) {
346 Initialize();
347
348 QuicByteCount length =
349 1 + QuicPacketCreator::StreamFramePacketOverhead(
350 connection_->transport_version(), kPacket8ByteConnectionId,
351 kPacket0ByteConnectionId, !kIncludeVersion,
352 !kIncludeDiversificationNonce, PACKET_4BYTE_PACKET_NUMBER,
353 quiche::VARIABLE_LENGTH_INTEGER_LENGTH_0,
354 quiche::VARIABLE_LENGTH_INTEGER_LENGTH_0, 0u);
355 connection_->SetMaxPacketLength(length);
356
357 EXPECT_CALL(*session_, WritevData(kTestStreamId, _, _, _, _, _))
358 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
359 stream_->WriteOrBufferData(kData1, false, nullptr);
360 EXPECT_FALSE(HasWriteBlockedStreams());
361 }
362
TEST_P(QuicStreamTest,NoBlockingIfNoDataOrFin)363 TEST_P(QuicStreamTest, NoBlockingIfNoDataOrFin) {
364 Initialize();
365
366 // Write no data and no fin. If we consume nothing we should not be write
367 // blocked.
368 EXPECT_QUIC_BUG(
369 stream_->WriteOrBufferData(absl::string_view(), false, nullptr), "");
370 EXPECT_FALSE(HasWriteBlockedStreams());
371 }
372
TEST_P(QuicStreamTest,BlockIfOnlySomeDataConsumed)373 TEST_P(QuicStreamTest, BlockIfOnlySomeDataConsumed) {
374 Initialize();
375
376 // Write some data and no fin. If we consume some but not all of the data,
377 // we should be write blocked a not all the data was consumed.
378 EXPECT_CALL(*session_, WritevData(kTestStreamId, _, _, _, _, _))
379 .WillOnce(InvokeWithoutArgs([this]() {
380 return session_->ConsumeData(stream_->id(), 1u, 0u, NO_FIN,
381 NOT_RETRANSMISSION, std::nullopt);
382 }));
383 stream_->WriteOrBufferData(absl::string_view(kData1, 2), false, nullptr);
384 EXPECT_TRUE(session_->HasUnackedStreamData());
385 ASSERT_EQ(1u, write_blocked_list_->NumBlockedStreams());
386 EXPECT_EQ(1u, stream_->BufferedDataBytes());
387 }
388
TEST_P(QuicStreamTest,BlockIfFinNotConsumedWithData)389 TEST_P(QuicStreamTest, BlockIfFinNotConsumedWithData) {
390 Initialize();
391
392 // Write some data and no fin. If we consume all the data but not the fin,
393 // we should be write blocked because the fin was not consumed.
394 // (This should never actually happen as the fin should be sent out with the
395 // last data)
396 EXPECT_CALL(*session_, WritevData(kTestStreamId, _, _, _, _, _))
397 .WillOnce(InvokeWithoutArgs([this]() {
398 return session_->ConsumeData(stream_->id(), 2u, 0u, NO_FIN,
399 NOT_RETRANSMISSION, std::nullopt);
400 }));
401 stream_->WriteOrBufferData(absl::string_view(kData1, 2), true, nullptr);
402 EXPECT_TRUE(session_->HasUnackedStreamData());
403 ASSERT_EQ(1u, write_blocked_list_->NumBlockedStreams());
404 }
405
TEST_P(QuicStreamTest,BlockIfSoloFinNotConsumed)406 TEST_P(QuicStreamTest, BlockIfSoloFinNotConsumed) {
407 Initialize();
408
409 // Write no data and a fin. If we consume nothing we should be write blocked,
410 // as the fin was not consumed.
411 EXPECT_CALL(*session_, WritevData(kTestStreamId, _, _, _, _, _))
412 .WillOnce(Return(QuicConsumedData(0, false)));
413 stream_->WriteOrBufferData(absl::string_view(), true, nullptr);
414 ASSERT_EQ(1u, write_blocked_list_->NumBlockedStreams());
415 }
416
TEST_P(QuicStreamTest,CloseOnPartialWrite)417 TEST_P(QuicStreamTest, CloseOnPartialWrite) {
418 Initialize();
419
420 // Write some data and no fin. However, while writing the data
421 // close the stream and verify that MarkConnectionLevelWriteBlocked does not
422 // crash with an unknown stream.
423 EXPECT_CALL(*session_, WritevData(kTestStreamId, _, _, _, _, _))
424 .WillOnce(Invoke(this, &QuicStreamTest::CloseStreamOnWriteError));
425 stream_->WriteOrBufferData(absl::string_view(kData1, 2), false, nullptr);
426 ASSERT_EQ(0u, write_blocked_list_->NumBlockedStreams());
427 }
428
TEST_P(QuicStreamTest,WriteOrBufferData)429 TEST_P(QuicStreamTest, WriteOrBufferData) {
430 Initialize();
431
432 EXPECT_FALSE(HasWriteBlockedStreams());
433 QuicByteCount length =
434 1 + QuicPacketCreator::StreamFramePacketOverhead(
435 connection_->transport_version(), kPacket8ByteConnectionId,
436 kPacket0ByteConnectionId, !kIncludeVersion,
437 !kIncludeDiversificationNonce, PACKET_4BYTE_PACKET_NUMBER,
438 quiche::VARIABLE_LENGTH_INTEGER_LENGTH_0,
439 quiche::VARIABLE_LENGTH_INTEGER_LENGTH_0, 0u);
440 connection_->SetMaxPacketLength(length);
441
442 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
443 .WillOnce(InvokeWithoutArgs([this]() {
444 return session_->ConsumeData(stream_->id(), kDataLen - 1, 0u, NO_FIN,
445 NOT_RETRANSMISSION, std::nullopt);
446 }));
447 stream_->WriteOrBufferData(kData1, false, nullptr);
448
449 EXPECT_TRUE(session_->HasUnackedStreamData());
450 EXPECT_EQ(1u, stream_->BufferedDataBytes());
451 EXPECT_TRUE(HasWriteBlockedStreams());
452
453 // Queue a bytes_consumed write.
454 stream_->WriteOrBufferData(kData2, false, nullptr);
455 EXPECT_EQ(10u, stream_->BufferedDataBytes());
456 // Make sure we get the tail of the first write followed by the bytes_consumed
457 InSequence s;
458 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
459 .WillOnce(InvokeWithoutArgs([this]() {
460 return session_->ConsumeData(stream_->id(), kDataLen - 1, kDataLen - 1,
461 NO_FIN, NOT_RETRANSMISSION, std::nullopt);
462 }));
463 EXPECT_CALL(*stream_, OnCanWriteNewData());
464 stream_->OnCanWrite();
465 EXPECT_TRUE(session_->HasUnackedStreamData());
466
467 // And finally the end of the bytes_consumed.
468 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
469 .WillOnce(InvokeWithoutArgs([this]() {
470 return session_->ConsumeData(stream_->id(), 2u, 2 * kDataLen - 2,
471 NO_FIN, NOT_RETRANSMISSION, std::nullopt);
472 }));
473 EXPECT_CALL(*stream_, OnCanWriteNewData());
474 stream_->OnCanWrite();
475 EXPECT_TRUE(session_->HasUnackedStreamData());
476 }
477
TEST_P(QuicStreamTest,WriteOrBufferDataReachStreamLimit)478 TEST_P(QuicStreamTest, WriteOrBufferDataReachStreamLimit) {
479 Initialize();
480 std::string data("aaaaa");
481 QuicStreamPeer::SetStreamBytesWritten(kMaxStreamLength - data.length(),
482 stream_);
483 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
484 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
485 stream_->WriteOrBufferData(data, false, nullptr);
486 EXPECT_TRUE(session_->HasUnackedStreamData());
487 EXPECT_QUIC_BUG(
488 {
489 EXPECT_CALL(*connection_,
490 CloseConnection(QUIC_STREAM_LENGTH_OVERFLOW, _, _));
491 stream_->WriteOrBufferData("a", false, nullptr);
492 },
493 "Write too many data via stream");
494 }
495
TEST_P(QuicStreamTest,ConnectionCloseAfterStreamClose)496 TEST_P(QuicStreamTest, ConnectionCloseAfterStreamClose) {
497 Initialize();
498
499 QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(),
500 QUIC_STREAM_CANCELLED, 1234);
501 stream_->OnStreamReset(rst_frame);
502 if (VersionHasIetfQuicFrames(session_->transport_version())) {
503 // Create and inject a STOP SENDING frame to complete the close
504 // of the stream. This is only needed for version 99/IETF QUIC.
505 QuicStopSendingFrame stop_sending(kInvalidControlFrameId, stream_->id(),
506 QUIC_STREAM_CANCELLED);
507 session_->OnStopSendingFrame(stop_sending);
508 }
509 EXPECT_THAT(stream_->stream_error(), IsStreamError(QUIC_STREAM_CANCELLED));
510 EXPECT_THAT(stream_->connection_error(), IsQuicNoError());
511 stream_->OnConnectionClosed(QUIC_INTERNAL_ERROR,
512 ConnectionCloseSource::FROM_SELF);
513 EXPECT_THAT(stream_->stream_error(), IsStreamError(QUIC_STREAM_CANCELLED));
514 EXPECT_THAT(stream_->connection_error(), IsQuicNoError());
515 }
516
TEST_P(QuicStreamTest,RstAlwaysSentIfNoFinSent)517 TEST_P(QuicStreamTest, RstAlwaysSentIfNoFinSent) {
518 // For flow control accounting, a stream must send either a FIN or a RST frame
519 // before termination.
520 // Test that if no FIN has been sent, we send a RST.
521
522 Initialize();
523 EXPECT_FALSE(fin_sent());
524 EXPECT_FALSE(rst_sent());
525
526 // Write some data, with no FIN.
527 EXPECT_CALL(*session_, WritevData(kTestStreamId, _, _, _, _, _))
528 .WillOnce(InvokeWithoutArgs([this]() {
529 return session_->ConsumeData(stream_->id(), 1u, 0u, NO_FIN,
530 NOT_RETRANSMISSION, std::nullopt);
531 }));
532 stream_->WriteOrBufferData(absl::string_view(kData1, 1), false, nullptr);
533 EXPECT_TRUE(session_->HasUnackedStreamData());
534 EXPECT_FALSE(fin_sent());
535 EXPECT_FALSE(rst_sent());
536
537 // Now close the stream, and expect that we send a RST.
538 EXPECT_CALL(*session_, MaybeSendRstStreamFrame(kTestStreamId, _, _));
539 QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(),
540 QUIC_STREAM_CANCELLED, 1234);
541 stream_->OnStreamReset(rst_frame);
542 if (VersionHasIetfQuicFrames(session_->transport_version())) {
543 // Create and inject a STOP SENDING frame to complete the close
544 // of the stream. This is only needed for version 99/IETF QUIC.
545 QuicStopSendingFrame stop_sending(kInvalidControlFrameId, stream_->id(),
546 QUIC_STREAM_CANCELLED);
547 session_->OnStopSendingFrame(stop_sending);
548 }
549 EXPECT_FALSE(session_->HasUnackedStreamData());
550 EXPECT_FALSE(fin_sent());
551 EXPECT_TRUE(rst_sent());
552 }
553
TEST_P(QuicStreamTest,RstNotSentIfFinSent)554 TEST_P(QuicStreamTest, RstNotSentIfFinSent) {
555 // For flow control accounting, a stream must send either a FIN or a RST frame
556 // before termination.
557 // Test that if a FIN has been sent, we don't also send a RST.
558
559 Initialize();
560 EXPECT_FALSE(fin_sent());
561 EXPECT_FALSE(rst_sent());
562
563 // Write some data, with FIN.
564 EXPECT_CALL(*session_, WritevData(kTestStreamId, _, _, _, _, _))
565 .WillOnce(InvokeWithoutArgs([this]() {
566 return session_->ConsumeData(stream_->id(), 1u, 0u, FIN,
567 NOT_RETRANSMISSION, std::nullopt);
568 }));
569 stream_->WriteOrBufferData(absl::string_view(kData1, 1), true, nullptr);
570 EXPECT_TRUE(fin_sent());
571 EXPECT_FALSE(rst_sent());
572
573 // Now close the stream, and expect that we do not send a RST.
574 QuicStreamPeer::CloseReadSide(stream_);
575 stream_->CloseWriteSide();
576 EXPECT_TRUE(fin_sent());
577 EXPECT_FALSE(rst_sent());
578 }
579
TEST_P(QuicStreamTest,OnlySendOneRst)580 TEST_P(QuicStreamTest, OnlySendOneRst) {
581 // For flow control accounting, a stream must send either a FIN or a RST frame
582 // before termination.
583 // Test that if a stream sends a RST, it doesn't send an additional RST during
584 // OnClose() (this shouldn't be harmful, but we shouldn't do it anyway...)
585
586 Initialize();
587 EXPECT_FALSE(fin_sent());
588 EXPECT_FALSE(rst_sent());
589
590 // Reset the stream.
591 EXPECT_CALL(*session_, MaybeSendRstStreamFrame(kTestStreamId, _, _)).Times(1);
592 stream_->Reset(QUIC_STREAM_CANCELLED);
593 EXPECT_FALSE(fin_sent());
594 EXPECT_TRUE(rst_sent());
595
596 // Now close the stream (any further resets being sent would break the
597 // expectation above).
598 QuicStreamPeer::CloseReadSide(stream_);
599 stream_->CloseWriteSide();
600 EXPECT_FALSE(fin_sent());
601 EXPECT_TRUE(rst_sent());
602 }
603
TEST_P(QuicStreamTest,StreamFlowControlMultipleWindowUpdates)604 TEST_P(QuicStreamTest, StreamFlowControlMultipleWindowUpdates) {
605 Initialize();
606
607 // If we receive multiple WINDOW_UPDATES (potentially out of order), then we
608 // want to make sure we latch the largest offset we see.
609
610 // Initially should be default.
611 EXPECT_EQ(kMinimumFlowControlSendWindow,
612 QuicStreamPeer::SendWindowOffset(stream_));
613
614 // Check a single WINDOW_UPDATE results in correct offset.
615 QuicWindowUpdateFrame window_update_1(kInvalidControlFrameId, stream_->id(),
616 kMinimumFlowControlSendWindow + 5);
617 stream_->OnWindowUpdateFrame(window_update_1);
618 EXPECT_EQ(window_update_1.max_data,
619 QuicStreamPeer::SendWindowOffset(stream_));
620
621 // Now send a few more WINDOW_UPDATES and make sure that only the largest is
622 // remembered.
623 QuicWindowUpdateFrame window_update_2(kInvalidControlFrameId, stream_->id(),
624 1);
625 QuicWindowUpdateFrame window_update_3(kInvalidControlFrameId, stream_->id(),
626 kMinimumFlowControlSendWindow + 10);
627 QuicWindowUpdateFrame window_update_4(kInvalidControlFrameId, stream_->id(),
628 5678);
629 stream_->OnWindowUpdateFrame(window_update_2);
630 stream_->OnWindowUpdateFrame(window_update_3);
631 stream_->OnWindowUpdateFrame(window_update_4);
632 EXPECT_EQ(window_update_3.max_data,
633 QuicStreamPeer::SendWindowOffset(stream_));
634 }
635
TEST_P(QuicStreamTest,FrameStats)636 TEST_P(QuicStreamTest, FrameStats) {
637 Initialize();
638
639 EXPECT_EQ(0, stream_->num_frames_received());
640 EXPECT_EQ(0, stream_->num_duplicate_frames_received());
641 QuicStreamFrame frame(stream_->id(), false, 0, ".");
642 EXPECT_CALL(*stream_, OnDataAvailable()).Times(2);
643 stream_->OnStreamFrame(frame);
644 EXPECT_EQ(1, stream_->num_frames_received());
645 EXPECT_EQ(0, stream_->num_duplicate_frames_received());
646 stream_->OnStreamFrame(frame);
647 EXPECT_EQ(2, stream_->num_frames_received());
648 EXPECT_EQ(1, stream_->num_duplicate_frames_received());
649 QuicStreamFrame frame2(stream_->id(), false, 1, "abc");
650 stream_->OnStreamFrame(frame2);
651 }
652
653 // Verify that when we receive a packet which violates flow control (i.e. sends
654 // too much data on the stream) that the stream sequencer never sees this frame,
655 // as we check for violation and close the connection early.
TEST_P(QuicStreamTest,StreamSequencerNeverSeesPacketsViolatingFlowControl)656 TEST_P(QuicStreamTest, StreamSequencerNeverSeesPacketsViolatingFlowControl) {
657 Initialize();
658
659 // Receive a stream frame that violates flow control: the byte offset is
660 // higher than the receive window offset.
661 QuicStreamFrame frame(stream_->id(), false,
662 kInitialSessionFlowControlWindowForTest + 1, ".");
663 EXPECT_GT(frame.offset, QuicStreamPeer::ReceiveWindowOffset(stream_));
664
665 // Stream should not accept the frame, and the connection should be closed.
666 EXPECT_CALL(*connection_,
667 CloseConnection(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, _, _));
668 stream_->OnStreamFrame(frame);
669 }
670
671 // Verify that after the consumer calls StopReading(), the stream still sends
672 // flow control updates.
TEST_P(QuicStreamTest,StopReadingSendsFlowControl)673 TEST_P(QuicStreamTest, StopReadingSendsFlowControl) {
674 Initialize();
675
676 stream_->StopReading();
677
678 // Connection should not get terminated due to flow control errors.
679 EXPECT_CALL(*connection_,
680 CloseConnection(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, _, _))
681 .Times(0);
682 EXPECT_CALL(*session_, WriteControlFrame(_, _))
683 .Times(AtLeast(1))
684 .WillRepeatedly(Invoke(&ClearControlFrameWithTransmissionType));
685
686 std::string data(1000, 'x');
687 for (QuicStreamOffset offset = 0;
688 offset < 2 * kInitialStreamFlowControlWindowForTest;
689 offset += data.length()) {
690 QuicStreamFrame frame(stream_->id(), false, offset, data);
691 stream_->OnStreamFrame(frame);
692 }
693 EXPECT_LT(kInitialStreamFlowControlWindowForTest,
694 QuicStreamPeer::ReceiveWindowOffset(stream_));
695 }
696
TEST_P(QuicStreamTest,FinalByteOffsetFromFin)697 TEST_P(QuicStreamTest, FinalByteOffsetFromFin) {
698 Initialize();
699
700 EXPECT_FALSE(stream_->HasReceivedFinalOffset());
701
702 QuicStreamFrame stream_frame_no_fin(stream_->id(), false, 1234, ".");
703 stream_->OnStreamFrame(stream_frame_no_fin);
704 EXPECT_FALSE(stream_->HasReceivedFinalOffset());
705
706 QuicStreamFrame stream_frame_with_fin(stream_->id(), true, 1234, ".");
707 stream_->OnStreamFrame(stream_frame_with_fin);
708 EXPECT_TRUE(stream_->HasReceivedFinalOffset());
709 }
710
TEST_P(QuicStreamTest,FinalByteOffsetFromRst)711 TEST_P(QuicStreamTest, FinalByteOffsetFromRst) {
712 Initialize();
713
714 EXPECT_FALSE(stream_->HasReceivedFinalOffset());
715 QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(),
716 QUIC_STREAM_CANCELLED, 1234);
717 stream_->OnStreamReset(rst_frame);
718 EXPECT_TRUE(stream_->HasReceivedFinalOffset());
719 }
720
TEST_P(QuicStreamTest,InvalidFinalByteOffsetFromRst)721 TEST_P(QuicStreamTest, InvalidFinalByteOffsetFromRst) {
722 Initialize();
723
724 EXPECT_FALSE(stream_->HasReceivedFinalOffset());
725 QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(),
726 QUIC_STREAM_CANCELLED, 0xFFFFFFFFFFFF);
727 // Stream should not accept the frame, and the connection should be closed.
728 EXPECT_CALL(*connection_,
729 CloseConnection(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, _, _));
730 stream_->OnStreamReset(rst_frame);
731 EXPECT_TRUE(stream_->HasReceivedFinalOffset());
732 }
733
TEST_P(QuicStreamTest,FinalByteOffsetFromZeroLengthStreamFrame)734 TEST_P(QuicStreamTest, FinalByteOffsetFromZeroLengthStreamFrame) {
735 // When receiving Trailers, an empty stream frame is created with the FIN set,
736 // and is passed to OnStreamFrame. The Trailers may be sent in advance of
737 // queued body bytes being sent, and thus the final byte offset may exceed
738 // current flow control limits. Flow control should only be concerned with
739 // data that has actually been sent/received, so verify that flow control
740 // ignores such a stream frame.
741 Initialize();
742
743 EXPECT_FALSE(stream_->HasReceivedFinalOffset());
744 const QuicStreamOffset kByteOffsetExceedingFlowControlWindow =
745 kInitialSessionFlowControlWindowForTest + 1;
746 const QuicStreamOffset current_stream_flow_control_offset =
747 QuicStreamPeer::ReceiveWindowOffset(stream_);
748 const QuicStreamOffset current_connection_flow_control_offset =
749 QuicFlowControllerPeer::ReceiveWindowOffset(session_->flow_controller());
750 ASSERT_GT(kByteOffsetExceedingFlowControlWindow,
751 current_stream_flow_control_offset);
752 ASSERT_GT(kByteOffsetExceedingFlowControlWindow,
753 current_connection_flow_control_offset);
754 QuicStreamFrame zero_length_stream_frame_with_fin(
755 stream_->id(), /*fin=*/true, kByteOffsetExceedingFlowControlWindow,
756 absl::string_view());
757 EXPECT_EQ(0, zero_length_stream_frame_with_fin.data_length);
758
759 EXPECT_CALL(*connection_, CloseConnection(_, _, _)).Times(0);
760 stream_->OnStreamFrame(zero_length_stream_frame_with_fin);
761 EXPECT_TRUE(stream_->HasReceivedFinalOffset());
762
763 // The flow control receive offset values should not have changed.
764 EXPECT_EQ(current_stream_flow_control_offset,
765 QuicStreamPeer::ReceiveWindowOffset(stream_));
766 EXPECT_EQ(
767 current_connection_flow_control_offset,
768 QuicFlowControllerPeer::ReceiveWindowOffset(session_->flow_controller()));
769 }
770
TEST_P(QuicStreamTest,OnStreamResetOffsetOverflow)771 TEST_P(QuicStreamTest, OnStreamResetOffsetOverflow) {
772 Initialize();
773 QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(),
774 QUIC_STREAM_CANCELLED, kMaxStreamLength + 1);
775 EXPECT_CALL(*connection_, CloseConnection(QUIC_STREAM_LENGTH_OVERFLOW, _, _));
776 stream_->OnStreamReset(rst_frame);
777 }
778
TEST_P(QuicStreamTest,OnStreamFrameUpperLimit)779 TEST_P(QuicStreamTest, OnStreamFrameUpperLimit) {
780 Initialize();
781
782 // Modify receive window offset and sequencer buffer total_bytes_read_ to
783 // avoid flow control violation.
784 QuicStreamPeer::SetReceiveWindowOffset(stream_, kMaxStreamLength + 5u);
785 QuicFlowControllerPeer::SetReceiveWindowOffset(session_->flow_controller(),
786 kMaxStreamLength + 5u);
787 QuicStreamSequencerPeer::SetFrameBufferTotalBytesRead(
788 QuicStreamPeer::sequencer(stream_), kMaxStreamLength - 10u);
789
790 EXPECT_CALL(*connection_, CloseConnection(QUIC_STREAM_LENGTH_OVERFLOW, _, _))
791 .Times(0);
792 QuicStreamFrame stream_frame(stream_->id(), false, kMaxStreamLength - 1, ".");
793 stream_->OnStreamFrame(stream_frame);
794 QuicStreamFrame stream_frame2(stream_->id(), true, kMaxStreamLength, "");
795 stream_->OnStreamFrame(stream_frame2);
796 }
797
TEST_P(QuicStreamTest,StreamTooLong)798 TEST_P(QuicStreamTest, StreamTooLong) {
799 Initialize();
800 QuicStreamFrame stream_frame(stream_->id(), false, kMaxStreamLength, ".");
801 EXPECT_QUIC_PEER_BUG(
802 {
803 EXPECT_CALL(*connection_,
804 CloseConnection(QUIC_STREAM_LENGTH_OVERFLOW, _, _))
805 .Times(1);
806 stream_->OnStreamFrame(stream_frame);
807 },
808 absl::StrCat("Receive stream frame on stream ", stream_->id(),
809 " reaches max stream length"));
810 }
811
TEST_P(QuicStreamTest,SetDrainingIncomingOutgoing)812 TEST_P(QuicStreamTest, SetDrainingIncomingOutgoing) {
813 // Don't have incoming data consumed.
814 Initialize();
815
816 // Incoming data with FIN.
817 QuicStreamFrame stream_frame_with_fin(stream_->id(), true, 1234, ".");
818 stream_->OnStreamFrame(stream_frame_with_fin);
819 // The FIN has been received but not consumed.
820 EXPECT_TRUE(stream_->HasReceivedFinalOffset());
821 EXPECT_FALSE(QuicStreamPeer::read_side_closed(stream_));
822 EXPECT_FALSE(stream_->reading_stopped());
823
824 EXPECT_EQ(1u, QuicSessionPeer::GetNumOpenDynamicStreams(session_.get()));
825
826 // Outgoing data with FIN.
827 EXPECT_CALL(*session_, WritevData(kTestStreamId, _, _, _, _, _))
828 .WillOnce(InvokeWithoutArgs([this]() {
829 return session_->ConsumeData(stream_->id(), 2u, 0u, FIN,
830 NOT_RETRANSMISSION, std::nullopt);
831 }));
832 stream_->WriteOrBufferData(absl::string_view(kData1, 2), true, nullptr);
833 EXPECT_TRUE(stream_->write_side_closed());
834
835 EXPECT_EQ(1u, QuicSessionPeer::GetNumDrainingStreams(session_.get()));
836 EXPECT_EQ(0u, QuicSessionPeer::GetNumOpenDynamicStreams(session_.get()));
837 }
838
TEST_P(QuicStreamTest,SetDrainingOutgoingIncoming)839 TEST_P(QuicStreamTest, SetDrainingOutgoingIncoming) {
840 // Don't have incoming data consumed.
841 Initialize();
842
843 // Outgoing data with FIN.
844 EXPECT_CALL(*session_, WritevData(kTestStreamId, _, _, _, _, _))
845 .WillOnce(InvokeWithoutArgs([this]() {
846 return session_->ConsumeData(stream_->id(), 2u, 0u, FIN,
847 NOT_RETRANSMISSION, std::nullopt);
848 }));
849 stream_->WriteOrBufferData(absl::string_view(kData1, 2), true, nullptr);
850 EXPECT_TRUE(stream_->write_side_closed());
851
852 EXPECT_EQ(1u, QuicSessionPeer::GetNumOpenDynamicStreams(session_.get()));
853
854 // Incoming data with FIN.
855 QuicStreamFrame stream_frame_with_fin(stream_->id(), true, 1234, ".");
856 stream_->OnStreamFrame(stream_frame_with_fin);
857 // The FIN has been received but not consumed.
858 EXPECT_TRUE(stream_->HasReceivedFinalOffset());
859 EXPECT_FALSE(QuicStreamPeer::read_side_closed(stream_));
860 EXPECT_FALSE(stream_->reading_stopped());
861
862 EXPECT_EQ(1u, QuicSessionPeer::GetNumDrainingStreams(session_.get()));
863 EXPECT_EQ(0u, QuicSessionPeer::GetNumOpenDynamicStreams(session_.get()));
864 }
865
TEST_P(QuicStreamTest,EarlyResponseFinHandling)866 TEST_P(QuicStreamTest, EarlyResponseFinHandling) {
867 // Verify that if the server completes the response before reading the end of
868 // the request, the received FIN is recorded.
869
870 Initialize();
871 EXPECT_CALL(*connection_, CloseConnection(_, _, _)).Times(0);
872 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
873 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
874
875 // Receive data for the request.
876 EXPECT_CALL(*stream_, OnDataAvailable()).Times(1);
877 QuicStreamFrame frame1(stream_->id(), false, 0, "Start");
878 stream_->OnStreamFrame(frame1);
879 // When QuicSimpleServerStream sends the response, it calls
880 // QuicStream::CloseReadSide() first.
881 QuicStreamPeer::CloseReadSide(stream_);
882 // Send data and FIN for the response.
883 stream_->WriteOrBufferData(kData1, false, nullptr);
884 EXPECT_TRUE(QuicStreamPeer::read_side_closed(stream_));
885 // Receive remaining data and FIN for the request.
886 QuicStreamFrame frame2(stream_->id(), true, 0, "End");
887 stream_->OnStreamFrame(frame2);
888 EXPECT_TRUE(stream_->fin_received());
889 EXPECT_TRUE(stream_->HasReceivedFinalOffset());
890 }
891
TEST_P(QuicStreamTest,StreamWaitsForAcks)892 TEST_P(QuicStreamTest, StreamWaitsForAcks) {
893 Initialize();
894 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
895 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
896 // Stream is not waiting for acks initially.
897 EXPECT_FALSE(stream_->IsWaitingForAcks());
898 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
899 EXPECT_FALSE(session_->HasUnackedStreamData());
900
901 // Send kData1.
902 stream_->WriteOrBufferData(kData1, false, nullptr);
903 EXPECT_TRUE(session_->HasUnackedStreamData());
904 EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
905 EXPECT_TRUE(stream_->IsWaitingForAcks());
906 QuicByteCount newly_acked_length = 0;
907 EXPECT_TRUE(stream_->OnStreamFrameAcked(0, 9, false, QuicTime::Delta::Zero(),
908 QuicTime::Zero(),
909 &newly_acked_length));
910 EXPECT_EQ(9u, newly_acked_length);
911 // Stream is not waiting for acks as all sent data is acked.
912 EXPECT_FALSE(stream_->IsWaitingForAcks());
913 EXPECT_FALSE(session_->HasUnackedStreamData());
914 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
915
916 // Send kData2.
917 stream_->WriteOrBufferData(kData2, false, nullptr);
918 EXPECT_TRUE(stream_->IsWaitingForAcks());
919 EXPECT_TRUE(session_->HasUnackedStreamData());
920 EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
921 // Send FIN.
922 stream_->WriteOrBufferData("", true, nullptr);
923 // Fin only frame is not stored in send buffer.
924 EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
925
926 // kData2 is retransmitted.
927 stream_->OnStreamFrameRetransmitted(9, 9, false);
928
929 // kData2 is acked.
930 EXPECT_TRUE(stream_->OnStreamFrameAcked(9, 9, false, QuicTime::Delta::Zero(),
931 QuicTime::Zero(),
932 &newly_acked_length));
933 EXPECT_EQ(9u, newly_acked_length);
934 // Stream is waiting for acks as FIN is not acked.
935 EXPECT_TRUE(stream_->IsWaitingForAcks());
936 EXPECT_TRUE(session_->HasUnackedStreamData());
937 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
938
939 // FIN is acked.
940 EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState());
941 EXPECT_TRUE(stream_->OnStreamFrameAcked(18, 0, true, QuicTime::Delta::Zero(),
942 QuicTime::Zero(),
943 &newly_acked_length));
944 EXPECT_EQ(0u, newly_acked_length);
945 EXPECT_FALSE(stream_->IsWaitingForAcks());
946 EXPECT_FALSE(session_->HasUnackedStreamData());
947 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
948 }
949
TEST_P(QuicStreamTest,StreamDataGetAckedOutOfOrder)950 TEST_P(QuicStreamTest, StreamDataGetAckedOutOfOrder) {
951 Initialize();
952 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
953 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
954 // Send data.
955 stream_->WriteOrBufferData(kData1, false, nullptr);
956 stream_->WriteOrBufferData(kData1, false, nullptr);
957 stream_->WriteOrBufferData(kData1, false, nullptr);
958 stream_->WriteOrBufferData("", true, nullptr);
959 EXPECT_EQ(3u, QuicStreamPeer::SendBuffer(stream_).size());
960 EXPECT_TRUE(stream_->IsWaitingForAcks());
961 EXPECT_TRUE(session_->HasUnackedStreamData());
962 QuicByteCount newly_acked_length = 0;
963 EXPECT_TRUE(stream_->OnStreamFrameAcked(9, 9, false, QuicTime::Delta::Zero(),
964 QuicTime::Zero(),
965 &newly_acked_length));
966 EXPECT_TRUE(session_->HasUnackedStreamData());
967 EXPECT_EQ(9u, newly_acked_length);
968 EXPECT_EQ(3u, QuicStreamPeer::SendBuffer(stream_).size());
969 EXPECT_TRUE(stream_->OnStreamFrameAcked(18, 9, false, QuicTime::Delta::Zero(),
970 QuicTime::Zero(),
971 &newly_acked_length));
972 EXPECT_TRUE(session_->HasUnackedStreamData());
973 EXPECT_EQ(9u, newly_acked_length);
974 EXPECT_EQ(3u, QuicStreamPeer::SendBuffer(stream_).size());
975 EXPECT_TRUE(stream_->OnStreamFrameAcked(0, 9, false, QuicTime::Delta::Zero(),
976 QuicTime::Zero(),
977 &newly_acked_length));
978 EXPECT_TRUE(session_->HasUnackedStreamData());
979 EXPECT_EQ(9u, newly_acked_length);
980 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
981 // FIN is not acked yet.
982 EXPECT_TRUE(stream_->IsWaitingForAcks());
983 EXPECT_TRUE(session_->HasUnackedStreamData());
984 EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState());
985 EXPECT_TRUE(stream_->OnStreamFrameAcked(27, 0, true, QuicTime::Delta::Zero(),
986 QuicTime::Zero(),
987 &newly_acked_length));
988 EXPECT_EQ(0u, newly_acked_length);
989 EXPECT_FALSE(stream_->IsWaitingForAcks());
990 EXPECT_FALSE(session_->HasUnackedStreamData());
991 }
992
TEST_P(QuicStreamTest,CancelStream)993 TEST_P(QuicStreamTest, CancelStream) {
994 Initialize();
995 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
996 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
997 EXPECT_FALSE(stream_->IsWaitingForAcks());
998 EXPECT_FALSE(session_->HasUnackedStreamData());
999 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
1000
1001 stream_->WriteOrBufferData(kData1, false, nullptr);
1002 EXPECT_TRUE(stream_->IsWaitingForAcks());
1003 EXPECT_TRUE(session_->HasUnackedStreamData());
1004 EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
1005 // Cancel stream.
1006 stream_->MaybeSendStopSending(QUIC_STREAM_NO_ERROR);
1007 // stream still waits for acks as the error code is QUIC_STREAM_NO_ERROR, and
1008 // data is going to be retransmitted.
1009 EXPECT_TRUE(stream_->IsWaitingForAcks());
1010 EXPECT_TRUE(session_->HasUnackedStreamData());
1011 EXPECT_CALL(*connection_,
1012 OnStreamReset(stream_->id(), QUIC_STREAM_CANCELLED));
1013 EXPECT_CALL(*session_, WriteControlFrame(_, _))
1014 .Times(AtLeast(1))
1015 .WillRepeatedly(Invoke(&ClearControlFrameWithTransmissionType));
1016
1017 EXPECT_CALL(*session_, MaybeSendRstStreamFrame(_, _, _))
1018 .WillOnce(InvokeWithoutArgs([this]() {
1019 session_->ReallyMaybeSendRstStreamFrame(
1020 stream_->id(), QUIC_STREAM_CANCELLED,
1021 stream_->stream_bytes_written());
1022 }));
1023
1024 stream_->Reset(QUIC_STREAM_CANCELLED);
1025 EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
1026 // Stream stops waiting for acks as data is not going to be retransmitted.
1027 EXPECT_FALSE(stream_->IsWaitingForAcks());
1028 EXPECT_FALSE(session_->HasUnackedStreamData());
1029 }
1030
TEST_P(QuicStreamTest,RstFrameReceivedStreamNotFinishSending)1031 TEST_P(QuicStreamTest, RstFrameReceivedStreamNotFinishSending) {
1032 if (VersionHasIetfQuicFrames(GetParam().transport_version)) {
1033 // In IETF QUIC, receiving a RESET_STREAM will only close the read side. The
1034 // stream itself is not closed and will not send reset.
1035 return;
1036 }
1037
1038 Initialize();
1039 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1040 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1041 EXPECT_FALSE(stream_->IsWaitingForAcks());
1042 EXPECT_FALSE(session_->HasUnackedStreamData());
1043 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
1044
1045 stream_->WriteOrBufferData(kData1, false, nullptr);
1046 EXPECT_TRUE(stream_->IsWaitingForAcks());
1047 EXPECT_TRUE(session_->HasUnackedStreamData());
1048 EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
1049
1050 // RST_STREAM received.
1051 QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(),
1052 QUIC_STREAM_CANCELLED, 9);
1053
1054 EXPECT_CALL(
1055 *session_,
1056 MaybeSendRstStreamFrame(
1057 stream_->id(),
1058 QuicResetStreamError::FromInternal(QUIC_RST_ACKNOWLEDGEMENT), 9));
1059 stream_->OnStreamReset(rst_frame);
1060 EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
1061 // Stream stops waiting for acks as it does not finish sending and rst is
1062 // sent.
1063 EXPECT_FALSE(stream_->IsWaitingForAcks());
1064 EXPECT_FALSE(session_->HasUnackedStreamData());
1065 }
1066
TEST_P(QuicStreamTest,RstFrameReceivedStreamFinishSending)1067 TEST_P(QuicStreamTest, RstFrameReceivedStreamFinishSending) {
1068 Initialize();
1069 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1070 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1071 EXPECT_FALSE(stream_->IsWaitingForAcks());
1072 EXPECT_FALSE(session_->HasUnackedStreamData());
1073 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
1074
1075 stream_->WriteOrBufferData(kData1, true, nullptr);
1076 EXPECT_TRUE(stream_->IsWaitingForAcks());
1077 EXPECT_TRUE(session_->HasUnackedStreamData());
1078
1079 // RST_STREAM received.
1080 QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(),
1081 QUIC_STREAM_CANCELLED, 1234);
1082 stream_->OnStreamReset(rst_frame);
1083 // Stream still waits for acks as it finishes sending and has unacked data.
1084 EXPECT_TRUE(stream_->IsWaitingForAcks());
1085 EXPECT_TRUE(session_->HasUnackedStreamData());
1086 EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
1087 }
1088
TEST_P(QuicStreamTest,ConnectionClosed)1089 TEST_P(QuicStreamTest, ConnectionClosed) {
1090 Initialize();
1091 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1092 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1093 EXPECT_FALSE(stream_->IsWaitingForAcks());
1094 EXPECT_FALSE(session_->HasUnackedStreamData());
1095 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
1096
1097 stream_->WriteOrBufferData(kData1, false, nullptr);
1098 EXPECT_TRUE(stream_->IsWaitingForAcks());
1099 EXPECT_TRUE(session_->HasUnackedStreamData());
1100 EXPECT_CALL(
1101 *session_,
1102 MaybeSendRstStreamFrame(
1103 stream_->id(),
1104 QuicResetStreamError::FromInternal(QUIC_RST_ACKNOWLEDGEMENT), 9));
1105 QuicConnectionPeer::SetConnectionClose(connection_);
1106 stream_->OnConnectionClosed(QUIC_INTERNAL_ERROR,
1107 ConnectionCloseSource::FROM_SELF);
1108 EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
1109 // Stream stops waiting for acks as connection is going to close.
1110 EXPECT_FALSE(stream_->IsWaitingForAcks());
1111 EXPECT_FALSE(session_->HasUnackedStreamData());
1112 }
1113
TEST_P(QuicStreamTest,CanWriteNewDataAfterData)1114 TEST_P(QuicStreamTest, CanWriteNewDataAfterData) {
1115 SetQuicFlag(quic_buffered_data_threshold, 100);
1116 Initialize();
1117 EXPECT_TRUE(stream_->CanWriteNewDataAfterData(99));
1118 EXPECT_FALSE(stream_->CanWriteNewDataAfterData(100));
1119 }
1120
TEST_P(QuicStreamTest,WriteBufferedData)1121 TEST_P(QuicStreamTest, WriteBufferedData) {
1122 // Set buffered data low water mark to be 100.
1123 SetQuicFlag(quic_buffered_data_threshold, 100);
1124
1125 Initialize();
1126 std::string data(1024, 'a');
1127 EXPECT_TRUE(stream_->CanWriteNewData());
1128
1129 // Testing WriteOrBufferData.
1130 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1131 .WillOnce(InvokeWithoutArgs([this]() {
1132 return session_->ConsumeData(stream_->id(), 100u, 0u, NO_FIN,
1133 NOT_RETRANSMISSION, std::nullopt);
1134 }));
1135 stream_->WriteOrBufferData(data, false, nullptr);
1136 stream_->WriteOrBufferData(data, false, nullptr);
1137 stream_->WriteOrBufferData(data, false, nullptr);
1138 EXPECT_TRUE(stream_->IsWaitingForAcks());
1139
1140 // Verify all data is saved.
1141 EXPECT_EQ(3 * data.length() - 100, stream_->BufferedDataBytes());
1142
1143 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1144 .WillOnce(InvokeWithoutArgs([this]() {
1145 return session_->ConsumeData(stream_->id(), 100, 100u, NO_FIN,
1146 NOT_RETRANSMISSION, std::nullopt);
1147 }));
1148 // Buffered data size > threshold, do not ask upper layer for more data.
1149 EXPECT_CALL(*stream_, OnCanWriteNewData()).Times(0);
1150 stream_->OnCanWrite();
1151 EXPECT_EQ(3 * data.length() - 200, stream_->BufferedDataBytes());
1152 EXPECT_FALSE(stream_->CanWriteNewData());
1153
1154 // Send buffered data to make buffered data size < threshold.
1155 QuicByteCount data_to_write =
1156 3 * data.length() - 200 - GetQuicFlag(quic_buffered_data_threshold) + 1;
1157 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1158 .WillOnce(InvokeWithoutArgs([this, data_to_write]() {
1159 return session_->ConsumeData(stream_->id(), data_to_write, 200u, NO_FIN,
1160 NOT_RETRANSMISSION, std::nullopt);
1161 }));
1162 // Buffered data size < threshold, ask upper layer for more data.
1163 EXPECT_CALL(*stream_, OnCanWriteNewData()).Times(1);
1164 stream_->OnCanWrite();
1165 EXPECT_EQ(
1166 static_cast<uint64_t>(GetQuicFlag(quic_buffered_data_threshold) - 1),
1167 stream_->BufferedDataBytes());
1168 EXPECT_TRUE(stream_->CanWriteNewData());
1169
1170 // Flush all buffered data.
1171 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1172 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1173 EXPECT_CALL(*stream_, OnCanWriteNewData()).Times(1);
1174 stream_->OnCanWrite();
1175 EXPECT_EQ(0u, stream_->BufferedDataBytes());
1176 EXPECT_FALSE(stream_->HasBufferedData());
1177 EXPECT_TRUE(stream_->CanWriteNewData());
1178
1179 // Testing Writev.
1180 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1181 .WillOnce(Return(QuicConsumedData(0, false)));
1182 struct iovec iov = {const_cast<char*>(data.data()), data.length()};
1183 quiche::QuicheMemSliceStorage storage(
1184 &iov, 1, session_->connection()->helper()->GetStreamSendBufferAllocator(),
1185 1024);
1186 QuicConsumedData consumed = stream_->WriteMemSlices(storage.ToSpan(), false);
1187
1188 // There is no buffered data before, all data should be consumed without
1189 // respecting buffered data upper limit.
1190 EXPECT_EQ(data.length(), consumed.bytes_consumed);
1191 EXPECT_FALSE(consumed.fin_consumed);
1192 EXPECT_EQ(data.length(), stream_->BufferedDataBytes());
1193 EXPECT_FALSE(stream_->CanWriteNewData());
1194
1195 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)).Times(0);
1196 quiche::QuicheMemSliceStorage storage2(
1197 &iov, 1, session_->connection()->helper()->GetStreamSendBufferAllocator(),
1198 1024);
1199 consumed = stream_->WriteMemSlices(storage2.ToSpan(), false);
1200 // No Data can be consumed as buffered data is beyond upper limit.
1201 EXPECT_EQ(0u, consumed.bytes_consumed);
1202 EXPECT_FALSE(consumed.fin_consumed);
1203 EXPECT_EQ(data.length(), stream_->BufferedDataBytes());
1204
1205 data_to_write = data.length() - GetQuicFlag(quic_buffered_data_threshold) + 1;
1206 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1207 .WillOnce(InvokeWithoutArgs([this, data_to_write]() {
1208 return session_->ConsumeData(stream_->id(), data_to_write, 0u, NO_FIN,
1209 NOT_RETRANSMISSION, std::nullopt);
1210 }));
1211
1212 EXPECT_CALL(*stream_, OnCanWriteNewData()).Times(1);
1213 stream_->OnCanWrite();
1214 EXPECT_EQ(
1215 static_cast<uint64_t>(GetQuicFlag(quic_buffered_data_threshold) - 1),
1216 stream_->BufferedDataBytes());
1217 EXPECT_TRUE(stream_->CanWriteNewData());
1218
1219 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)).Times(0);
1220 // All data can be consumed as buffered data is below upper limit.
1221 quiche::QuicheMemSliceStorage storage3(
1222 &iov, 1, session_->connection()->helper()->GetStreamSendBufferAllocator(),
1223 1024);
1224 consumed = stream_->WriteMemSlices(storage3.ToSpan(), false);
1225 EXPECT_EQ(data.length(), consumed.bytes_consumed);
1226 EXPECT_FALSE(consumed.fin_consumed);
1227 EXPECT_EQ(data.length() + GetQuicFlag(quic_buffered_data_threshold) - 1,
1228 stream_->BufferedDataBytes());
1229 EXPECT_FALSE(stream_->CanWriteNewData());
1230 }
1231
TEST_P(QuicStreamTest,WritevDataReachStreamLimit)1232 TEST_P(QuicStreamTest, WritevDataReachStreamLimit) {
1233 Initialize();
1234 std::string data("aaaaa");
1235 QuicStreamPeer::SetStreamBytesWritten(kMaxStreamLength - data.length(),
1236 stream_);
1237 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1238 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1239 struct iovec iov = {const_cast<char*>(data.data()), 5u};
1240 quiche::QuicheMemSliceStorage storage(
1241 &iov, 1, session_->connection()->helper()->GetStreamSendBufferAllocator(),
1242 1024);
1243 QuicConsumedData consumed = stream_->WriteMemSlices(storage.ToSpan(), false);
1244 EXPECT_EQ(data.length(), consumed.bytes_consumed);
1245 struct iovec iov2 = {const_cast<char*>(data.data()), 1u};
1246 quiche::QuicheMemSliceStorage storage2(
1247 &iov2, 1,
1248 session_->connection()->helper()->GetStreamSendBufferAllocator(), 1024);
1249 EXPECT_QUIC_BUG(
1250 {
1251 EXPECT_CALL(*connection_,
1252 CloseConnection(QUIC_STREAM_LENGTH_OVERFLOW, _, _));
1253 stream_->WriteMemSlices(storage2.ToSpan(), false);
1254 },
1255 "Write too many data via stream");
1256 }
1257
TEST_P(QuicStreamTest,WriteMemSlices)1258 TEST_P(QuicStreamTest, WriteMemSlices) {
1259 // Set buffered data low water mark to be 100.
1260 SetQuicFlag(quic_buffered_data_threshold, 100);
1261
1262 Initialize();
1263 constexpr QuicByteCount kDataSize = 1024;
1264 quiche::QuicheBufferAllocator* allocator =
1265 connection_->helper()->GetStreamSendBufferAllocator();
1266 std::vector<quiche::QuicheMemSlice> vector1;
1267 vector1.push_back(
1268 quiche::QuicheMemSlice(quiche::QuicheBuffer(allocator, kDataSize)));
1269 vector1.push_back(
1270 quiche::QuicheMemSlice(quiche::QuicheBuffer(allocator, kDataSize)));
1271 std::vector<quiche::QuicheMemSlice> vector2;
1272 vector2.push_back(
1273 quiche::QuicheMemSlice(quiche::QuicheBuffer(allocator, kDataSize)));
1274 vector2.push_back(
1275 quiche::QuicheMemSlice(quiche::QuicheBuffer(allocator, kDataSize)));
1276 absl::Span<quiche::QuicheMemSlice> span1(vector1);
1277 absl::Span<quiche::QuicheMemSlice> span2(vector2);
1278
1279 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1280 .WillOnce(InvokeWithoutArgs([this]() {
1281 return session_->ConsumeData(stream_->id(), 100u, 0u, NO_FIN,
1282 NOT_RETRANSMISSION, std::nullopt);
1283 }));
1284 // There is no buffered data before, all data should be consumed.
1285 QuicConsumedData consumed = stream_->WriteMemSlices(span1, false);
1286 EXPECT_EQ(2048u, consumed.bytes_consumed);
1287 EXPECT_FALSE(consumed.fin_consumed);
1288 EXPECT_EQ(2 * kDataSize - 100, stream_->BufferedDataBytes());
1289 EXPECT_FALSE(stream_->fin_buffered());
1290
1291 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)).Times(0);
1292 // No Data can be consumed as buffered data is beyond upper limit.
1293 consumed = stream_->WriteMemSlices(span2, true);
1294 EXPECT_EQ(0u, consumed.bytes_consumed);
1295 EXPECT_FALSE(consumed.fin_consumed);
1296 EXPECT_EQ(2 * kDataSize - 100, stream_->BufferedDataBytes());
1297 EXPECT_FALSE(stream_->fin_buffered());
1298
1299 QuicByteCount data_to_write =
1300 2 * kDataSize - 100 - GetQuicFlag(quic_buffered_data_threshold) + 1;
1301 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1302 .WillOnce(InvokeWithoutArgs([this, data_to_write]() {
1303 return session_->ConsumeData(stream_->id(), data_to_write, 100u, NO_FIN,
1304 NOT_RETRANSMISSION, std::nullopt);
1305 }));
1306 EXPECT_CALL(*stream_, OnCanWriteNewData()).Times(1);
1307 stream_->OnCanWrite();
1308 EXPECT_EQ(
1309 static_cast<uint64_t>(GetQuicFlag(quic_buffered_data_threshold) - 1),
1310 stream_->BufferedDataBytes());
1311 // Try to write slices2 again.
1312 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)).Times(0);
1313 consumed = stream_->WriteMemSlices(span2, true);
1314 EXPECT_EQ(2048u, consumed.bytes_consumed);
1315 EXPECT_TRUE(consumed.fin_consumed);
1316 EXPECT_EQ(2 * kDataSize + GetQuicFlag(quic_buffered_data_threshold) - 1,
1317 stream_->BufferedDataBytes());
1318 EXPECT_TRUE(stream_->fin_buffered());
1319
1320 // Flush all buffered data.
1321 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1322 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1323 stream_->OnCanWrite();
1324 EXPECT_CALL(*stream_, OnCanWriteNewData()).Times(0);
1325 EXPECT_FALSE(stream_->HasBufferedData());
1326 EXPECT_TRUE(stream_->write_side_closed());
1327 }
1328
TEST_P(QuicStreamTest,WriteMemSlicesReachStreamLimit)1329 TEST_P(QuicStreamTest, WriteMemSlicesReachStreamLimit) {
1330 Initialize();
1331 QuicStreamPeer::SetStreamBytesWritten(kMaxStreamLength - 5u, stream_);
1332 std::vector<std::pair<char*, size_t>> buffers;
1333 quiche::QuicheMemSlice slice1 = MemSliceFromString("12345");
1334 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1335 .WillOnce(InvokeWithoutArgs([this]() {
1336 return session_->ConsumeData(stream_->id(), 5u, 0u, NO_FIN,
1337 NOT_RETRANSMISSION, std::nullopt);
1338 }));
1339 // There is no buffered data before, all data should be consumed.
1340 QuicConsumedData consumed = stream_->WriteMemSlice(std::move(slice1), false);
1341 EXPECT_EQ(5u, consumed.bytes_consumed);
1342
1343 quiche::QuicheMemSlice slice2 = MemSliceFromString("6");
1344 EXPECT_QUIC_BUG(
1345 {
1346 EXPECT_CALL(*connection_,
1347 CloseConnection(QUIC_STREAM_LENGTH_OVERFLOW, _, _));
1348 stream_->WriteMemSlice(std::move(slice2), false);
1349 },
1350 "Write too many data via stream");
1351 }
1352
TEST_P(QuicStreamTest,StreamDataGetAckedMultipleTimes)1353 TEST_P(QuicStreamTest, StreamDataGetAckedMultipleTimes) {
1354 Initialize();
1355 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1356 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1357 EXPECT_FALSE(stream_->IsWaitingForAcks());
1358 EXPECT_FALSE(session_->HasUnackedStreamData());
1359
1360 // Send [0, 27) and fin.
1361 stream_->WriteOrBufferData(kData1, false, nullptr);
1362 stream_->WriteOrBufferData(kData1, false, nullptr);
1363 stream_->WriteOrBufferData(kData1, true, nullptr);
1364 EXPECT_EQ(3u, QuicStreamPeer::SendBuffer(stream_).size());
1365 EXPECT_TRUE(stream_->IsWaitingForAcks());
1366 EXPECT_TRUE(session_->HasUnackedStreamData());
1367 // Ack [0, 9), [5, 22) and [18, 26)
1368 // Verify [0, 9) 9 bytes are acked.
1369 QuicByteCount newly_acked_length = 0;
1370 EXPECT_TRUE(stream_->OnStreamFrameAcked(0, 9, false, QuicTime::Delta::Zero(),
1371 QuicTime::Zero(),
1372 &newly_acked_length));
1373 EXPECT_EQ(9u, newly_acked_length);
1374 EXPECT_EQ(2u, QuicStreamPeer::SendBuffer(stream_).size());
1375 // Verify [9, 22) 13 bytes are acked.
1376 EXPECT_TRUE(stream_->OnStreamFrameAcked(5, 17, false, QuicTime::Delta::Zero(),
1377 QuicTime::Zero(),
1378 &newly_acked_length));
1379 EXPECT_EQ(13u, newly_acked_length);
1380 EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
1381 // Verify [22, 26) 4 bytes are acked.
1382 EXPECT_TRUE(stream_->OnStreamFrameAcked(18, 8, false, QuicTime::Delta::Zero(),
1383 QuicTime::Zero(),
1384 &newly_acked_length));
1385 EXPECT_EQ(4u, newly_acked_length);
1386 EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
1387 EXPECT_TRUE(stream_->IsWaitingForAcks());
1388 EXPECT_TRUE(session_->HasUnackedStreamData());
1389
1390 // Ack [0, 27). Verify [26, 27) 1 byte is acked.
1391 EXPECT_TRUE(stream_->OnStreamFrameAcked(26, 1, false, QuicTime::Delta::Zero(),
1392 QuicTime::Zero(),
1393 &newly_acked_length));
1394 EXPECT_EQ(1u, newly_acked_length);
1395 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
1396 EXPECT_TRUE(stream_->IsWaitingForAcks());
1397 EXPECT_TRUE(session_->HasUnackedStreamData());
1398
1399 // Ack Fin.
1400 EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState()).Times(1);
1401 EXPECT_TRUE(stream_->OnStreamFrameAcked(27, 0, true, QuicTime::Delta::Zero(),
1402 QuicTime::Zero(),
1403 &newly_acked_length));
1404 EXPECT_EQ(0u, newly_acked_length);
1405 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
1406 EXPECT_FALSE(stream_->IsWaitingForAcks());
1407 EXPECT_FALSE(session_->HasUnackedStreamData());
1408
1409 // Ack [10, 27) and fin. No new data is acked.
1410 EXPECT_FALSE(
1411 stream_->OnStreamFrameAcked(10, 17, true, QuicTime::Delta::Zero(),
1412 QuicTime::Zero(), &newly_acked_length));
1413 EXPECT_EQ(0u, newly_acked_length);
1414 EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
1415 EXPECT_FALSE(stream_->IsWaitingForAcks());
1416 EXPECT_FALSE(session_->HasUnackedStreamData());
1417 }
1418
TEST_P(QuicStreamTest,OnStreamFrameLost)1419 TEST_P(QuicStreamTest, OnStreamFrameLost) {
1420 Initialize();
1421
1422 // Send [0, 9).
1423 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1424 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1425 stream_->WriteOrBufferData(kData1, false, nullptr);
1426 EXPECT_FALSE(stream_->HasBufferedData());
1427 EXPECT_TRUE(stream_->IsStreamFrameOutstanding(0, 9, false));
1428
1429 // Try to send [9, 27), but connection is blocked.
1430 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1431 .WillOnce(Return(QuicConsumedData(0, false)));
1432 stream_->WriteOrBufferData(kData2, false, nullptr);
1433 stream_->WriteOrBufferData(kData2, false, nullptr);
1434 EXPECT_TRUE(stream_->HasBufferedData());
1435 EXPECT_FALSE(stream_->HasPendingRetransmission());
1436
1437 // Lost [0, 9). When stream gets a chance to write, only lost data is
1438 // transmitted.
1439 stream_->OnStreamFrameLost(0, 9, false);
1440 EXPECT_TRUE(stream_->HasPendingRetransmission());
1441 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1442 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1443 EXPECT_CALL(*stream_, OnCanWriteNewData()).Times(1);
1444 stream_->OnCanWrite();
1445 EXPECT_FALSE(stream_->HasPendingRetransmission());
1446 EXPECT_TRUE(stream_->HasBufferedData());
1447
1448 // This OnCanWrite causes [9, 27) to be sent.
1449 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1450 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1451 stream_->OnCanWrite();
1452 EXPECT_FALSE(stream_->HasBufferedData());
1453
1454 // Send a fin only frame.
1455 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1456 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1457 stream_->WriteOrBufferData("", true, nullptr);
1458
1459 // Lost [9, 27) and fin.
1460 stream_->OnStreamFrameLost(9, 18, false);
1461 stream_->OnStreamFrameLost(27, 0, true);
1462 EXPECT_TRUE(stream_->HasPendingRetransmission());
1463
1464 // Ack [9, 18).
1465 QuicByteCount newly_acked_length = 0;
1466 EXPECT_TRUE(stream_->OnStreamFrameAcked(9, 9, false, QuicTime::Delta::Zero(),
1467 QuicTime::Zero(),
1468 &newly_acked_length));
1469 EXPECT_EQ(9u, newly_acked_length);
1470 EXPECT_FALSE(stream_->IsStreamFrameOutstanding(9, 3, false));
1471 EXPECT_TRUE(stream_->HasPendingRetransmission());
1472 // This OnCanWrite causes [18, 27) and fin to be retransmitted. Verify fin can
1473 // be bundled with data.
1474 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1475 .WillOnce(InvokeWithoutArgs([this]() {
1476 return session_->ConsumeData(stream_->id(), 9u, 18u, FIN,
1477 NOT_RETRANSMISSION, std::nullopt);
1478 }));
1479 stream_->OnCanWrite();
1480 EXPECT_FALSE(stream_->HasPendingRetransmission());
1481 // Lost [9, 18) again, but it is not considered as lost because kData2
1482 // has been acked.
1483 stream_->OnStreamFrameLost(9, 9, false);
1484 EXPECT_FALSE(stream_->HasPendingRetransmission());
1485 EXPECT_TRUE(stream_->IsStreamFrameOutstanding(27, 0, true));
1486 }
1487
TEST_P(QuicStreamTest,CannotBundleLostFin)1488 TEST_P(QuicStreamTest, CannotBundleLostFin) {
1489 Initialize();
1490
1491 // Send [0, 18) and fin.
1492 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1493 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1494 stream_->WriteOrBufferData(kData1, false, nullptr);
1495 stream_->WriteOrBufferData(kData2, true, nullptr);
1496
1497 // Lost [0, 9) and fin.
1498 stream_->OnStreamFrameLost(0, 9, false);
1499 stream_->OnStreamFrameLost(18, 0, true);
1500
1501 // Retransmit lost data. Verify [0, 9) and fin are retransmitted in two
1502 // frames.
1503 InSequence s;
1504 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1505 .WillOnce(InvokeWithoutArgs([this]() {
1506 return session_->ConsumeData(stream_->id(), 9u, 0u, NO_FIN,
1507 NOT_RETRANSMISSION, std::nullopt);
1508 }));
1509 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1510 .WillOnce(Return(QuicConsumedData(0, true)));
1511 stream_->OnCanWrite();
1512 }
1513
TEST_P(QuicStreamTest,MarkConnectionLevelWriteBlockedOnWindowUpdateFrame)1514 TEST_P(QuicStreamTest, MarkConnectionLevelWriteBlockedOnWindowUpdateFrame) {
1515 Initialize();
1516
1517 // Set the config to a small value so that a newly created stream has small
1518 // send flow control window.
1519 QuicConfigPeer::SetReceivedInitialStreamFlowControlWindow(session_->config(),
1520 100);
1521 QuicConfigPeer::SetReceivedInitialMaxStreamDataBytesIncomingBidirectional(
1522 session_->config(), 100);
1523 auto stream = new TestStream(GetNthClientInitiatedBidirectionalStreamId(
1524 GetParam().transport_version, 2),
1525 session_.get(), BIDIRECTIONAL);
1526 session_->ActivateStream(absl::WrapUnique(stream));
1527
1528 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1529 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1530 EXPECT_CALL(*session_, SendBlocked(_, _)).Times(1);
1531 std::string data(1024, '.');
1532 stream->WriteOrBufferData(data, false, nullptr);
1533 EXPECT_FALSE(HasWriteBlockedStreams());
1534
1535 QuicWindowUpdateFrame window_update(kInvalidControlFrameId, stream_->id(),
1536 1234);
1537
1538 stream->OnWindowUpdateFrame(window_update);
1539 // Verify stream is marked connection level write blocked.
1540 EXPECT_TRUE(HasWriteBlockedStreams());
1541 EXPECT_TRUE(stream->HasBufferedData());
1542 }
1543
1544 // Regression test for b/73282665.
TEST_P(QuicStreamTest,MarkConnectionLevelWriteBlockedOnWindowUpdateFrameWithNoBufferedData)1545 TEST_P(QuicStreamTest,
1546 MarkConnectionLevelWriteBlockedOnWindowUpdateFrameWithNoBufferedData) {
1547 Initialize();
1548
1549 // Set the config to a small value so that a newly created stream has small
1550 // send flow control window.
1551 QuicConfigPeer::SetReceivedInitialStreamFlowControlWindow(session_->config(),
1552 100);
1553 QuicConfigPeer::SetReceivedInitialMaxStreamDataBytesIncomingBidirectional(
1554 session_->config(), 100);
1555 auto stream = new TestStream(GetNthClientInitiatedBidirectionalStreamId(
1556 GetParam().transport_version, 2),
1557 session_.get(), BIDIRECTIONAL);
1558 session_->ActivateStream(absl::WrapUnique(stream));
1559
1560 std::string data(100, '.');
1561 EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
1562 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1563 EXPECT_CALL(*session_, SendBlocked(_, _)).Times(1);
1564 stream->WriteOrBufferData(data, false, nullptr);
1565 EXPECT_FALSE(HasWriteBlockedStreams());
1566
1567 QuicWindowUpdateFrame window_update(kInvalidControlFrameId, stream_->id(),
1568 120);
1569 stream->OnWindowUpdateFrame(window_update);
1570 EXPECT_FALSE(stream->HasBufferedData());
1571 // Verify stream is marked as blocked although there is no buffered data.
1572 EXPECT_TRUE(HasWriteBlockedStreams());
1573 }
1574
TEST_P(QuicStreamTest,RetransmitStreamData)1575 TEST_P(QuicStreamTest, RetransmitStreamData) {
1576 Initialize();
1577 InSequence s;
1578
1579 // Send [0, 18) with fin.
1580 EXPECT_CALL(*session_, WritevData(stream_->id(), _, _, _, _, _))
1581 .Times(2)
1582 .WillRepeatedly(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1583 stream_->WriteOrBufferData(kData1, false, nullptr);
1584 stream_->WriteOrBufferData(kData1, true, nullptr);
1585 // Ack [10, 13).
1586 QuicByteCount newly_acked_length = 0;
1587 stream_->OnStreamFrameAcked(10, 3, false, QuicTime::Delta::Zero(),
1588 QuicTime::Zero(), &newly_acked_length);
1589 EXPECT_EQ(3u, newly_acked_length);
1590 // Retransmit [0, 18) with fin, and only [0, 8) is consumed.
1591 EXPECT_CALL(*session_, WritevData(stream_->id(), 10, 0, NO_FIN, _, _))
1592 .WillOnce(InvokeWithoutArgs([this]() {
1593 return session_->ConsumeData(stream_->id(), 8, 0u, NO_FIN,
1594 NOT_RETRANSMISSION, std::nullopt);
1595 }));
1596 EXPECT_FALSE(stream_->RetransmitStreamData(0, 18, true, PTO_RETRANSMISSION));
1597
1598 // Retransmit [0, 18) with fin, and all is consumed.
1599 EXPECT_CALL(*session_, WritevData(stream_->id(), 10, 0, NO_FIN, _, _))
1600 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1601 EXPECT_CALL(*session_, WritevData(stream_->id(), 5, 13, FIN, _, _))
1602 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1603 EXPECT_TRUE(stream_->RetransmitStreamData(0, 18, true, PTO_RETRANSMISSION));
1604
1605 // Retransmit [0, 8) with fin, and all is consumed.
1606 EXPECT_CALL(*session_, WritevData(stream_->id(), 8, 0, NO_FIN, _, _))
1607 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1608 EXPECT_CALL(*session_, WritevData(stream_->id(), 0, 18, FIN, _, _))
1609 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1610 EXPECT_TRUE(stream_->RetransmitStreamData(0, 8, true, PTO_RETRANSMISSION));
1611 }
1612
TEST_P(QuicStreamTest,ResetStreamOnTtlExpiresRetransmitLostData)1613 TEST_P(QuicStreamTest, ResetStreamOnTtlExpiresRetransmitLostData) {
1614 Initialize();
1615
1616 EXPECT_CALL(*session_, WritevData(stream_->id(), 200, 0, FIN, _, _))
1617 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1618 std::string body(200, 'a');
1619 stream_->WriteOrBufferData(body, true, nullptr);
1620
1621 // Set TTL to be 1 s.
1622 QuicTime::Delta ttl = QuicTime::Delta::FromSeconds(1);
1623 ASSERT_TRUE(stream_->MaybeSetTtl(ttl));
1624 // Verify data gets retransmitted because TTL does not expire.
1625 EXPECT_CALL(*session_, WritevData(stream_->id(), 100, 0, NO_FIN, _, _))
1626 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1627 EXPECT_TRUE(stream_->RetransmitStreamData(0, 100, false, PTO_RETRANSMISSION));
1628 stream_->OnStreamFrameLost(100, 100, true);
1629 EXPECT_TRUE(stream_->HasPendingRetransmission());
1630
1631 connection_->AdvanceTime(QuicTime::Delta::FromSeconds(1));
1632 // Verify stream gets reset because TTL expires.
1633 if (session_->version().UsesHttp3()) {
1634 EXPECT_CALL(*session_,
1635 MaybeSendStopSendingFrame(_, QuicResetStreamError::FromInternal(
1636 QUIC_STREAM_TTL_EXPIRED)))
1637 .Times(1);
1638 }
1639 EXPECT_CALL(
1640 *session_,
1641 MaybeSendRstStreamFrame(
1642 _, QuicResetStreamError::FromInternal(QUIC_STREAM_TTL_EXPIRED), _))
1643 .Times(1);
1644 stream_->OnCanWrite();
1645 }
1646
TEST_P(QuicStreamTest,ResetStreamOnTtlExpiresEarlyRetransmitData)1647 TEST_P(QuicStreamTest, ResetStreamOnTtlExpiresEarlyRetransmitData) {
1648 Initialize();
1649
1650 EXPECT_CALL(*session_, WritevData(stream_->id(), 200, 0, FIN, _, _))
1651 .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
1652 std::string body(200, 'a');
1653 stream_->WriteOrBufferData(body, true, nullptr);
1654
1655 // Set TTL to be 1 s.
1656 QuicTime::Delta ttl = QuicTime::Delta::FromSeconds(1);
1657 ASSERT_TRUE(stream_->MaybeSetTtl(ttl));
1658
1659 connection_->AdvanceTime(QuicTime::Delta::FromSeconds(1));
1660 // Verify stream gets reset because TTL expires.
1661 if (session_->version().UsesHttp3()) {
1662 EXPECT_CALL(*session_,
1663 MaybeSendStopSendingFrame(_, QuicResetStreamError::FromInternal(
1664 QUIC_STREAM_TTL_EXPIRED)))
1665 .Times(1);
1666 }
1667 EXPECT_CALL(
1668 *session_,
1669 MaybeSendRstStreamFrame(
1670 _, QuicResetStreamError::FromInternal(QUIC_STREAM_TTL_EXPIRED), _))
1671 .Times(1);
1672 stream_->RetransmitStreamData(0, 100, false, PTO_RETRANSMISSION);
1673 }
1674
1675 // Test that OnStreamReset does one-way (read) closes if version 99, two way
1676 // (read and write) if not version 99.
TEST_P(QuicStreamTest,OnStreamResetReadOrReadWrite)1677 TEST_P(QuicStreamTest, OnStreamResetReadOrReadWrite) {
1678 Initialize();
1679 EXPECT_FALSE(stream_->write_side_closed());
1680 EXPECT_FALSE(QuicStreamPeer::read_side_closed(stream_));
1681
1682 QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(),
1683 QUIC_STREAM_CANCELLED, 1234);
1684 stream_->OnStreamReset(rst_frame);
1685 if (VersionHasIetfQuicFrames(connection_->transport_version())) {
1686 // Version 99/IETF QUIC should close just the read side.
1687 EXPECT_TRUE(QuicStreamPeer::read_side_closed(stream_));
1688 EXPECT_FALSE(stream_->write_side_closed());
1689 } else {
1690 // Google QUIC should close both sides of the stream.
1691 EXPECT_TRUE(stream_->write_side_closed());
1692 EXPECT_TRUE(QuicStreamPeer::read_side_closed(stream_));
1693 }
1694 }
1695
TEST_P(QuicStreamTest,WindowUpdateForReadOnlyStream)1696 TEST_P(QuicStreamTest, WindowUpdateForReadOnlyStream) {
1697 Initialize();
1698
1699 QuicStreamId stream_id = QuicUtils::GetFirstUnidirectionalStreamId(
1700 connection_->transport_version(), Perspective::IS_CLIENT);
1701 TestStream stream(stream_id, session_.get(), READ_UNIDIRECTIONAL);
1702 QuicWindowUpdateFrame window_update_frame(kInvalidControlFrameId, stream_id,
1703 0);
1704 EXPECT_CALL(
1705 *connection_,
1706 CloseConnection(
1707 QUIC_WINDOW_UPDATE_RECEIVED_ON_READ_UNIDIRECTIONAL_STREAM,
1708 "WindowUpdateFrame received on READ_UNIDIRECTIONAL stream.", _));
1709 stream.OnWindowUpdateFrame(window_update_frame);
1710 }
1711
TEST_P(QuicStreamTest,RstStreamFrameChangesCloseOffset)1712 TEST_P(QuicStreamTest, RstStreamFrameChangesCloseOffset) {
1713 Initialize();
1714
1715 QuicStreamFrame stream_frame(stream_->id(), true, 0, "abc");
1716 EXPECT_CALL(*stream_, OnDataAvailable());
1717 stream_->OnStreamFrame(stream_frame);
1718 QuicRstStreamFrame rst(kInvalidControlFrameId, stream_->id(),
1719 QUIC_STREAM_CANCELLED, 0u);
1720
1721 EXPECT_CALL(*connection_, CloseConnection(QUIC_STREAM_MULTIPLE_OFFSET, _, _));
1722 stream_->OnStreamReset(rst);
1723 }
1724
1725 // Regression test for b/176073284.
TEST_P(QuicStreamTest,EmptyStreamFrameWithNoFin)1726 TEST_P(QuicStreamTest, EmptyStreamFrameWithNoFin) {
1727 Initialize();
1728 QuicStreamFrame empty_stream_frame(stream_->id(), false, 0, "");
1729 if (stream_->version().HasIetfQuicFrames()) {
1730 EXPECT_CALL(*connection_,
1731 CloseConnection(QUIC_EMPTY_STREAM_FRAME_NO_FIN, _, _))
1732 .Times(0);
1733 } else {
1734 EXPECT_CALL(*connection_,
1735 CloseConnection(QUIC_EMPTY_STREAM_FRAME_NO_FIN, _, _));
1736 }
1737 EXPECT_CALL(*stream_, OnDataAvailable()).Times(0);
1738 stream_->OnStreamFrame(empty_stream_frame);
1739 }
1740
TEST_P(QuicStreamTest,SendRstWithCustomIetfCode)1741 TEST_P(QuicStreamTest, SendRstWithCustomIetfCode) {
1742 Initialize();
1743 QuicResetStreamError error(QUIC_STREAM_CANCELLED, 0x1234abcd);
1744 EXPECT_CALL(*session_, MaybeSendRstStreamFrame(kTestStreamId, error, _))
1745 .Times(1);
1746 stream_->ResetWithError(error);
1747 EXPECT_TRUE(rst_sent());
1748 }
1749
1750 } // namespace
1751 } // namespace test
1752 } // namespace quic
1753