xref: /aosp_15_r20/external/cronet/net/third_party/quiche/src/quiche/quic/core/quic_generic_session_test.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright (c) 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 // An integration test that covers interactions between QuicGenericSession
6 // client and server sessions.
7 
8 #include "quiche/quic/core/quic_generic_session.h"
9 
10 #include <cstddef>
11 #include <cstring>
12 #include <memory>
13 #include <optional>
14 #include <string>
15 #include <vector>
16 
17 #include "absl/strings/string_view.h"
18 #include "quiche/quic/core/crypto/quic_compressed_certs_cache.h"
19 #include "quiche/quic/core/crypto/quic_crypto_client_config.h"
20 #include "quiche/quic/core/crypto/quic_crypto_server_config.h"
21 #include "quiche/quic/core/crypto/quic_random.h"
22 #include "quiche/quic/core/quic_connection.h"
23 #include "quiche/quic/core/quic_constants.h"
24 #include "quiche/quic/core/quic_datagram_queue.h"
25 #include "quiche/quic/core/quic_error_codes.h"
26 #include "quiche/quic/core/quic_stream.h"
27 #include "quiche/quic/core/quic_types.h"
28 #include "quiche/quic/core/web_transport_interface.h"
29 #include "quiche/quic/platform/api/quic_test.h"
30 #include "quiche/quic/test_tools/crypto_test_utils.h"
31 #include "quiche/quic/test_tools/quic_session_peer.h"
32 #include "quiche/quic/test_tools/quic_test_utils.h"
33 #include "quiche/quic/test_tools/simulator/simulator.h"
34 #include "quiche/quic/test_tools/simulator/test_harness.h"
35 #include "quiche/quic/test_tools/web_transport_test_tools.h"
36 #include "quiche/quic/tools/web_transport_test_visitors.h"
37 #include "quiche/common/quiche_stream.h"
38 #include "quiche/common/test_tools/quiche_test_utils.h"
39 #include "quiche/web_transport/web_transport.h"
40 
41 namespace quic::test {
42 namespace {
43 
44 enum ServerType { kDiscardServer, kEchoServer };
45 
46 using quiche::test::StatusIs;
47 using simulator::Simulator;
48 using testing::_;
49 using testing::Assign;
50 using testing::AtMost;
51 using testing::Eq;
52 
53 class CountingDatagramObserver : public QuicDatagramQueue::Observer {
54  public:
CountingDatagramObserver(int & total)55   CountingDatagramObserver(int& total) : total_(total) {}
OnDatagramProcessed(std::optional<MessageStatus>)56   void OnDatagramProcessed(std::optional<MessageStatus>) { ++total_; }
57 
58  private:
59   int& total_;
60 };
61 
62 class ClientEndpoint : public simulator::QuicEndpointWithConnection {
63  public:
ClientEndpoint(Simulator * simulator,const std::string & name,const std::string & peer_name,const QuicConfig & config)64   ClientEndpoint(Simulator* simulator, const std::string& name,
65                  const std::string& peer_name, const QuicConfig& config)
66       : QuicEndpointWithConnection(simulator, name, peer_name,
67                                    Perspective::IS_CLIENT,
68                                    GetQuicVersionsForGenericSession()),
69         crypto_config_(crypto_test_utils::ProofVerifierForTesting()),
70         session_(connection_.get(), false, nullptr, config, "test.example.com",
71                  443, "example_alpn", &visitor_, /*visitor_owned=*/false,
72                  std::make_unique<CountingDatagramObserver>(
73                      total_datagrams_processed_),
74                  &crypto_config_) {
75     session_.Initialize();
76     session_.connection()->sent_packet_manager().SetSendAlgorithm(
77         CongestionControlType::kBBRv2);
78     EXPECT_CALL(visitor_, OnSessionReady())
79         .Times(AtMost(1))
80         .WillOnce(Assign(&session_ready_, true));
81   }
82 
session()83   QuicGenericClientSession* session() { return &session_; }
visitor()84   MockWebTransportSessionVisitor* visitor() { return &visitor_; }
85 
session_ready() const86   bool session_ready() const { return session_ready_; }
total_datagrams_processed() const87   int total_datagrams_processed() const { return total_datagrams_processed_; }
88 
89  private:
90   QuicCryptoClientConfig crypto_config_;
91   MockWebTransportSessionVisitor visitor_;
92   QuicGenericClientSession session_;
93   bool session_ready_ = false;
94   int total_datagrams_processed_ = 0;
95 };
96 
97 class ServerEndpoint : public simulator::QuicEndpointWithConnection {
98  public:
ServerEndpoint(Simulator * simulator,const std::string & name,const std::string & peer_name,const QuicConfig & config,ServerType type)99   ServerEndpoint(Simulator* simulator, const std::string& name,
100                  const std::string& peer_name, const QuicConfig& config,
101                  ServerType type)
102       : QuicEndpointWithConnection(simulator, name, peer_name,
103                                    Perspective::IS_SERVER,
104                                    GetQuicVersionsForGenericSession()),
105         crypto_config_(QuicCryptoServerConfig::TESTING,
106                        QuicRandom::GetInstance(),
107                        crypto_test_utils::ProofSourceForTesting(),
108                        KeyExchangeSource::Default()),
109         compressed_certs_cache_(
110             QuicCompressedCertsCache::kQuicCompressedCertsCacheSize),
111         session_(connection_.get(), false, nullptr, config, "example_alpn",
112                  type == kEchoServer
113                      ? static_cast<webtransport::SessionVisitor*>(
114                            new EchoWebTransportSessionVisitor(
115                                &session_,
116                                /*open_server_initiated_echo_stream=*/false))
117                      : static_cast<webtransport::SessionVisitor*>(
118                            new DiscardWebTransportSessionVisitor(&session_)),
119                  /*owns_visitor=*/true,
120                  /*datagram_observer=*/nullptr, &crypto_config_,
121                  &compressed_certs_cache_) {
122     session_.Initialize();
123     session_.connection()->sent_packet_manager().SetSendAlgorithm(
124         CongestionControlType::kBBRv2);
125   }
126 
session()127   QuicGenericServerSession* session() { return &session_; }
128 
129  private:
130   QuicCryptoServerConfig crypto_config_;
131   QuicCompressedCertsCache compressed_certs_cache_;
132   QuicGenericServerSession session_;
133 };
134 
135 class QuicGenericSessionTest : public QuicTest {
136  public:
CreateDefaultEndpoints(ServerType server_type)137   void CreateDefaultEndpoints(ServerType server_type) {
138     client_ = std::make_unique<ClientEndpoint>(
139         &test_harness_.simulator(), "Client", "Server", client_config_);
140     server_ =
141         std::make_unique<ServerEndpoint>(&test_harness_.simulator(), "Server",
142                                          "Client", server_config_, server_type);
143     test_harness_.set_client(client_.get());
144     test_harness_.set_server(server_.get());
145   }
146 
WireUpEndpoints()147   void WireUpEndpoints() { test_harness_.WireUpEndpoints(); }
148 
RunHandshake()149   void RunHandshake() {
150     client_->session()->CryptoConnect();
151     bool result = test_harness_.RunUntilWithDefaultTimeout([this]() {
152       return client_->session_ready() ||
153              client_->session()->error() != QUIC_NO_ERROR;
154     });
155     EXPECT_TRUE(result);
156   }
157 
158  protected:
159   QuicConfig client_config_ = DefaultQuicConfig();
160   QuicConfig server_config_ = DefaultQuicConfig();
161 
162   simulator::TestHarness test_harness_;
163 
164   std::unique_ptr<ClientEndpoint> client_;
165   std::unique_ptr<ServerEndpoint> server_;
166 };
167 
// Smoke test: a client connected to a discard server reports the session as
// ready once the handshake helper returns.
TEST_F(QuicGenericSessionTest, SuccessfulHandshake) {
  CreateDefaultEndpoints(kDiscardServer);
  WireUpEndpoints();
  RunHandshake();

  EXPECT_TRUE(client_->session_ready());
}
174 
// Opens ten unidirectional streams from the client, checks that the server
// sees all of them as open, then FINs each one and checks that the server's
// open-stream count drops back to zero.
TEST_F(QuicGenericSessionTest, SendOutgoingStreams) {
  CreateDefaultEndpoints(kDiscardServer);
  WireUpEndpoints();
  RunHandshake();

  std::vector<webtransport::Stream*> streams;
  for (int i = 0; i < 10; i++) {
    webtransport::Stream* stream =
        client_->session()->OpenOutgoingUnidirectionalStream();
    // Fail cleanly instead of dereferencing a null pointer if the session
    // refuses to open the stream.
    ASSERT_TRUE(stream != nullptr);
    ASSERT_TRUE(stream->Write("test"));
    streams.push_back(stream);
  }
  ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout([this]() {
    return QuicSessionPeer::GetNumOpenDynamicStreams(server_->session()) == 10;
  }));

  for (webtransport::Stream* stream : streams) {
    ASSERT_TRUE(stream->SendFin());
  }
  ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout([this]() {
    return QuicSessionPeer::GetNumOpenDynamicStreams(server_->session()) == 0;
  }));
}
198 
// Writes "Hello!" on a bidirectional stream to an echo server, reads the same
// bytes back without a FIN, then FINs the stream and waits for the server to
// have no open streams left.
TEST_F(QuicGenericSessionTest, EchoBidirectionalStreams) {
  CreateDefaultEndpoints(kEchoServer);
  WireUpEndpoints();
  RunHandshake();

  webtransport::Stream* stream =
      client_->session()->OpenOutgoingBidirectionalStream();
  // Fail cleanly instead of dereferencing a null pointer if the session
  // refuses to open the stream.
  ASSERT_TRUE(stream != nullptr);
  EXPECT_TRUE(stream->Write("Hello!"));

  ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout(
      [stream]() { return stream->ReadableBytes() == strlen("Hello!"); }));
  std::string received;
  WebTransportStream::ReadResult result = stream->Read(&received);
  EXPECT_EQ(result.bytes_read, strlen("Hello!"));
  EXPECT_FALSE(result.fin);
  EXPECT_EQ(received, "Hello!");

  EXPECT_TRUE(stream->SendFin());
  ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout([this]() {
    return QuicSessionPeer::GetNumOpenDynamicStreams(server_->session()) == 0;
  }));
}
221 
// Sends two unidirectional streams to an echo server and FINs only the second
// one at first.  The test expects the reply for the second stream to arrive
// first, and the reply for the first stream only after it is FINed as well.
TEST_F(QuicGenericSessionTest, EchoUnidirectionalStreams) {
  CreateDefaultEndpoints(kEchoServer);
  WireUpEndpoints();
  RunHandshake();

  // Send two streams, but only send FIN on the second one.  Each pointer is
  // checked so that a refused stream fails the test instead of crashing it.
  webtransport::Stream* stream1 =
      client_->session()->OpenOutgoingUnidirectionalStream();
  ASSERT_TRUE(stream1 != nullptr);
  EXPECT_TRUE(stream1->Write("Stream One"));
  webtransport::Stream* stream2 =
      client_->session()->OpenOutgoingUnidirectionalStream();
  ASSERT_TRUE(stream2 != nullptr);
  EXPECT_TRUE(stream2->Write("Stream Two"));
  EXPECT_TRUE(stream2->SendFin());

  // Wait until a stream is received.  The expectation covers both replies
  // that this test accepts (one now, one after stream1 is FINed).
  bool stream_received = false;
  EXPECT_CALL(*client_->visitor(), OnIncomingUnidirectionalStreamAvailable())
      .Times(2)
      .WillRepeatedly(Assign(&stream_received, true));
  ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout(
      [&stream_received]() { return stream_received; }));

  // Receive a reply stream and expect it to be the second one.
  webtransport::Stream* reply =
      client_->session()->AcceptIncomingUnidirectionalStream();
  ASSERT_TRUE(reply != nullptr);
  std::string buffer;
  WebTransportStream::ReadResult result = reply->Read(&buffer);
  EXPECT_GT(result.bytes_read, 0u);
  EXPECT_TRUE(result.fin);
  EXPECT_EQ(buffer, "Stream Two");

  // Reset reply-related variables.
  stream_received = false;
  buffer = "";

  // Send FIN on the first stream, and expect to receive it back.
  EXPECT_TRUE(stream1->SendFin());
  ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout(
      [&stream_received]() { return stream_received; }));
  reply = client_->session()->AcceptIncomingUnidirectionalStream();
  ASSERT_TRUE(reply != nullptr);
  result = reply->Read(&buffer);
  EXPECT_GT(result.bytes_read, 0u);
  EXPECT_TRUE(result.fin);
  EXPECT_EQ(buffer, "Stream One");
}
269 
TEST_F(QuicGenericSessionTest,EchoStreamsUsingPeekApi)270 TEST_F(QuicGenericSessionTest, EchoStreamsUsingPeekApi) {
271   CreateDefaultEndpoints(kEchoServer);
272   WireUpEndpoints();
273   RunHandshake();
274 
275   // Send two streams, a bidirectional and a unidirectional one, but only send
276   // FIN on the second one.
277   webtransport::Stream* stream1 =
278       client_->session()->OpenOutgoingBidirectionalStream();
279   EXPECT_TRUE(stream1->Write("Stream One"));
280   webtransport::Stream* stream2 =
281       client_->session()->OpenOutgoingUnidirectionalStream();
282   EXPECT_TRUE(stream2->Write("Stream Two"));
283   EXPECT_TRUE(stream2->SendFin());
284 
285   // Wait until the unidirectional stream is received back.
286   bool stream_received_unidi = false;
287   EXPECT_CALL(*client_->visitor(), OnIncomingUnidirectionalStreamAvailable())
288       .WillOnce(Assign(&stream_received_unidi, true));
289   ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout(
290       [&]() { return stream_received_unidi; }));
291 
292   // Receive the unidirectional echo reply.
293   webtransport::Stream* reply =
294       client_->session()->AcceptIncomingUnidirectionalStream();
295   ASSERT_TRUE(reply != nullptr);
296   std::string buffer;
297   quiche::ReadStream::PeekResult peek_result = reply->PeekNextReadableRegion();
298   EXPECT_EQ(peek_result.peeked_data, "Stream Two");
299   EXPECT_EQ(peek_result.fin_next, false);
300   EXPECT_EQ(peek_result.all_data_received, true);
301   bool fin_received =
302       quiche::ProcessAllReadableRegions(*reply, [&](absl::string_view chunk) {
303         buffer.append(chunk.data(), chunk.size());
304         return true;
305       });
306   EXPECT_TRUE(fin_received);
307   EXPECT_EQ(buffer, "Stream Two");
308 
309   // Receive the bidirectional stream reply without a FIN.
310   ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout(
311       [&]() { return stream1->PeekNextReadableRegion().has_data(); }));
312   peek_result = stream1->PeekNextReadableRegion();
313   EXPECT_EQ(peek_result.peeked_data, "Stream One");
314   EXPECT_EQ(peek_result.fin_next, false);
315   EXPECT_EQ(peek_result.all_data_received, false);
316   fin_received = stream1->SkipBytes(strlen("Stream One"));
317   EXPECT_FALSE(fin_received);
318   peek_result = stream1->PeekNextReadableRegion();
319   EXPECT_EQ(peek_result.peeked_data, "");
320   EXPECT_EQ(peek_result.fin_next, false);
321   EXPECT_EQ(peek_result.all_data_received, false);
322 
323   // Send FIN on the first stream, and expect to receive it back.
324   EXPECT_TRUE(stream1->SendFin());
325   ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout(
326       [&]() { return stream1->PeekNextReadableRegion().all_data_received; }));
327   peek_result = stream1->PeekNextReadableRegion();
328   EXPECT_EQ(peek_result.peeked_data, "");
329   EXPECT_EQ(peek_result.fin_next, true);
330   EXPECT_EQ(peek_result.all_data_received, true);
331 
332   // Read FIN and expect the stream to get garbage collected.
333   webtransport::StreamId id = stream1->GetStreamId();
334   EXPECT_TRUE(client_->session()->GetStreamById(id) != nullptr);
335   fin_received = stream1->SkipBytes(0);
336   EXPECT_TRUE(fin_received);
337   EXPECT_TRUE(client_->session()->GetStreamById(id) == nullptr);
338 }
339 
// Sends a single datagram to an echo server and waits for the client visitor
// to be told that the same payload came back.
TEST_F(QuicGenericSessionTest, EchoDatagram) {
  CreateDefaultEndpoints(kEchoServer);
  WireUpEndpoints();
  RunHandshake();

  client_->session()->SendOrQueueDatagram("test");

  // Flipped by the mock visitor when the echoed payload is delivered.
  bool echoed = false;
  EXPECT_CALL(*client_->visitor(), OnDatagramReceived(Eq("test")))
      .WillOnce(Assign(&echoed, true));
  const auto echo_arrived = [&echoed]() { return echoed; };
  ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout(echo_arrived));
}
353 
354 // This test sets the datagram queue to an nearly-infinite queueing time, and
355 // then sends 1000 datagrams.  We expect to receive most of them back, since the
356 // datagrams would be paced out by the congestion controller.
TEST_F(QuicGenericSessionTest,EchoALotOfDatagrams)357 TEST_F(QuicGenericSessionTest, EchoALotOfDatagrams) {
358   CreateDefaultEndpoints(kEchoServer);
359   WireUpEndpoints();
360   RunHandshake();
361 
362   // Set the datagrams to effectively never expire.
363   client_->session()->SetDatagramMaxTimeInQueue(
364       (10000 * simulator::TestHarness::kRtt).ToAbsl());
365   for (int i = 0; i < 1000; i++) {
366     client_->session()->SendOrQueueDatagram(std::string(
367         client_->session()->GetGuaranteedLargestMessagePayload(), 'a'));
368   }
369 
370   size_t received = 0;
371   EXPECT_CALL(*client_->visitor(), OnDatagramReceived(_))
372       .WillRepeatedly(
373           [&received](absl::string_view /*datagram*/) { received++; });
374   ASSERT_TRUE(test_harness_.simulator().RunUntilOrTimeout(
375       [this]() { return client_->total_datagrams_processed() >= 1000; },
376       3 * simulator::TestHarness::kServerBandwidth.TransferTime(
377               1000 * kMaxOutgoingPacketSize)));
378   // Allow extra round-trips for the final flight of datagrams to arrive back.
379   test_harness_.simulator().RunFor(2 * simulator::TestHarness::kRtt);
380 
381   EXPECT_GT(received, 500u);
382   EXPECT_LT(received, 1000u);
383 }
384 
// With the server configured via SetMaxUnidirectionalStreamsToSend(4), the
// client opens four unidirectional streams, checks that it is then blocked
// from opening another, and checks that it is unblocked once the visitor is
// notified that a new stream can be created.
TEST_F(QuicGenericSessionTest, OutgoingStreamFlowControlBlocked) {
  server_config_.SetMaxUnidirectionalStreamsToSend(4);
  CreateDefaultEndpoints(kDiscardServer);
  WireUpEndpoints();
  RunHandshake();

  for (int opened = 0; opened < 4; ++opened) {
    ASSERT_TRUE(client_->session()->CanOpenNextOutgoingUnidirectionalStream());
    webtransport::Stream* const stream =
        client_->session()->OpenOutgoingUnidirectionalStream();
    ASSERT_TRUE(stream != nullptr);
    ASSERT_TRUE(stream->SendFin());
  }
  EXPECT_FALSE(client_->session()->CanOpenNextOutgoingUnidirectionalStream());

  // Receiving FINs for the streams we've just opened will cause the server to
  // let us open more streams.
  bool unblocked = false;
  EXPECT_CALL(*client_->visitor(), OnCanCreateNewOutgoingUnidirectionalStream())
      .WillOnce(Assign(&unblocked, true));
  const auto is_unblocked = [&unblocked]() { return unblocked; };
  ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout(is_unblocked));
  EXPECT_TRUE(client_->session()->CanOpenNextOutgoingUnidirectionalStream());
}
409 
// Queues 1000 datagrams with a very short maximum queueing time and checks
// that fewer than half are echoed back, and that every datagram is accounted
// for as either received or expired.
TEST_F(QuicGenericSessionTest, ExpireDatagrams) {
  CreateDefaultEndpoints(kEchoServer);
  WireUpEndpoints();
  RunHandshake();

  // Set the datagrams to expire very soon.
  client_->session()->SetDatagramMaxTimeInQueue(
      (0.2 * simulator::TestHarness::kRtt).ToAbsl());
  for (int i = 0; i < 1000; i++) {
    client_->session()->SendOrQueueDatagram(std::string(
        client_->session()->GetGuaranteedLargestMessagePayload(), 'a'));
  }

  size_t received = 0;
  EXPECT_CALL(*client_->visitor(), OnDatagramReceived(_))
      .WillRepeatedly(
          [&received](absl::string_view /*datagram*/) { received++; });
  ASSERT_TRUE(test_harness_.simulator().RunUntilOrTimeout(
      [this]() { return client_->total_datagrams_processed() >= 1000; },
      3 * simulator::TestHarness::kServerBandwidth.TransferTime(
              1000 * kMaxOutgoingPacketSize)));
  // Allow extra round-trips for the final flight of datagrams to arrive back.
  test_harness_.simulator().RunFor(2 * simulator::TestHarness::kRtt);
  // Unsigned literals keep these comparisons free of signed/unsigned
  // mismatches, matching the other datagram tests in this file.
  EXPECT_LT(received, 500u);
  EXPECT_EQ(received + client_->session()->GetDatagramStats().expired_outgoing,
            1000u);
}
437 
// Sends 1000 long-lived datagrams over a link wired up with
// lose_every_n=4 and checks that each datagram is accounted for as either
// echoed back, lost by the client, or lost by the server.
TEST_F(QuicGenericSessionTest, LoseDatagrams) {
  CreateDefaultEndpoints(kEchoServer);
  test_harness_.WireUpEndpointsWithLoss(/*lose_every_n=*/4);
  RunHandshake();

  QuicGenericClientSession* const session = client_->session();

  // Set the datagrams to effectively never expire.
  session->SetDatagramMaxTimeInQueue(
      (10000 * simulator::TestHarness::kRtt).ToAbsl());
  for (int sent = 0; sent < 1000; ++sent) {
    session->SendOrQueueDatagram(
        std::string(session->GetGuaranteedLargestMessagePayload(), 'a'));
  }

  // Count every datagram that is echoed back to the client.
  size_t received = 0;
  EXPECT_CALL(*client_->visitor(), OnDatagramReceived(_))
      .WillRepeatedly(
          [&received](absl::string_view /*datagram*/) { ++received; });

  // Run until the observer has seen all 1000 datagrams leave the queue,
  // bounded by four times the time needed to transfer that many full-size
  // packets at the server bandwidth.
  const auto all_processed = [this]() {
    return client_->total_datagrams_processed() >= 1000;
  };
  const auto timeout = 4 * simulator::TestHarness::kServerBandwidth.TransferTime(
                               1000 * kMaxOutgoingPacketSize);
  ASSERT_TRUE(
      test_harness_.simulator().RunUntilOrTimeout(all_processed, timeout));
  // Allow extra round-trips for the final flight of datagrams to arrive back.
  test_harness_.simulator().RunFor(4 * simulator::TestHarness::kRtt);

  const QuicPacketCount client_lost = session->GetDatagramStats().lost_outgoing;
  const QuicPacketCount server_lost =
      server_->session()->GetDatagramStats().lost_outgoing;
  EXPECT_LT(received, 800u);
  EXPECT_GT(client_lost, 100u);
  EXPECT_GT(server_lost, 100u);
  EXPECT_EQ(received + client_lost + server_lost, 1000u);
}
471 
// Fills the stream's write buffer with a single large write, checks that a
// second ordinary write is rejected with kUnavailable, then forces a third
// write through with buffer_unconditionally and FIN.  The echo server should
// return both accepted writes.
TEST_F(QuicGenericSessionTest, WriteWhenBufferFull) {
  CreateDefaultEndpoints(kEchoServer);
  WireUpEndpoints();
  RunHandshake();

  const std::string buffer(64 * 1024 + 1, 'q');
  webtransport::Stream* stream =
      client_->session()->OpenOutgoingBidirectionalStream();
  ASSERT_TRUE(stream != nullptr);

  ASSERT_TRUE(stream->CanWrite());
  absl::Status status = quiche::WriteIntoStream(*stream, buffer);
  QUICHE_EXPECT_OK(status);
  EXPECT_FALSE(stream->CanWrite());

  status = quiche::WriteIntoStream(*stream, buffer);
  EXPECT_THAT(status, StatusIs(absl::StatusCode::kUnavailable));

  quiche::StreamWriteOptions options;
  options.set_buffer_unconditionally(true);
  options.set_send_fin(true);
  status = quiche::WriteIntoStream(*stream, buffer, options);
  QUICHE_EXPECT_OK(status);
  EXPECT_FALSE(stream->CanWrite());

  QuicByteCount total_received = 0;
  for (;;) {
    // Wait for either more data or a pending FIN.  The result is asserted so
    // that a stalled connection fails the test; without this, an iteration
    // that timed out with nothing to read would skip zero bytes, never
    // consume a FIN, and loop forever.
    ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout([&] {
      const quiche::ReadStream::PeekResult next =
          stream->PeekNextReadableRegion();
      return next.has_data() || next.fin_next;
    }));
    quiche::ReadStream::PeekResult result = stream->PeekNextReadableRegion();
    total_received += result.peeked_data.size();
    bool fin_consumed = stream->SkipBytes(result.peeked_data.size());
    if (fin_consumed) {
      // `stream` must not be used past this point.
      break;
    }
  }
  // Two accepted writes of 64 KiB + 1 bytes each; the rejected write in the
  // middle contributes nothing.
  EXPECT_EQ(total_received, 128u * 1024u + 2);
}
510 
511 }  // namespace
512 }  // namespace quic::test
513