1 // Copyright (c) 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // An integration test that covers interactions between QuicGenericSession
6 // client and server sessions.
7
8 #include "quiche/quic/core/quic_generic_session.h"
9
10 #include <cstddef>
11 #include <cstring>
12 #include <memory>
13 #include <optional>
14 #include <string>
15 #include <vector>
16
17 #include "absl/strings/string_view.h"
18 #include "quiche/quic/core/crypto/quic_compressed_certs_cache.h"
19 #include "quiche/quic/core/crypto/quic_crypto_client_config.h"
20 #include "quiche/quic/core/crypto/quic_crypto_server_config.h"
21 #include "quiche/quic/core/crypto/quic_random.h"
22 #include "quiche/quic/core/quic_connection.h"
23 #include "quiche/quic/core/quic_constants.h"
24 #include "quiche/quic/core/quic_datagram_queue.h"
25 #include "quiche/quic/core/quic_error_codes.h"
26 #include "quiche/quic/core/quic_stream.h"
27 #include "quiche/quic/core/quic_types.h"
28 #include "quiche/quic/core/web_transport_interface.h"
29 #include "quiche/quic/platform/api/quic_test.h"
30 #include "quiche/quic/test_tools/crypto_test_utils.h"
31 #include "quiche/quic/test_tools/quic_session_peer.h"
32 #include "quiche/quic/test_tools/quic_test_utils.h"
33 #include "quiche/quic/test_tools/simulator/simulator.h"
34 #include "quiche/quic/test_tools/simulator/test_harness.h"
35 #include "quiche/quic/test_tools/web_transport_test_tools.h"
36 #include "quiche/quic/tools/web_transport_test_visitors.h"
37 #include "quiche/common/quiche_stream.h"
38 #include "quiche/common/test_tools/quiche_test_utils.h"
39 #include "quiche/web_transport/web_transport.h"
40
41 namespace quic::test {
42 namespace {
43
44 enum ServerType { kDiscardServer, kEchoServer };
45
46 using quiche::test::StatusIs;
47 using simulator::Simulator;
48 using testing::_;
49 using testing::Assign;
50 using testing::AtMost;
51 using testing::Eq;
52
53 class CountingDatagramObserver : public QuicDatagramQueue::Observer {
54 public:
CountingDatagramObserver(int & total)55 CountingDatagramObserver(int& total) : total_(total) {}
OnDatagramProcessed(std::optional<MessageStatus>)56 void OnDatagramProcessed(std::optional<MessageStatus>) { ++total_; }
57
58 private:
59 int& total_;
60 };
61
62 class ClientEndpoint : public simulator::QuicEndpointWithConnection {
63 public:
ClientEndpoint(Simulator * simulator,const std::string & name,const std::string & peer_name,const QuicConfig & config)64 ClientEndpoint(Simulator* simulator, const std::string& name,
65 const std::string& peer_name, const QuicConfig& config)
66 : QuicEndpointWithConnection(simulator, name, peer_name,
67 Perspective::IS_CLIENT,
68 GetQuicVersionsForGenericSession()),
69 crypto_config_(crypto_test_utils::ProofVerifierForTesting()),
70 session_(connection_.get(), false, nullptr, config, "test.example.com",
71 443, "example_alpn", &visitor_, /*visitor_owned=*/false,
72 std::make_unique<CountingDatagramObserver>(
73 total_datagrams_processed_),
74 &crypto_config_) {
75 session_.Initialize();
76 session_.connection()->sent_packet_manager().SetSendAlgorithm(
77 CongestionControlType::kBBRv2);
78 EXPECT_CALL(visitor_, OnSessionReady())
79 .Times(AtMost(1))
80 .WillOnce(Assign(&session_ready_, true));
81 }
82
session()83 QuicGenericClientSession* session() { return &session_; }
visitor()84 MockWebTransportSessionVisitor* visitor() { return &visitor_; }
85
session_ready() const86 bool session_ready() const { return session_ready_; }
total_datagrams_processed() const87 int total_datagrams_processed() const { return total_datagrams_processed_; }
88
89 private:
90 QuicCryptoClientConfig crypto_config_;
91 MockWebTransportSessionVisitor visitor_;
92 QuicGenericClientSession session_;
93 bool session_ready_ = false;
94 int total_datagrams_processed_ = 0;
95 };
96
97 class ServerEndpoint : public simulator::QuicEndpointWithConnection {
98 public:
ServerEndpoint(Simulator * simulator,const std::string & name,const std::string & peer_name,const QuicConfig & config,ServerType type)99 ServerEndpoint(Simulator* simulator, const std::string& name,
100 const std::string& peer_name, const QuicConfig& config,
101 ServerType type)
102 : QuicEndpointWithConnection(simulator, name, peer_name,
103 Perspective::IS_SERVER,
104 GetQuicVersionsForGenericSession()),
105 crypto_config_(QuicCryptoServerConfig::TESTING,
106 QuicRandom::GetInstance(),
107 crypto_test_utils::ProofSourceForTesting(),
108 KeyExchangeSource::Default()),
109 compressed_certs_cache_(
110 QuicCompressedCertsCache::kQuicCompressedCertsCacheSize),
111 session_(connection_.get(), false, nullptr, config, "example_alpn",
112 type == kEchoServer
113 ? static_cast<webtransport::SessionVisitor*>(
114 new EchoWebTransportSessionVisitor(
115 &session_,
116 /*open_server_initiated_echo_stream=*/false))
117 : static_cast<webtransport::SessionVisitor*>(
118 new DiscardWebTransportSessionVisitor(&session_)),
119 /*owns_visitor=*/true,
120 /*datagram_observer=*/nullptr, &crypto_config_,
121 &compressed_certs_cache_) {
122 session_.Initialize();
123 session_.connection()->sent_packet_manager().SetSendAlgorithm(
124 CongestionControlType::kBBRv2);
125 }
126
session()127 QuicGenericServerSession* session() { return &session_; }
128
129 private:
130 QuicCryptoServerConfig crypto_config_;
131 QuicCompressedCertsCache compressed_certs_cache_;
132 QuicGenericServerSession session_;
133 };
134
135 class QuicGenericSessionTest : public QuicTest {
136 public:
CreateDefaultEndpoints(ServerType server_type)137 void CreateDefaultEndpoints(ServerType server_type) {
138 client_ = std::make_unique<ClientEndpoint>(
139 &test_harness_.simulator(), "Client", "Server", client_config_);
140 server_ =
141 std::make_unique<ServerEndpoint>(&test_harness_.simulator(), "Server",
142 "Client", server_config_, server_type);
143 test_harness_.set_client(client_.get());
144 test_harness_.set_server(server_.get());
145 }
146
WireUpEndpoints()147 void WireUpEndpoints() { test_harness_.WireUpEndpoints(); }
148
RunHandshake()149 void RunHandshake() {
150 client_->session()->CryptoConnect();
151 bool result = test_harness_.RunUntilWithDefaultTimeout([this]() {
152 return client_->session_ready() ||
153 client_->session()->error() != QUIC_NO_ERROR;
154 });
155 EXPECT_TRUE(result);
156 }
157
158 protected:
159 QuicConfig client_config_ = DefaultQuicConfig();
160 QuicConfig server_config_ = DefaultQuicConfig();
161
162 simulator::TestHarness test_harness_;
163
164 std::unique_ptr<ClientEndpoint> client_;
165 std::unique_ptr<ServerEndpoint> server_;
166 };
167
TEST_F(QuicGenericSessionTest,SuccessfulHandshake)168 TEST_F(QuicGenericSessionTest, SuccessfulHandshake) {
169 CreateDefaultEndpoints(kDiscardServer);
170 WireUpEndpoints();
171 RunHandshake();
172 EXPECT_TRUE(client_->session_ready());
173 }
174
TEST_F(QuicGenericSessionTest,SendOutgoingStreams)175 TEST_F(QuicGenericSessionTest, SendOutgoingStreams) {
176 CreateDefaultEndpoints(kDiscardServer);
177 WireUpEndpoints();
178 RunHandshake();
179
180 std::vector<webtransport::Stream*> streams;
181 for (int i = 0; i < 10; i++) {
182 webtransport::Stream* stream =
183 client_->session()->OpenOutgoingUnidirectionalStream();
184 ASSERT_TRUE(stream->Write("test"));
185 streams.push_back(stream);
186 }
187 ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout([this]() {
188 return QuicSessionPeer::GetNumOpenDynamicStreams(server_->session()) == 10;
189 }));
190
191 for (webtransport::Stream* stream : streams) {
192 ASSERT_TRUE(stream->SendFin());
193 }
194 ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout([this]() {
195 return QuicSessionPeer::GetNumOpenDynamicStreams(server_->session()) == 0;
196 }));
197 }
198
TEST_F(QuicGenericSessionTest,EchoBidirectionalStreams)199 TEST_F(QuicGenericSessionTest, EchoBidirectionalStreams) {
200 CreateDefaultEndpoints(kEchoServer);
201 WireUpEndpoints();
202 RunHandshake();
203
204 webtransport::Stream* stream =
205 client_->session()->OpenOutgoingBidirectionalStream();
206 EXPECT_TRUE(stream->Write("Hello!"));
207
208 ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout(
209 [stream]() { return stream->ReadableBytes() == strlen("Hello!"); }));
210 std::string received;
211 WebTransportStream::ReadResult result = stream->Read(&received);
212 EXPECT_EQ(result.bytes_read, strlen("Hello!"));
213 EXPECT_FALSE(result.fin);
214 EXPECT_EQ(received, "Hello!");
215
216 EXPECT_TRUE(stream->SendFin());
217 ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout([this]() {
218 return QuicSessionPeer::GetNumOpenDynamicStreams(server_->session()) == 0;
219 }));
220 }
221
TEST_F(QuicGenericSessionTest,EchoUnidirectionalStreams)222 TEST_F(QuicGenericSessionTest, EchoUnidirectionalStreams) {
223 CreateDefaultEndpoints(kEchoServer);
224 WireUpEndpoints();
225 RunHandshake();
226
227 // Send two streams, but only send FIN on the second one.
228 webtransport::Stream* stream1 =
229 client_->session()->OpenOutgoingUnidirectionalStream();
230 EXPECT_TRUE(stream1->Write("Stream One"));
231 webtransport::Stream* stream2 =
232 client_->session()->OpenOutgoingUnidirectionalStream();
233 EXPECT_TRUE(stream2->Write("Stream Two"));
234 EXPECT_TRUE(stream2->SendFin());
235
236 // Wait until a stream is received.
237 bool stream_received = false;
238 EXPECT_CALL(*client_->visitor(), OnIncomingUnidirectionalStreamAvailable())
239 .Times(2)
240 .WillRepeatedly(Assign(&stream_received, true));
241 ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout(
242 [&stream_received]() { return stream_received; }));
243
244 // Receive a reply stream and expect it to be the second one.
245 webtransport::Stream* reply =
246 client_->session()->AcceptIncomingUnidirectionalStream();
247 ASSERT_TRUE(reply != nullptr);
248 std::string buffer;
249 WebTransportStream::ReadResult result = reply->Read(&buffer);
250 EXPECT_GT(result.bytes_read, 0u);
251 EXPECT_TRUE(result.fin);
252 EXPECT_EQ(buffer, "Stream Two");
253
254 // Reset reply-related variables.
255 stream_received = false;
256 buffer = "";
257
258 // Send FIN on the first stream, and expect to receive it back.
259 EXPECT_TRUE(stream1->SendFin());
260 ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout(
261 [&stream_received]() { return stream_received; }));
262 reply = client_->session()->AcceptIncomingUnidirectionalStream();
263 ASSERT_TRUE(reply != nullptr);
264 result = reply->Read(&buffer);
265 EXPECT_GT(result.bytes_read, 0u);
266 EXPECT_TRUE(result.fin);
267 EXPECT_EQ(buffer, "Stream One");
268 }
269
TEST_F(QuicGenericSessionTest,EchoStreamsUsingPeekApi)270 TEST_F(QuicGenericSessionTest, EchoStreamsUsingPeekApi) {
271 CreateDefaultEndpoints(kEchoServer);
272 WireUpEndpoints();
273 RunHandshake();
274
275 // Send two streams, a bidirectional and a unidirectional one, but only send
276 // FIN on the second one.
277 webtransport::Stream* stream1 =
278 client_->session()->OpenOutgoingBidirectionalStream();
279 EXPECT_TRUE(stream1->Write("Stream One"));
280 webtransport::Stream* stream2 =
281 client_->session()->OpenOutgoingUnidirectionalStream();
282 EXPECT_TRUE(stream2->Write("Stream Two"));
283 EXPECT_TRUE(stream2->SendFin());
284
285 // Wait until the unidirectional stream is received back.
286 bool stream_received_unidi = false;
287 EXPECT_CALL(*client_->visitor(), OnIncomingUnidirectionalStreamAvailable())
288 .WillOnce(Assign(&stream_received_unidi, true));
289 ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout(
290 [&]() { return stream_received_unidi; }));
291
292 // Receive the unidirectional echo reply.
293 webtransport::Stream* reply =
294 client_->session()->AcceptIncomingUnidirectionalStream();
295 ASSERT_TRUE(reply != nullptr);
296 std::string buffer;
297 quiche::ReadStream::PeekResult peek_result = reply->PeekNextReadableRegion();
298 EXPECT_EQ(peek_result.peeked_data, "Stream Two");
299 EXPECT_EQ(peek_result.fin_next, false);
300 EXPECT_EQ(peek_result.all_data_received, true);
301 bool fin_received =
302 quiche::ProcessAllReadableRegions(*reply, [&](absl::string_view chunk) {
303 buffer.append(chunk.data(), chunk.size());
304 return true;
305 });
306 EXPECT_TRUE(fin_received);
307 EXPECT_EQ(buffer, "Stream Two");
308
309 // Receive the bidirectional stream reply without a FIN.
310 ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout(
311 [&]() { return stream1->PeekNextReadableRegion().has_data(); }));
312 peek_result = stream1->PeekNextReadableRegion();
313 EXPECT_EQ(peek_result.peeked_data, "Stream One");
314 EXPECT_EQ(peek_result.fin_next, false);
315 EXPECT_EQ(peek_result.all_data_received, false);
316 fin_received = stream1->SkipBytes(strlen("Stream One"));
317 EXPECT_FALSE(fin_received);
318 peek_result = stream1->PeekNextReadableRegion();
319 EXPECT_EQ(peek_result.peeked_data, "");
320 EXPECT_EQ(peek_result.fin_next, false);
321 EXPECT_EQ(peek_result.all_data_received, false);
322
323 // Send FIN on the first stream, and expect to receive it back.
324 EXPECT_TRUE(stream1->SendFin());
325 ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout(
326 [&]() { return stream1->PeekNextReadableRegion().all_data_received; }));
327 peek_result = stream1->PeekNextReadableRegion();
328 EXPECT_EQ(peek_result.peeked_data, "");
329 EXPECT_EQ(peek_result.fin_next, true);
330 EXPECT_EQ(peek_result.all_data_received, true);
331
332 // Read FIN and expect the stream to get garbage collected.
333 webtransport::StreamId id = stream1->GetStreamId();
334 EXPECT_TRUE(client_->session()->GetStreamById(id) != nullptr);
335 fin_received = stream1->SkipBytes(0);
336 EXPECT_TRUE(fin_received);
337 EXPECT_TRUE(client_->session()->GetStreamById(id) == nullptr);
338 }
339
TEST_F(QuicGenericSessionTest,EchoDatagram)340 TEST_F(QuicGenericSessionTest, EchoDatagram) {
341 CreateDefaultEndpoints(kEchoServer);
342 WireUpEndpoints();
343 RunHandshake();
344
345 client_->session()->SendOrQueueDatagram("test");
346
347 bool datagram_received = false;
348 EXPECT_CALL(*client_->visitor(), OnDatagramReceived(Eq("test")))
349 .WillOnce(Assign(&datagram_received, true));
350 ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout(
351 [&datagram_received]() { return datagram_received; }));
352 }
353
354 // This test sets the datagram queue to an nearly-infinite queueing time, and
355 // then sends 1000 datagrams. We expect to receive most of them back, since the
356 // datagrams would be paced out by the congestion controller.
TEST_F(QuicGenericSessionTest,EchoALotOfDatagrams)357 TEST_F(QuicGenericSessionTest, EchoALotOfDatagrams) {
358 CreateDefaultEndpoints(kEchoServer);
359 WireUpEndpoints();
360 RunHandshake();
361
362 // Set the datagrams to effectively never expire.
363 client_->session()->SetDatagramMaxTimeInQueue(
364 (10000 * simulator::TestHarness::kRtt).ToAbsl());
365 for (int i = 0; i < 1000; i++) {
366 client_->session()->SendOrQueueDatagram(std::string(
367 client_->session()->GetGuaranteedLargestMessagePayload(), 'a'));
368 }
369
370 size_t received = 0;
371 EXPECT_CALL(*client_->visitor(), OnDatagramReceived(_))
372 .WillRepeatedly(
373 [&received](absl::string_view /*datagram*/) { received++; });
374 ASSERT_TRUE(test_harness_.simulator().RunUntilOrTimeout(
375 [this]() { return client_->total_datagrams_processed() >= 1000; },
376 3 * simulator::TestHarness::kServerBandwidth.TransferTime(
377 1000 * kMaxOutgoingPacketSize)));
378 // Allow extra round-trips for the final flight of datagrams to arrive back.
379 test_harness_.simulator().RunFor(2 * simulator::TestHarness::kRtt);
380
381 EXPECT_GT(received, 500u);
382 EXPECT_LT(received, 1000u);
383 }
384
TEST_F(QuicGenericSessionTest,OutgoingStreamFlowControlBlocked)385 TEST_F(QuicGenericSessionTest, OutgoingStreamFlowControlBlocked) {
386 server_config_.SetMaxUnidirectionalStreamsToSend(4);
387 CreateDefaultEndpoints(kDiscardServer);
388 WireUpEndpoints();
389 RunHandshake();
390
391 webtransport::Stream* stream;
392 for (int i = 0; i <= 3; i++) {
393 ASSERT_TRUE(client_->session()->CanOpenNextOutgoingUnidirectionalStream());
394 stream = client_->session()->OpenOutgoingUnidirectionalStream();
395 ASSERT_TRUE(stream != nullptr);
396 ASSERT_TRUE(stream->SendFin());
397 }
398 EXPECT_FALSE(client_->session()->CanOpenNextOutgoingUnidirectionalStream());
399
400 // Receiving FINs for the streams we've just opened will cause the server to
401 // let us open more streams.
402 bool can_create_new_stream = false;
403 EXPECT_CALL(*client_->visitor(), OnCanCreateNewOutgoingUnidirectionalStream())
404 .WillOnce(Assign(&can_create_new_stream, true));
405 ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout(
406 [&can_create_new_stream]() { return can_create_new_stream; }));
407 EXPECT_TRUE(client_->session()->CanOpenNextOutgoingUnidirectionalStream());
408 }
409
TEST_F(QuicGenericSessionTest,ExpireDatagrams)410 TEST_F(QuicGenericSessionTest, ExpireDatagrams) {
411 CreateDefaultEndpoints(kEchoServer);
412 WireUpEndpoints();
413 RunHandshake();
414
415 // Set the datagrams to expire very soon.
416 client_->session()->SetDatagramMaxTimeInQueue(
417 (0.2 * simulator::TestHarness::kRtt).ToAbsl());
418 for (int i = 0; i < 1000; i++) {
419 client_->session()->SendOrQueueDatagram(std::string(
420 client_->session()->GetGuaranteedLargestMessagePayload(), 'a'));
421 }
422
423 size_t received = 0;
424 EXPECT_CALL(*client_->visitor(), OnDatagramReceived(_))
425 .WillRepeatedly(
426 [&received](absl::string_view /*datagram*/) { received++; });
427 ASSERT_TRUE(test_harness_.simulator().RunUntilOrTimeout(
428 [this]() { return client_->total_datagrams_processed() >= 1000; },
429 3 * simulator::TestHarness::kServerBandwidth.TransferTime(
430 1000 * kMaxOutgoingPacketSize)));
431 // Allow extra round-trips for the final flight of datagrams to arrive back.
432 test_harness_.simulator().RunFor(2 * simulator::TestHarness::kRtt);
433 EXPECT_LT(received, 500);
434 EXPECT_EQ(received + client_->session()->GetDatagramStats().expired_outgoing,
435 1000);
436 }
437
TEST_F(QuicGenericSessionTest,LoseDatagrams)438 TEST_F(QuicGenericSessionTest, LoseDatagrams) {
439 CreateDefaultEndpoints(kEchoServer);
440 test_harness_.WireUpEndpointsWithLoss(/*lose_every_n=*/4);
441 RunHandshake();
442
443 // Set the datagrams to effectively never expire.
444 client_->session()->SetDatagramMaxTimeInQueue(
445 (10000 * simulator::TestHarness::kRtt).ToAbsl());
446 for (int i = 0; i < 1000; i++) {
447 client_->session()->SendOrQueueDatagram(std::string(
448 client_->session()->GetGuaranteedLargestMessagePayload(), 'a'));
449 }
450
451 size_t received = 0;
452 EXPECT_CALL(*client_->visitor(), OnDatagramReceived(_))
453 .WillRepeatedly(
454 [&received](absl::string_view /*datagram*/) { received++; });
455 ASSERT_TRUE(test_harness_.simulator().RunUntilOrTimeout(
456 [this]() { return client_->total_datagrams_processed() >= 1000; },
457 4 * simulator::TestHarness::kServerBandwidth.TransferTime(
458 1000 * kMaxOutgoingPacketSize)));
459 // Allow extra round-trips for the final flight of datagrams to arrive back.
460 test_harness_.simulator().RunFor(4 * simulator::TestHarness::kRtt);
461
462 QuicPacketCount client_lost =
463 client_->session()->GetDatagramStats().lost_outgoing;
464 QuicPacketCount server_lost =
465 server_->session()->GetDatagramStats().lost_outgoing;
466 EXPECT_LT(received, 800u);
467 EXPECT_GT(client_lost, 100u);
468 EXPECT_GT(server_lost, 100u);
469 EXPECT_EQ(received + client_lost + server_lost, 1000u);
470 }
471
TEST_F(QuicGenericSessionTest,WriteWhenBufferFull)472 TEST_F(QuicGenericSessionTest, WriteWhenBufferFull) {
473 CreateDefaultEndpoints(kEchoServer);
474 WireUpEndpoints();
475 RunHandshake();
476
477 const std::string buffer(64 * 1024 + 1, 'q');
478 webtransport::Stream* stream =
479 client_->session()->OpenOutgoingBidirectionalStream();
480 ASSERT_TRUE(stream != nullptr);
481
482 ASSERT_TRUE(stream->CanWrite());
483 absl::Status status = quiche::WriteIntoStream(*stream, buffer);
484 QUICHE_EXPECT_OK(status);
485 EXPECT_FALSE(stream->CanWrite());
486
487 status = quiche::WriteIntoStream(*stream, buffer);
488 EXPECT_THAT(status, StatusIs(absl::StatusCode::kUnavailable));
489
490 quiche::StreamWriteOptions options;
491 options.set_buffer_unconditionally(true);
492 options.set_send_fin(true);
493 status = quiche::WriteIntoStream(*stream, buffer, options);
494 QUICHE_EXPECT_OK(status);
495 EXPECT_FALSE(stream->CanWrite());
496
497 QuicByteCount total_received = 0;
498 for (;;) {
499 test_harness_.RunUntilWithDefaultTimeout(
500 [&] { return stream->PeekNextReadableRegion().has_data(); });
501 quiche::ReadStream::PeekResult result = stream->PeekNextReadableRegion();
502 total_received += result.peeked_data.size();
503 bool fin_consumed = stream->SkipBytes(result.peeked_data.size());
504 if (fin_consumed) {
505 break;
506 }
507 }
508 EXPECT_EQ(total_received, 128u * 1024u + 2);
509 }
510
511 } // namespace
512 } // namespace quic::test
513