1 // Copyright (c) 2023 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "quiche/web_transport/encapsulated/encapsulated_web_transport.h"
6
7 #include <array>
8 #include <memory>
9 #include <string>
10 #include <utility>
11 #include <vector>
12
13 #include "absl/status/status.h"
14 #include "absl/strings/string_view.h"
15 #include "absl/types/span.h"
16 #include "quiche/common/capsule.h"
17 #include "quiche/common/http/http_header_block.h"
18 #include "quiche/common/platform/api/quiche_test.h"
19 #include "quiche/common/quiche_buffer_allocator.h"
20 #include "quiche/common/quiche_stream.h"
21 #include "quiche/common/simple_buffer_allocator.h"
22 #include "quiche/common/test_tools/mock_streams.h"
23 #include "quiche/common/test_tools/quiche_test_utils.h"
24 #include "quiche/web_transport/test_tools/mock_web_transport.h"
25 #include "quiche/web_transport/web_transport.h"
26
27 namespace webtransport::test {
28 namespace {
29
30 using ::quiche::Capsule;
31 using ::quiche::CapsuleType;
32 using ::quiche::test::StatusIs;
33 using ::testing::_;
34 using ::testing::ElementsAre;
35 using ::testing::HasSubstr;
36 using ::testing::IsEmpty;
37 using ::testing::Return;
38 using ::testing::StrEq;
39
40 class EncapsulatedWebTransportTest : public quiche::test::QuicheTest,
41 public quiche::CapsuleParser::Visitor {
42 public:
EncapsulatedWebTransportTest()43 EncapsulatedWebTransportTest() : parser_(this), reader_(&read_buffer_) {
44 ON_CALL(fatal_error_callback_, Call(_))
45 .WillByDefault([](absl::string_view error) {
46 ADD_FAILURE() << "Fatal session error: " << error;
47 });
48 ON_CALL(writer_, Writev(_, _))
49 .WillByDefault([&](absl::Span<const absl::string_view> data,
50 const quiche::StreamWriteOptions& options) {
51 for (absl::string_view fragment : data) {
52 parser_.IngestCapsuleFragment(fragment);
53 }
54 writer_.ProcessOptions(options);
55 return absl::OkStatus();
56 });
57 }
58
CreateTransport(Perspective perspective)59 std::unique_ptr<EncapsulatedSession> CreateTransport(
60 Perspective perspective) {
61 auto transport = std::make_unique<EncapsulatedSession>(
62 perspective, fatal_error_callback_.AsStdFunction());
63 session_ = transport.get();
64 return transport;
65 }
66
CreateAndStoreVisitor()67 std::unique_ptr<SessionVisitor> CreateAndStoreVisitor() {
68 auto visitor = std::make_unique<testing::StrictMock<MockSessionVisitor>>();
69 visitor_ = visitor.get();
70 return visitor;
71 }
72
73 MOCK_METHOD(bool, OnCapsule, (const Capsule&), (override));
74
OnCapsuleParseFailure(absl::string_view error_message)75 void OnCapsuleParseFailure(absl::string_view error_message) override {
76 ADD_FAILURE() << "Written an invalid capsule: " << error_message;
77 }
78
ProcessIncomingCapsule(const Capsule & capsule)79 void ProcessIncomingCapsule(const Capsule& capsule) {
80 quiche::QuicheBuffer buffer =
81 quiche::SerializeCapsule(capsule, quiche::SimpleBufferAllocator::Get());
82 read_buffer_.append(buffer.data(), buffer.size());
83 session_->OnCanRead();
84 }
85
86 template <typename CapsuleType>
ProcessIncomingCapsule(const CapsuleType & capsule)87 void ProcessIncomingCapsule(const CapsuleType& capsule) {
88 quiche::QuicheBuffer buffer = quiche::SerializeCapsule(
89 quiche::Capsule(capsule), quiche::SimpleBufferAllocator::Get());
90 read_buffer_.append(buffer.data(), buffer.size());
91 session_->OnCanRead();
92 }
93
DefaultHandshakeForClient(EncapsulatedSession & session)94 void DefaultHandshakeForClient(EncapsulatedSession& session) {
95 quiche::HttpHeaderBlock outgoing_headers, incoming_headers;
96 session.InitializeClient(CreateAndStoreVisitor(), outgoing_headers,
97 &writer_, &reader_);
98 EXPECT_CALL(*visitor_, OnSessionReady());
99 session.ProcessIncomingServerHeaders(incoming_headers);
100 }
101
102 protected:
103 quiche::CapsuleParser parser_;
104 quiche::test::MockWriteStream writer_;
105 std::string read_buffer_;
106 quiche::test::ReadStreamFromString reader_;
107 MockSessionVisitor* visitor_ = nullptr;
108 EncapsulatedSession* session_ = nullptr;
109 testing::MockFunction<void(absl::string_view)> fatal_error_callback_;
110 };
111
// Checks IsIdOpenedBy() for stream IDs 0 through 3: the even IDs are expected
// to be attributed to the client and the odd IDs to the server.
TEST_F(EncapsulatedWebTransportTest, IsOpenedBy) {
  for (StreamId id = 0x00; id <= 0x03; ++id) {
    const bool is_even = (id % 2) == 0;
    EXPECT_EQ(IsIdOpenedBy(id, Perspective::kClient), is_even) << "id=" << id;
    EXPECT_EQ(IsIdOpenedBy(id, Perspective::kServer), !is_even)
        << "id=" << id;
  }
}
123
// Walks a client session through its setup states: uninitialized, waiting for
// headers after InitializeClient(), and open once the server headers have been
// processed (at which point the visitor is notified).
TEST_F(EncapsulatedWebTransportTest, SetupClientSession) {
  auto session = CreateTransport(Perspective::kClient);
  EXPECT_EQ(session->state(), EncapsulatedSession::kUninitialized);

  quiche::HttpHeaderBlock outgoing_headers;
  session->InitializeClient(CreateAndStoreVisitor(), outgoing_headers, &writer_,
                            &reader_);
  EXPECT_EQ(session->state(), EncapsulatedSession::kWaitingForHeaders);

  quiche::HttpHeaderBlock incoming_headers;
  EXPECT_CALL(*visitor_, OnSessionReady());
  session->ProcessIncomingServerHeaders(incoming_headers);
  EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen);
}
136
// A server session goes straight from uninitialized to open within
// InitializeServer(), notifying the visitor along the way.
TEST_F(EncapsulatedWebTransportTest, SetupServerSession) {
  auto session = CreateTransport(Perspective::kServer);
  EXPECT_EQ(session->state(), EncapsulatedSession::kUninitialized);

  // The visitor is created up front so that the readiness expectation can be
  // registered before ownership moves into the session.
  auto visitor = CreateAndStoreVisitor();
  EXPECT_CALL(*visitor_, OnSessionReady());

  quiche::HttpHeaderBlock outgoing_headers;
  quiche::HttpHeaderBlock incoming_headers;
  session->InitializeServer(std::move(visitor), outgoing_headers,
                            incoming_headers, &writer_, &reader_);
  EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen);
}
148
// Closing an open session writes a CLOSE_WEBTRANSPORT_SESSION capsule carrying
// the error code and message, notifies the visitor, and writes a FIN.  A
// second close attempt is reported as a fatal error.
TEST_F(EncapsulatedWebTransportTest, CloseSession) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  auto verify_close_capsule = [](const Capsule& capsule) {
    EXPECT_EQ(capsule.capsule_type(), CapsuleType::CLOSE_WEBTRANSPORT_SESSION);
    const auto& close = capsule.close_web_transport_session_capsule();
    EXPECT_EQ(close.error_code, 0x1234);
    EXPECT_EQ(close.error_message, "test close");
    return true;
  };
  EXPECT_CALL(*this, OnCapsule(_)).WillOnce(verify_close_capsule);
  EXPECT_CALL(*visitor_, OnSessionClosed(0x1234, StrEq("test close")));

  EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen);
  session->CloseSession(0x1234, "test close");
  EXPECT_EQ(session->state(), EncapsulatedSession::kSessionClosed);
  EXPECT_TRUE(writer_.fin_written());

  // Closing again must trip the fatal error callback.
  auto verify_double_close_error = [](absl::string_view error) {
    EXPECT_THAT(error, HasSubstr("close a session that is already closed"));
  };
  EXPECT_CALL(fatal_error_callback_, Call(_))
      .WillOnce(verify_double_close_error);
  session->CloseSession(0x1234, "test close");
}
172
// If the writer is blocked when CloseSession() is called, the session stays in
// the closing state and writes nothing; the close capsule, the visitor
// notification and the FIN all happen on the next OnCanWrite().
TEST_F(EncapsulatedWebTransportTest, CloseSessionWriteBlocked) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  // Phase 1: the writer refuses writes, so the close is deferred.
  EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(false));
  EXPECT_CALL(*this, OnCapsule(_)).Times(0);
  EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen);
  session->CloseSession(0x1234, "test close");
  EXPECT_EQ(session->state(), EncapsulatedSession::kSessionClosing);

  // Phase 2: the writer unblocks and the pending close is flushed.
  auto verify_close_capsule = [](const Capsule& capsule) {
    EXPECT_EQ(capsule.capsule_type(), CapsuleType::CLOSE_WEBTRANSPORT_SESSION);
    const auto& close = capsule.close_web_transport_session_capsule();
    EXPECT_EQ(close.error_code, 0x1234);
    EXPECT_EQ(close.error_message, "test close");
    return true;
  };
  EXPECT_CALL(*this, OnCapsule(_)).WillOnce(verify_close_capsule);
  EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(true));
  EXPECT_CALL(*visitor_, OnSessionClosed(0x1234, StrEq("test close")));
  session->OnCanWrite();
  EXPECT_EQ(session->state(), EncapsulatedSession::kSessionClosed);
  EXPECT_TRUE(writer_.fin_written());
}
196
// A FIN from the peer without a preceding close capsule is reported to the
// visitor as a close with error code 0 and an empty message, and the session
// writes a FIN in response.
TEST_F(EncapsulatedWebTransportTest, ReceiveFin) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  EXPECT_CALL(*visitor_, OnSessionClosed(0, IsEmpty()));

  reader_.set_fin();
  session->OnCanRead();

  EXPECT_TRUE(writer_.fin_written());
}
207
// A CLOSE_WEBTRANSPORT_SESSION capsule from the peer is reported to the
// visitor with the peer's error code and message, and a FIN is written back.
// The FIN that follows must not cause any further visitor or fatal-error
// calls (the visitor is a strict mock).
TEST_F(EncapsulatedWebTransportTest, ReceiveCloseSession) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  EXPECT_CALL(*visitor_, OnSessionClosed(0x1234, StrEq("test")));
  const Capsule close_capsule =
      Capsule::CloseWebTransportSession(0x1234, "test");
  ProcessIncomingCapsule(close_capsule);
  EXPECT_TRUE(writer_.fin_written());

  reader_.set_fin();
  session->OnCanRead();
}
219
// Feeding the session 2 MiB of 0xff bytes must be rejected with a fatal
// "too much capsule data" error.
TEST_F(EncapsulatedWebTransportTest, ReceiveMalformedData) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  EXPECT_CALL(fatal_error_callback_, Call(HasSubstr("too much capsule data")))
      .WillOnce([] {});

  read_buffer_.assign(2 * 1024 * 1024, '\xff');
  session->OnCanRead();
}
230
// A datagram sent on an open session is written as a DATAGRAM capsule with the
// same payload, and the send reports success.
TEST_F(EncapsulatedWebTransportTest, SendDatagrams) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  auto verify_datagram = [](const Capsule& capsule) {
    EXPECT_EQ(capsule.capsule_type(), quiche::CapsuleType::DATAGRAM);
    EXPECT_EQ(capsule.datagram_capsule().http_datagram_payload, "test");
    return true;
  };
  EXPECT_CALL(*this, OnCapsule(_)).WillOnce(verify_datagram);

  const DatagramStatus result = session->SendOrQueueDatagram("test");
  EXPECT_EQ(result.code, DatagramStatusCode::kSuccess);
}
243
// A datagram sent while the client is still waiting for the server headers is
// already written out as a DATAGRAM capsule.
TEST_F(EncapsulatedWebTransportTest, SendDatagramsEarly) {
  auto session = CreateTransport(Perspective::kClient);
  quiche::HttpHeaderBlock outgoing_headers;
  session->InitializeClient(CreateAndStoreVisitor(), outgoing_headers, &writer_,
                            &reader_);
  ASSERT_EQ(session->state(), EncapsulatedSession::kWaitingForHeaders);

  auto verify_datagram = [](const Capsule& capsule) {
    EXPECT_EQ(capsule.capsule_type(), quiche::CapsuleType::DATAGRAM);
    EXPECT_EQ(capsule.datagram_capsule().http_datagram_payload, "test");
    return true;
  };
  EXPECT_CALL(*this, OnCapsule(_)).WillOnce(verify_datagram);

  session->SendOrQueueDatagram("test");
}
258
// A datagram sent before the session has been initialized must not be written
// immediately; it is expected to show up as a DATAGRAM capsule once the client
// handshake is performed.
TEST_F(EncapsulatedWebTransportTest, SendDatagramsBeforeInitialization) {
  std::unique_ptr<EncapsulatedSession> session =
      CreateTransport(Perspective::kClient);

  // Nothing may be written while the session is still uninitialized.
  EXPECT_CALL(*this, OnCapsule(_)).Times(0);
  ASSERT_EQ(session->state(), EncapsulatedSession::kUninitialized);
  session->SendOrQueueDatagram("test");

  // This newer expectation takes precedence over the Times(0) one above, so
  // the queued datagram is allowed (and required) to be written exactly once
  // during the handshake.
  EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
    EXPECT_EQ(capsule.capsule_type(), CapsuleType::DATAGRAM);
    EXPECT_EQ(capsule.datagram_capsule().http_datagram_payload, "test");
    return true;
  });
  DefaultHandshakeForClient(*session);
}
274
// A 16 KiB datagram is rejected with kTooBig and nothing is written.
TEST_F(EncapsulatedWebTransportTest, SendDatagramsTooBig) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  EXPECT_CALL(*this, OnCapsule(_)).Times(0);

  const std::string oversized_payload(16 * 1024, 'a');
  const DatagramStatus result = session->SendOrQueueDatagram(oversized_payload);
  EXPECT_EQ(result.code, DatagramStatusCode::kTooBig);
}
284
// An incoming DATAGRAM capsule is delivered to the session visitor with its
// payload intact.
TEST_F(EncapsulatedWebTransportTest, ReceiveDatagrams) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  auto verify_payload = [](absl::string_view data) {
    EXPECT_EQ(data, "test");
  };
  EXPECT_CALL(*visitor_, OnDatagramReceived(_)).WillOnce(verify_payload);

  ProcessIncomingCapsule(Capsule::Datagram("test"));
}
293
// NotifySessionDraining() writes a DRAIN_WEBTRANSPORT_SESSION capsule.
TEST_F(EncapsulatedWebTransportTest, SendDraining) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  auto verify_drain_capsule = [](const Capsule& capsule) {
    EXPECT_EQ(capsule.capsule_type(), CapsuleType::DRAIN_WEBTRANSPORT_SESSION);
    return true;
  };
  EXPECT_CALL(*this, OnCapsule(_)).WillOnce(verify_drain_capsule);

  session->NotifySessionDraining();
}
304
// An incoming DRAIN_WEBTRANSPORT_SESSION capsule invokes the callback that was
// registered through SetOnDraining().
TEST_F(EncapsulatedWebTransportTest, ReceiveDraining) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  testing::MockFunction<void()> on_draining;
  EXPECT_CALL(on_draining, Call());
  session->SetOnDraining(on_draining.AsStdFunction());

  const Capsule drain_capsule(quiche::DrainWebTransportSessionCapsule{});
  ProcessIncomingCapsule(drain_capsule);
}
314
// If the underlying writer fails while a datagram is being sent, the failure
// is surfaced through the fatal error callback and the send reports
// kInternalError.
TEST_F(EncapsulatedWebTransportTest, WriteErrorDatagram) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  EXPECT_CALL(writer_, Writev(_, _))
      .WillOnce(Return(absl::InternalError("Test write error")));
  auto verify_error_text = [](absl::string_view error) {
    EXPECT_THAT(error, HasSubstr("Test write error"));
  };
  EXPECT_CALL(fatal_error_callback_, Call(_)).WillOnce(verify_error_text);

  const DatagramStatus result = session->SendOrQueueDatagram("test");
  EXPECT_EQ(result.code, DatagramStatusCode::kInternalError);
}
328
// If the underlying writer fails while a control capsule (here: the draining
// notification) is being sent, the failure is surfaced through the fatal error
// callback.
TEST_F(EncapsulatedWebTransportTest, WriteErrorControlCapsule) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  EXPECT_CALL(writer_, Writev(_, _))
      .WillOnce(Return(absl::InternalError("Test write error")));
  auto verify_error_text = [](absl::string_view error) {
    EXPECT_THAT(error, HasSubstr("Test write error"));
  };
  EXPECT_CALL(fatal_error_callback_, Call(_)).WillOnce(verify_error_text);

  session->NotifySessionDraining();
}
341
// Data arriving on a new peer-initiated bidirectional stream announces the
// stream to the session visitor; the accepted stream then exposes the data
// through ReadableBytes(), PeekNextReadableRegion() and Read().
TEST_F(EncapsulatedWebTransportTest, SimpleRead) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  bool stream_announced = false;
  EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable())
      .WillOnce([&stream_announced] { stream_announced = true; });

  std::string payload = "test";
  ProcessIncomingCapsule(
      quiche::WebTransportStreamDataCapsule{1, payload, false});
  // Scribble over the source string: the assertions below only hold if the
  // session kept its own copy of the data.
  payload[0] = 'q';
  EXPECT_TRUE(stream_announced);

  Stream* stream = session->AcceptIncomingBidirectionalStream();
  ASSERT_NE(stream, nullptr);
  EXPECT_EQ(stream->GetStreamId(), 1u);
  EXPECT_EQ(stream->visitor(), nullptr);
  EXPECT_EQ(stream->ReadableBytes(), 4u);

  const quiche::ReadStream::PeekResult region =
      stream->PeekNextReadableRegion();
  EXPECT_EQ(region.peeked_data, "test");
  EXPECT_FALSE(region.fin_next);
  EXPECT_FALSE(region.all_data_received);

  std::string output;
  const quiche::ReadStream::ReadResult result = stream->Read(&output);
  EXPECT_EQ(result.bytes_read, 4);
  EXPECT_FALSE(result.fin);
  EXPECT_EQ(output, "test");
  EXPECT_EQ(stream->ReadableBytes(), 0u);
}
372
373 class MockStreamVisitorWithDestructor : public MockStreamVisitor {
374 public:
~MockStreamVisitorWithDestructor()375 ~MockStreamVisitorWithDestructor() { OnDelete(); }
376
377 MOCK_METHOD(void, OnDelete, (), ());
378 };
379
SetupVisitor(Stream & stream)380 MockStreamVisitorWithDestructor* SetupVisitor(Stream& stream) {
381 auto visitor = std::make_unique<MockStreamVisitorWithDestructor>();
382 MockStreamVisitorWithDestructor* result = visitor.get();
383 stream.SetVisitor(std::move(visitor));
384 return result;
385 }
386
// When more data arrives on a stream that has a visitor, OnCanRead() fires and
// a read from inside that callback returns the previously unread data followed
// by the new data.
TEST_F(EncapsulatedWebTransportTest, ImmediateRead) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable());
  ProcessIncomingCapsule(
      quiche::WebTransportStreamDataCapsule{1, "abcd", false});
  Stream* stream = session->AcceptIncomingBidirectionalStream();
  ASSERT_NE(stream, nullptr);
  EXPECT_EQ(stream->ReadableBytes(), 4u);

  MockStreamVisitor* stream_visitor = SetupVisitor(*stream);
  auto read_everything = [stream] {
    std::string output;
    (void)stream->Read(&output);
    EXPECT_EQ(output, "abcdef");
  };
  EXPECT_CALL(*stream_visitor, OnCanRead()).WillOnce(read_everything);

  ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, "ef", false});
}
406
// Exercises PeekNextReadableRegion()/SkipBytes() on a stream whose data came
// in two capsules ("abcd", then "ef" with FIN): `all_data_received` is set as
// soon as the FIN has arrived, whereas `fin_next` is set only once the last
// region is being peeked.  SkipBytes() returns true only when the final bytes
// are consumed.
TEST_F(EncapsulatedWebTransportTest, FinPeek) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable());
  ProcessIncomingCapsule(
      quiche::WebTransportStreamDataCapsule{1, "abcd", false});
  Stream* stream = session->AcceptIncomingBidirectionalStream();
  ASSERT_NE(stream, nullptr);
  EXPECT_EQ(stream->ReadableBytes(), 4u);

  ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, "ef", true});

  // Nothing consumed yet: the first capsule's data is at the front.
  quiche::ReadStream::PeekResult region = stream->PeekNextReadableRegion();
  EXPECT_EQ(region.peeked_data, "abcd");
  EXPECT_FALSE(region.fin_next);
  EXPECT_TRUE(region.all_data_received);

  // Two bytes consumed: still inside the first capsule's data.
  EXPECT_FALSE(stream->SkipBytes(2));
  region = stream->PeekNextReadableRegion();
  EXPECT_FALSE(region.fin_next);
  EXPECT_TRUE(region.all_data_received);

  // Four bytes consumed: only the final region is left.
  EXPECT_FALSE(stream->SkipBytes(2));
  region = stream->PeekNextReadableRegion();
  EXPECT_EQ(region.peeked_data, "ef");
  EXPECT_TRUE(region.fin_next);
  EXPECT_TRUE(region.all_data_received);

  // Consuming the last two bytes reaches the FIN.
  EXPECT_TRUE(stream->SkipBytes(2));
}
438
// Reads six bytes (sent with FIN) in two three-byte chunks; only the read that
// drains the stream reports the FIN.
TEST_F(EncapsulatedWebTransportTest, FinRead) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable());
  ProcessIncomingCapsule(
      quiche::WebTransportStreamDataCapsule{1, "abcdef", true});
  Stream* stream = session->AcceptIncomingBidirectionalStream();
  ASSERT_NE(stream, nullptr);
  EXPECT_EQ(stream->ReadableBytes(), 6u);

  std::array<char, 3> chunk;

  // First half: data only.
  quiche::ReadStream::ReadResult result = stream->Read(absl::MakeSpan(chunk));
  EXPECT_THAT(chunk, ElementsAre('a', 'b', 'c'));
  EXPECT_EQ(result.bytes_read, 3);
  EXPECT_FALSE(result.fin);

  // Second half: data plus FIN.
  result = stream->Read(absl::MakeSpan(chunk));
  EXPECT_THAT(chunk, ElementsAre('d', 'e', 'f'));
  EXPECT_EQ(result.bytes_read, 3);
  EXPECT_TRUE(result.fin);
}
461
// Reads a 64 KiB payload (sent with FIN) in 64 chunks of 1 KiB; each read must
// fill the whole buffer and only the last one reports the FIN.
TEST_F(EncapsulatedWebTransportTest, LargeRead) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable());
  const std::string payload(64 * 1024, 'a');
  ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, payload, true});
  Stream* stream = session->AcceptIncomingBidirectionalStream();
  ASSERT_NE(stream, nullptr);
  EXPECT_EQ(stream->ReadableBytes(), 65536u);

  constexpr int kChunkCount = 64;
  for (int chunk_index = 0; chunk_index < kChunkCount; ++chunk_index) {
    std::array<char, 1024> chunk;
    const quiche::ReadStream::ReadResult result =
        stream->Read(absl::MakeSpan(chunk));
    const bool is_last_chunk = chunk_index == kChunkCount - 1;
    EXPECT_EQ(result.bytes_read, 1024);
    EXPECT_EQ(result.fin, is_last_chunk);
  }
}
480
// Stream data arriving after the stream has already received a FIN is a fatal
// error.
TEST_F(EncapsulatedWebTransportTest, DoubleFinReceived) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable());
  ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, "abc", true});
  Stream* stream = session->AcceptIncomingBidirectionalStream();
  ASSERT_NE(stream, nullptr);

  auto verify_error_text = [](absl::string_view error) {
    EXPECT_THAT(error, HasSubstr("has already received a FIN"));
  };
  EXPECT_CALL(fatal_error_callback_, Call(_)).WillOnce(verify_error_text);

  ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, "def", true});
}
496
// CanWrite() is true for bidirectional streams in either direction and for
// locally opened unidirectional streams, but false for a unidirectional stream
// opened by the peer.
TEST_F(EncapsulatedWebTransportTest, CanWriteUnidiBidi) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable());
  EXPECT_CALL(*visitor_, OnIncomingUnidirectionalStreamAvailable());
  ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, "abc", true});
  ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{3, "abc", true});

  Stream* incoming_bidi = session->AcceptIncomingBidirectionalStream();
  ASSERT_NE(incoming_bidi, nullptr);
  EXPECT_TRUE(incoming_bidi->CanWrite());

  Stream* incoming_unidi = session->AcceptIncomingUnidirectionalStream();
  ASSERT_NE(incoming_unidi, nullptr);
  EXPECT_FALSE(incoming_unidi->CanWrite());

  Stream* outgoing_bidi = session->OpenOutgoingBidirectionalStream();
  ASSERT_NE(outgoing_bidi, nullptr);
  EXPECT_TRUE(outgoing_bidi->CanWrite());

  Stream* outgoing_unidi = session->OpenOutgoingUnidirectionalStream();
  ASSERT_NE(outgoing_unidi, nullptr);
  EXPECT_TRUE(outgoing_unidi->CanWrite());
}
522
// An incoming unidirectional stream whose data has been consumed through the
// FIN is destroyed (together with its visitor) by GarbageCollectStreams().
TEST_F(EncapsulatedWebTransportTest, ReadOnlyGarbageCollection) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  EXPECT_CALL(*visitor_, OnIncomingUnidirectionalStreamAvailable());
  ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{3, "abc", true});

  Stream* stream = session->AcceptIncomingUnidirectionalStream();
  ASSERT_NE(stream, nullptr);
  // Consume all three bytes, reaching the FIN.
  EXPECT_TRUE(stream->SkipBytes(3));

  MockStreamVisitorWithDestructor* stream_visitor = SetupVisitor(*stream);
  bool visitor_destroyed = false;
  EXPECT_CALL(*stream_visitor, OnDelete()).WillOnce([&visitor_destroyed] {
    visitor_destroyed = true;
  });

  session->GarbageCollectStreams();
  EXPECT_TRUE(visitor_destroyed);
}
540
// An outgoing unidirectional stream on which a FIN has been written is
// destroyed (together with its visitor) by GarbageCollectStreams().
TEST_F(EncapsulatedWebTransportTest, WriteOnlyGarbageCollection) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  Stream* stream = session->OpenOutgoingUnidirectionalStream();
  ASSERT_NE(stream, nullptr);

  MockStreamVisitorWithDestructor* stream_visitor = SetupVisitor(*stream);
  bool visitor_destroyed = false;
  EXPECT_CALL(*stream_visitor, OnDelete()).WillOnce([&visitor_destroyed] {
    visitor_destroyed = true;
  });
  EXPECT_CALL(*this, OnCapsule(_)).WillOnce(Return(true));

  // A FIN-only write: no data fragments, just the send_fin option.
  quiche::StreamWriteOptions fin_options;
  fin_options.set_send_fin(true);
  const absl::Status write_status =
      stream->Writev(absl::Span<const absl::string_view>(), fin_options);
  EXPECT_THAT(write_status, StatusIs(absl::StatusCode::kOk));

  session->GarbageCollectStreams();
  EXPECT_TRUE(visitor_destroyed);
}
561
// Writing to an accepted bidirectional stream (whose read side has already
// finished) produces a WT_STREAM capsule with the stream's ID, no FIN and the
// written data.
TEST_F(EncapsulatedWebTransportTest, SimpleWrite) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable());
  ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, "", true});
  Stream* stream = session->AcceptIncomingBidirectionalStream();
  ASSERT_NE(stream, nullptr);

  auto verify_stream_capsule = [](const Capsule& capsule) {
    EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STREAM);
    const auto& stream_data = capsule.web_transport_stream_data();
    EXPECT_EQ(stream_data.stream_id, 1u);
    EXPECT_EQ(stream_data.fin, false);
    EXPECT_EQ(stream_data.data, "test");
    return true;
  };
  EXPECT_CALL(*this, OnCapsule(_)).WillOnce(verify_stream_capsule);

  const absl::Status write_status = quiche::WriteIntoStream(*stream, "test");
  EXPECT_THAT(write_status, StatusIs(absl::StatusCode::kOk));
}
581
// Writing data with the send_fin option produces a WT_STREAM_WITH_FIN capsule
// carrying the data, after which the stream is no longer writable.
TEST_F(EncapsulatedWebTransportTest, WriteWithFin) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  Stream* stream = session->OpenOutgoingUnidirectionalStream();
  ASSERT_NE(stream, nullptr);

  auto verify_fin_capsule = [](const Capsule& capsule) {
    EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STREAM_WITH_FIN);
    const auto& stream_data = capsule.web_transport_stream_data();
    EXPECT_EQ(stream_data.stream_id, 2u);
    EXPECT_EQ(stream_data.fin, true);
    EXPECT_EQ(stream_data.data, "test");
    return true;
  };
  EXPECT_CALL(*this, OnCapsule(_)).WillOnce(verify_fin_capsule);

  quiche::StreamWriteOptions fin_options;
  fin_options.set_send_fin(true);

  EXPECT_TRUE(stream->CanWrite());
  const absl::Status write_status =
      quiche::WriteIntoStream(*stream, "test", fin_options);
  EXPECT_THAT(write_status, StatusIs(absl::StatusCode::kOk));
  EXPECT_FALSE(stream->CanWrite());
}
603
// A write with no data fragments and the send_fin option produces an empty
// WT_STREAM_WITH_FIN capsule, after which the stream is no longer writable.
TEST_F(EncapsulatedWebTransportTest, FinOnlyWrite) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  Stream* stream = session->OpenOutgoingUnidirectionalStream();
  ASSERT_NE(stream, nullptr);

  auto verify_empty_fin_capsule = [](const Capsule& capsule) {
    EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STREAM_WITH_FIN);
    const auto& stream_data = capsule.web_transport_stream_data();
    EXPECT_EQ(stream_data.stream_id, 2u);
    EXPECT_EQ(stream_data.fin, true);
    EXPECT_EQ(stream_data.data, "");
    return true;
  };
  EXPECT_CALL(*this, OnCapsule(_)).WillOnce(verify_empty_fin_capsule);

  quiche::StreamWriteOptions fin_options;
  fin_options.set_send_fin(true);

  EXPECT_TRUE(stream->CanWrite());
  const absl::Span<const absl::string_view> no_fragments;
  const absl::Status write_status = stream->Writev(no_fragments, fin_options);
  EXPECT_THAT(write_status, StatusIs(absl::StatusCode::kOk));
  EXPECT_FALSE(stream->CanWrite());
}
626
// A write issued while the underlying writer is blocked is buffered.  A second
// write issued after the writer unblocks is expected to be combined with the
// buffered data, so that a single WT_STREAM capsule with "abcdef" comes out on
// OnCanWrite().
TEST_F(EncapsulatedWebTransportTest, BufferedWriteThenUnbuffer) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  Stream* stream = session->OpenOutgoingUnidirectionalStream();
  ASSERT_NE(stream, nullptr);

  // First write: the writer is blocked, yet the write itself succeeds.
  EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(false));
  EXPECT_THAT(quiche::WriteIntoStream(*stream, "abc"),
              StatusIs(absl::StatusCode::kOk));

  // Even though the underlying writer was blocked, the stream must still
  // accept (buffer) more data.
  EXPECT_TRUE(stream->CanWrite());
  EXPECT_CALL(writer_, CanWrite()).WillRepeatedly(Return(true));
  EXPECT_THAT(quiche::WriteIntoStream(*stream, "def"),
              StatusIs(absl::StatusCode::kOk));

  auto verify_combined_capsule = [](const Capsule& capsule) {
    EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STREAM);
    const auto& stream_data = capsule.web_transport_stream_data();
    EXPECT_EQ(stream_data.stream_id, 2u);
    EXPECT_EQ(stream_data.data, "abcdef");
    return true;
  };
  EXPECT_CALL(*this, OnCapsule(_)).WillOnce(verify_combined_capsule);
  session->OnCanWrite();
}
653
// Two writes issued while the underlying writer is blocked are both buffered
// and come out as a single WT_STREAM capsule with "abcdef" on OnCanWrite().
TEST_F(EncapsulatedWebTransportTest, BufferedWriteThenFlush) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  Stream* stream = session->OpenOutgoingUnidirectionalStream();
  ASSERT_NE(stream, nullptr);

  // The writer reports "blocked" for both writes.
  EXPECT_CALL(writer_, CanWrite()).Times(2).WillRepeatedly(Return(false));
  EXPECT_THAT(quiche::WriteIntoStream(*stream, "abc"),
              StatusIs(absl::StatusCode::kOk));
  EXPECT_THAT(quiche::WriteIntoStream(*stream, "def"),
              StatusIs(absl::StatusCode::kOk));

  // Unblock the writer and flush.
  EXPECT_CALL(writer_, CanWrite()).WillRepeatedly(Return(true));
  auto verify_combined_capsule = [](const Capsule& capsule) {
    EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STREAM);
    const auto& stream_data = capsule.web_transport_stream_data();
    EXPECT_EQ(stream_data.stream_id, 2u);
    EXPECT_EQ(stream_data.data, "abcdef");
    return true;
  };
  EXPECT_CALL(*this, OnCapsule(_)).WillOnce(verify_combined_capsule);
  session->OnCanWrite();
}
676
// While one stream has buffered data pending, a write on another stream must
// not jump ahead of it: both writes are held back and then flushed on
// OnCanWrite() in the order the streams were written to.
TEST_F(EncapsulatedWebTransportTest, BufferedStreamBlocksAnother) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  Stream* stream1 = session->OpenOutgoingUnidirectionalStream();
  Stream* stream2 = session->OpenOutgoingUnidirectionalStream();
  ASSERT_NE(stream1, nullptr);
  ASSERT_NE(stream2, nullptr);

  // No capsule may be emitted by either of the two writes below.
  EXPECT_CALL(*this, OnCapsule(_)).Times(0);

  // stream1 is buffered because the underlying writer is blocked.
  EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(false));
  EXPECT_THAT(quiche::WriteIntoStream(*stream1, "abc"),
              StatusIs(absl::StatusCode::kOk));

  // The underlying writer is writable again, but stream2's write is still
  // expected to be buffered (see the Times(0) expectation above), since
  // stream1 already has data waiting to be sent.
  EXPECT_CALL(writer_, CanWrite()).WillRepeatedly(Return(true));
  EXPECT_THAT(quiche::WriteIntoStream(*stream2, "abc"),
              StatusIs(absl::StatusCode::kOk));

  // Record the order in which the streams get flushed.
  std::vector<StreamId> flushed_stream_ids;
  EXPECT_CALL(*this, OnCapsule(_))
      .WillRepeatedly([&flushed_stream_ids](const Capsule& capsule) {
        EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STREAM);
        flushed_stream_ids.push_back(
            capsule.web_transport_stream_data().stream_id);
        return true;
      });
  session->OnCanWrite();
  EXPECT_THAT(flushed_stream_ids, ElementsAre(2, 6));
}
704
// Resetting an outgoing unidirectional stream writes a WT_RESET_STREAM capsule
// with the stream ID and error code, and makes the stream eligible for garbage
// collection.
TEST_F(EncapsulatedWebTransportTest, SendReset) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  Stream* stream = session->OpenOutgoingUnidirectionalStream();
  ASSERT_NE(stream, nullptr);
  MockStreamVisitorWithDestructor* stream_visitor = SetupVisitor(*stream);

  auto verify_reset_capsule = [](const Capsule& capsule) {
    EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_RESET_STREAM);
    const auto& reset = capsule.web_transport_reset_stream();
    EXPECT_EQ(reset.stream_id, 2u);
    EXPECT_EQ(reset.error_code, 1234u);
    return true;
  };
  EXPECT_CALL(*this, OnCapsule(_)).WillOnce(verify_reset_capsule);
  stream->ResetWithUserCode(1234u);

  bool visitor_destroyed = false;
  EXPECT_CALL(*stream_visitor, OnDelete()).WillOnce([&visitor_destroyed] {
    visitor_destroyed = true;
  });
  session->GarbageCollectStreams();
  EXPECT_TRUE(visitor_destroyed);
}
726
// An incoming WT_RESET_STREAM capsule is reported to the stream visitor with
// its error code, and the stream is gone from the session afterwards.
TEST_F(EncapsulatedWebTransportTest, ReceiveReset) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  EXPECT_CALL(*visitor_, OnIncomingUnidirectionalStreamAvailable());
  ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{3, "", true});
  Stream* stream = session->AcceptIncomingUnidirectionalStream();
  ASSERT_NE(stream, nullptr);

  MockStreamVisitorWithDestructor* stream_visitor = SetupVisitor(*stream);
  EXPECT_CALL(*stream_visitor, OnResetStreamReceived(1234u));

  EXPECT_NE(session->GetStreamById(3), nullptr);
  ProcessIncomingCapsule(quiche::WebTransportResetStreamCapsule{3u, 1234u});
  // No explicit GarbageCollectStreams() call is needed here: reading from the
  // underlying transport triggers garbage collection on its own.
  EXPECT_EQ(session->GetStreamById(3), nullptr);
}
744
// SendStopSending() on an incoming unidirectional stream writes a
// WT_STOP_SENDING capsule with the stream ID and error code, and makes the
// stream eligible for garbage collection.
TEST_F(EncapsulatedWebTransportTest, SendStopSending) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  EXPECT_CALL(*visitor_, OnIncomingUnidirectionalStreamAvailable());
  ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{3, "", true});
  Stream* stream = session->AcceptIncomingUnidirectionalStream();
  ASSERT_NE(stream, nullptr);
  MockStreamVisitorWithDestructor* stream_visitor = SetupVisitor(*stream);

  auto verify_stop_sending_capsule = [](const Capsule& capsule) {
    EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STOP_SENDING);
    const auto& stop_sending = capsule.web_transport_stop_sending();
    EXPECT_EQ(stop_sending.stream_id, 3u);
    EXPECT_EQ(stop_sending.error_code, 1234u);
    return true;
  };
  EXPECT_CALL(*this, OnCapsule(_)).WillOnce(verify_stop_sending_capsule);
  stream->SendStopSending(1234u);

  bool visitor_destroyed = false;
  EXPECT_CALL(*stream_visitor, OnDelete()).WillOnce([&visitor_destroyed] {
    visitor_destroyed = true;
  });
  session->GarbageCollectStreams();
  EXPECT_TRUE(visitor_destroyed);
}
768
// An incoming WT_STOP_SENDING capsule for an outgoing unidirectional stream is
// reported to the stream visitor with its error code, and the stream is gone
// from the session afterwards.
TEST_F(EncapsulatedWebTransportTest, ReceiveStopSending) {
  auto session = CreateTransport(Perspective::kClient);
  DefaultHandshakeForClient(*session);

  Stream* stream = session->OpenOutgoingUnidirectionalStream();
  ASSERT_NE(stream, nullptr);

  MockStreamVisitorWithDestructor* stream_visitor = SetupVisitor(*stream);
  EXPECT_CALL(*stream_visitor, OnStopSendingReceived(1234u));

  EXPECT_NE(session->GetStreamById(2), nullptr);
  ProcessIncomingCapsule(quiche::WebTransportStopSendingCapsule{2u, 1234u});
  // No explicit GarbageCollectStreams() call is needed here: reading from the
  // underlying transport triggers garbage collection on its own.
  EXPECT_EQ(session->GetStreamById(2), nullptr);
}
784
785 } // namespace
786 } // namespace webtransport::test
787