1 // Copyright (c) 2023 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/web_transport/encapsulated/encapsulated_web_transport.h"
6 
7 #include <array>
8 #include <memory>
9 #include <string>
10 #include <utility>
11 #include <vector>
12 
13 #include "absl/status/status.h"
14 #include "absl/strings/string_view.h"
15 #include "absl/types/span.h"
16 #include "quiche/common/capsule.h"
17 #include "quiche/common/http/http_header_block.h"
18 #include "quiche/common/platform/api/quiche_test.h"
19 #include "quiche/common/quiche_buffer_allocator.h"
20 #include "quiche/common/quiche_stream.h"
21 #include "quiche/common/simple_buffer_allocator.h"
22 #include "quiche/common/test_tools/mock_streams.h"
23 #include "quiche/common/test_tools/quiche_test_utils.h"
24 #include "quiche/web_transport/test_tools/mock_web_transport.h"
25 #include "quiche/web_transport/web_transport.h"
26 
27 namespace webtransport::test {
28 namespace {
29 
30 using ::quiche::Capsule;
31 using ::quiche::CapsuleType;
32 using ::quiche::test::StatusIs;
33 using ::testing::_;
34 using ::testing::ElementsAre;
35 using ::testing::HasSubstr;
36 using ::testing::IsEmpty;
37 using ::testing::Return;
38 using ::testing::StrEq;
39 
40 class EncapsulatedWebTransportTest : public quiche::test::QuicheTest,
41                                      public quiche::CapsuleParser::Visitor {
42  public:
EncapsulatedWebTransportTest()43   EncapsulatedWebTransportTest() : parser_(this), reader_(&read_buffer_) {
44     ON_CALL(fatal_error_callback_, Call(_))
45         .WillByDefault([](absl::string_view error) {
46           ADD_FAILURE() << "Fatal session error: " << error;
47         });
48     ON_CALL(writer_, Writev(_, _))
49         .WillByDefault([&](absl::Span<const absl::string_view> data,
50                            const quiche::StreamWriteOptions& options) {
51           for (absl::string_view fragment : data) {
52             parser_.IngestCapsuleFragment(fragment);
53           }
54           writer_.ProcessOptions(options);
55           return absl::OkStatus();
56         });
57   }
58 
CreateTransport(Perspective perspective)59   std::unique_ptr<EncapsulatedSession> CreateTransport(
60       Perspective perspective) {
61     auto transport = std::make_unique<EncapsulatedSession>(
62         perspective, fatal_error_callback_.AsStdFunction());
63     session_ = transport.get();
64     return transport;
65   }
66 
CreateAndStoreVisitor()67   std::unique_ptr<SessionVisitor> CreateAndStoreVisitor() {
68     auto visitor = std::make_unique<testing::StrictMock<MockSessionVisitor>>();
69     visitor_ = visitor.get();
70     return visitor;
71   }
72 
73   MOCK_METHOD(bool, OnCapsule, (const Capsule&), (override));
74 
OnCapsuleParseFailure(absl::string_view error_message)75   void OnCapsuleParseFailure(absl::string_view error_message) override {
76     ADD_FAILURE() << "Written an invalid capsule: " << error_message;
77   }
78 
ProcessIncomingCapsule(const Capsule & capsule)79   void ProcessIncomingCapsule(const Capsule& capsule) {
80     quiche::QuicheBuffer buffer =
81         quiche::SerializeCapsule(capsule, quiche::SimpleBufferAllocator::Get());
82     read_buffer_.append(buffer.data(), buffer.size());
83     session_->OnCanRead();
84   }
85 
86   template <typename CapsuleType>
ProcessIncomingCapsule(const CapsuleType & capsule)87   void ProcessIncomingCapsule(const CapsuleType& capsule) {
88     quiche::QuicheBuffer buffer = quiche::SerializeCapsule(
89         quiche::Capsule(capsule), quiche::SimpleBufferAllocator::Get());
90     read_buffer_.append(buffer.data(), buffer.size());
91     session_->OnCanRead();
92   }
93 
DefaultHandshakeForClient(EncapsulatedSession & session)94   void DefaultHandshakeForClient(EncapsulatedSession& session) {
95     quiche::HttpHeaderBlock outgoing_headers, incoming_headers;
96     session.InitializeClient(CreateAndStoreVisitor(), outgoing_headers,
97                              &writer_, &reader_);
98     EXPECT_CALL(*visitor_, OnSessionReady());
99     session.ProcessIncomingServerHeaders(incoming_headers);
100   }
101 
102  protected:
103   quiche::CapsuleParser parser_;
104   quiche::test::MockWriteStream writer_;
105   std::string read_buffer_;
106   quiche::test::ReadStreamFromString reader_;
107   MockSessionVisitor* visitor_ = nullptr;
108   EncapsulatedSession* session_ = nullptr;
109   testing::MockFunction<void(absl::string_view)> fatal_error_callback_;
110 };
111 
TEST_F(EncapsulatedWebTransportTest,IsOpenedBy)112 TEST_F(EncapsulatedWebTransportTest, IsOpenedBy) {
113   EXPECT_EQ(IsIdOpenedBy(0x00, Perspective::kClient), true);
114   EXPECT_EQ(IsIdOpenedBy(0x01, Perspective::kClient), false);
115   EXPECT_EQ(IsIdOpenedBy(0x02, Perspective::kClient), true);
116   EXPECT_EQ(IsIdOpenedBy(0x03, Perspective::kClient), false);
117 
118   EXPECT_EQ(IsIdOpenedBy(0x00, Perspective::kServer), false);
119   EXPECT_EQ(IsIdOpenedBy(0x01, Perspective::kServer), true);
120   EXPECT_EQ(IsIdOpenedBy(0x02, Perspective::kServer), false);
121   EXPECT_EQ(IsIdOpenedBy(0x03, Perspective::kServer), true);
122 }
123 
TEST_F(EncapsulatedWebTransportTest,SetupClientSession)124 TEST_F(EncapsulatedWebTransportTest, SetupClientSession) {
125   std::unique_ptr<EncapsulatedSession> session =
126       CreateTransport(Perspective::kClient);
127   quiche::HttpHeaderBlock outgoing_headers, incoming_headers;
128   EXPECT_EQ(session->state(), EncapsulatedSession::kUninitialized);
129   session->InitializeClient(CreateAndStoreVisitor(), outgoing_headers, &writer_,
130                             &reader_);
131   EXPECT_EQ(session->state(), EncapsulatedSession::kWaitingForHeaders);
132   EXPECT_CALL(*visitor_, OnSessionReady());
133   session->ProcessIncomingServerHeaders(incoming_headers);
134   EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen);
135 }
136 
TEST_F(EncapsulatedWebTransportTest,SetupServerSession)137 TEST_F(EncapsulatedWebTransportTest, SetupServerSession) {
138   std::unique_ptr<EncapsulatedSession> session =
139       CreateTransport(Perspective::kServer);
140   quiche::HttpHeaderBlock outgoing_headers, incoming_headers;
141   EXPECT_EQ(session->state(), EncapsulatedSession::kUninitialized);
142   std::unique_ptr<SessionVisitor> visitor = CreateAndStoreVisitor();
143   EXPECT_CALL(*visitor_, OnSessionReady());
144   session->InitializeServer(std::move(visitor), outgoing_headers,
145                             incoming_headers, &writer_, &reader_);
146   EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen);
147 }
148 
TEST_F(EncapsulatedWebTransportTest,CloseSession)149 TEST_F(EncapsulatedWebTransportTest, CloseSession) {
150   std::unique_ptr<EncapsulatedSession> session =
151       CreateTransport(Perspective::kClient);
152   DefaultHandshakeForClient(*session);
153   EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
154     EXPECT_EQ(capsule.capsule_type(), CapsuleType::CLOSE_WEBTRANSPORT_SESSION);
155     EXPECT_EQ(capsule.close_web_transport_session_capsule().error_code, 0x1234);
156     EXPECT_EQ(capsule.close_web_transport_session_capsule().error_message,
157               "test close");
158     return true;
159   });
160   EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen);
161   EXPECT_CALL(*visitor_, OnSessionClosed(0x1234, StrEq("test close")));
162   session->CloseSession(0x1234, "test close");
163   EXPECT_EQ(session->state(), EncapsulatedSession::kSessionClosed);
164   EXPECT_TRUE(writer_.fin_written());
165 
166   EXPECT_CALL(fatal_error_callback_, Call(_))
167       .WillOnce([](absl::string_view error) {
168         EXPECT_THAT(error, HasSubstr("close a session that is already closed"));
169       });
170   session->CloseSession(0x1234, "test close");
171 }
172 
TEST_F(EncapsulatedWebTransportTest,CloseSessionWriteBlocked)173 TEST_F(EncapsulatedWebTransportTest, CloseSessionWriteBlocked) {
174   std::unique_ptr<EncapsulatedSession> session =
175       CreateTransport(Perspective::kClient);
176   DefaultHandshakeForClient(*session);
177   EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(false));
178   EXPECT_CALL(*this, OnCapsule(_)).Times(0);
179   EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen);
180   session->CloseSession(0x1234, "test close");
181   EXPECT_EQ(session->state(), EncapsulatedSession::kSessionClosing);
182 
183   EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
184     EXPECT_EQ(capsule.capsule_type(), CapsuleType::CLOSE_WEBTRANSPORT_SESSION);
185     EXPECT_EQ(capsule.close_web_transport_session_capsule().error_code, 0x1234);
186     EXPECT_EQ(capsule.close_web_transport_session_capsule().error_message,
187               "test close");
188     return true;
189   });
190   EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(true));
191   EXPECT_CALL(*visitor_, OnSessionClosed(0x1234, StrEq("test close")));
192   session->OnCanWrite();
193   EXPECT_EQ(session->state(), EncapsulatedSession::kSessionClosed);
194   EXPECT_TRUE(writer_.fin_written());
195 }
196 
TEST_F(EncapsulatedWebTransportTest,ReceiveFin)197 TEST_F(EncapsulatedWebTransportTest, ReceiveFin) {
198   std::unique_ptr<EncapsulatedSession> session =
199       CreateTransport(Perspective::kClient);
200   DefaultHandshakeForClient(*session);
201 
202   EXPECT_CALL(*visitor_, OnSessionClosed(0, IsEmpty()));
203   reader_.set_fin();
204   session->OnCanRead();
205   EXPECT_TRUE(writer_.fin_written());
206 }
207 
TEST_F(EncapsulatedWebTransportTest,ReceiveCloseSession)208 TEST_F(EncapsulatedWebTransportTest, ReceiveCloseSession) {
209   std::unique_ptr<EncapsulatedSession> session =
210       CreateTransport(Perspective::kClient);
211   DefaultHandshakeForClient(*session);
212 
213   EXPECT_CALL(*visitor_, OnSessionClosed(0x1234, StrEq("test")));
214   ProcessIncomingCapsule(Capsule::CloseWebTransportSession(0x1234, "test"));
215   EXPECT_TRUE(writer_.fin_written());
216   reader_.set_fin();
217   session->OnCanRead();
218 }
219 
TEST_F(EncapsulatedWebTransportTest,ReceiveMalformedData)220 TEST_F(EncapsulatedWebTransportTest, ReceiveMalformedData) {
221   std::unique_ptr<EncapsulatedSession> session =
222       CreateTransport(Perspective::kClient);
223   DefaultHandshakeForClient(*session);
224 
225   EXPECT_CALL(fatal_error_callback_, Call(HasSubstr("too much capsule data")))
226       .WillOnce([] {});
227   read_buffer_ = std::string(2 * 1024 * 1024, '\xff');
228   session->OnCanRead();
229 }
230 
TEST_F(EncapsulatedWebTransportTest,SendDatagrams)231 TEST_F(EncapsulatedWebTransportTest, SendDatagrams) {
232   std::unique_ptr<EncapsulatedSession> session =
233       CreateTransport(Perspective::kClient);
234   DefaultHandshakeForClient(*session);
235   EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
236     EXPECT_EQ(capsule.capsule_type(), quiche::CapsuleType::DATAGRAM);
237     EXPECT_EQ(capsule.datagram_capsule().http_datagram_payload, "test");
238     return true;
239   });
240   DatagramStatus status = session->SendOrQueueDatagram("test");
241   EXPECT_EQ(status.code, DatagramStatusCode::kSuccess);
242 }
243 
TEST_F(EncapsulatedWebTransportTest,SendDatagramsEarly)244 TEST_F(EncapsulatedWebTransportTest, SendDatagramsEarly) {
245   std::unique_ptr<EncapsulatedSession> session =
246       CreateTransport(Perspective::kClient);
247   quiche::HttpHeaderBlock outgoing_headers;
248   session->InitializeClient(CreateAndStoreVisitor(), outgoing_headers, &writer_,
249                             &reader_);
250   EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
251     EXPECT_EQ(capsule.capsule_type(), quiche::CapsuleType::DATAGRAM);
252     EXPECT_EQ(capsule.datagram_capsule().http_datagram_payload, "test");
253     return true;
254   });
255   ASSERT_EQ(session->state(), EncapsulatedSession::kWaitingForHeaders);
256   session->SendOrQueueDatagram("test");
257 }
258 
TEST_F(EncapsulatedWebTransportTest,SendDatagramsBeforeInitialization)259 TEST_F(EncapsulatedWebTransportTest, SendDatagramsBeforeInitialization) {
260   std::unique_ptr<EncapsulatedSession> session =
261       CreateTransport(Perspective::kClient);
262   quiche::HttpHeaderBlock outgoing_headers;
263   EXPECT_CALL(*this, OnCapsule(_)).Times(0);
264   ASSERT_EQ(session->state(), EncapsulatedSession::kUninitialized);
265   session->SendOrQueueDatagram("test");
266 
267   EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
268     EXPECT_EQ(capsule.capsule_type(), CapsuleType::DATAGRAM);
269     EXPECT_EQ(capsule.datagram_capsule().http_datagram_payload, "test");
270     return true;
271   });
272   DefaultHandshakeForClient(*session);
273 }
274 
TEST_F(EncapsulatedWebTransportTest,SendDatagramsTooBig)275 TEST_F(EncapsulatedWebTransportTest, SendDatagramsTooBig) {
276   std::unique_ptr<EncapsulatedSession> session =
277       CreateTransport(Perspective::kClient);
278   DefaultHandshakeForClient(*session);
279   EXPECT_CALL(*this, OnCapsule(_)).Times(0);
280   std::string long_string(16 * 1024, 'a');
281   DatagramStatus status = session->SendOrQueueDatagram(long_string);
282   EXPECT_EQ(status.code, DatagramStatusCode::kTooBig);
283 }
284 
TEST_F(EncapsulatedWebTransportTest,ReceiveDatagrams)285 TEST_F(EncapsulatedWebTransportTest, ReceiveDatagrams) {
286   std::unique_ptr<EncapsulatedSession> session =
287       CreateTransport(Perspective::kClient);
288   DefaultHandshakeForClient(*session);
289   EXPECT_CALL(*visitor_, OnDatagramReceived(_))
290       .WillOnce([](absl::string_view data) { EXPECT_EQ(data, "test"); });
291   ProcessIncomingCapsule(Capsule::Datagram("test"));
292 }
293 
TEST_F(EncapsulatedWebTransportTest,SendDraining)294 TEST_F(EncapsulatedWebTransportTest, SendDraining) {
295   std::unique_ptr<EncapsulatedSession> session =
296       CreateTransport(Perspective::kClient);
297   DefaultHandshakeForClient(*session);
298   EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
299     EXPECT_EQ(capsule.capsule_type(), CapsuleType::DRAIN_WEBTRANSPORT_SESSION);
300     return true;
301   });
302   session->NotifySessionDraining();
303 }
304 
TEST_F(EncapsulatedWebTransportTest,ReceiveDraining)305 TEST_F(EncapsulatedWebTransportTest, ReceiveDraining) {
306   std::unique_ptr<EncapsulatedSession> session =
307       CreateTransport(Perspective::kClient);
308   DefaultHandshakeForClient(*session);
309   testing::MockFunction<void()> callback;
310   session->SetOnDraining(callback.AsStdFunction());
311   EXPECT_CALL(callback, Call());
312   ProcessIncomingCapsule(Capsule(quiche::DrainWebTransportSessionCapsule()));
313 }
314 
TEST_F(EncapsulatedWebTransportTest,WriteErrorDatagram)315 TEST_F(EncapsulatedWebTransportTest, WriteErrorDatagram) {
316   std::unique_ptr<EncapsulatedSession> session =
317       CreateTransport(Perspective::kClient);
318   DefaultHandshakeForClient(*session);
319   EXPECT_CALL(writer_, Writev(_, _))
320       .WillOnce(Return(absl::InternalError("Test write error")));
321   EXPECT_CALL(fatal_error_callback_, Call(_))
322       .WillOnce([](absl::string_view error) {
323         EXPECT_THAT(error, HasSubstr("Test write error"));
324       });
325   DatagramStatus status = session->SendOrQueueDatagram("test");
326   EXPECT_EQ(status.code, DatagramStatusCode::kInternalError);
327 }
328 
TEST_F(EncapsulatedWebTransportTest,WriteErrorControlCapsule)329 TEST_F(EncapsulatedWebTransportTest, WriteErrorControlCapsule) {
330   std::unique_ptr<EncapsulatedSession> session =
331       CreateTransport(Perspective::kClient);
332   DefaultHandshakeForClient(*session);
333   EXPECT_CALL(writer_, Writev(_, _))
334       .WillOnce(Return(absl::InternalError("Test write error")));
335   EXPECT_CALL(fatal_error_callback_, Call(_))
336       .WillOnce([](absl::string_view error) {
337         EXPECT_THAT(error, HasSubstr("Test write error"));
338       });
339   session->NotifySessionDraining();
340 }
341 
TEST_F(EncapsulatedWebTransportTest,SimpleRead)342 TEST_F(EncapsulatedWebTransportTest, SimpleRead) {
343   std::unique_ptr<EncapsulatedSession> session =
344       CreateTransport(Perspective::kClient);
345   DefaultHandshakeForClient(*session);
346   bool stream_received = false;
347   EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable())
348       .WillOnce([&] { stream_received = true; });
349   std::string data = "test";
350   ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, data, false});
351   // Make sure data gets copied.
352   data[0] = 'q';
353   EXPECT_TRUE(stream_received);
354   Stream* stream = session->AcceptIncomingBidirectionalStream();
355   ASSERT_TRUE(stream != nullptr);
356   EXPECT_EQ(stream->GetStreamId(), 1u);
357   EXPECT_EQ(stream->visitor(), nullptr);
358   EXPECT_EQ(stream->ReadableBytes(), 4u);
359 
360   quiche::ReadStream::PeekResult peek = stream->PeekNextReadableRegion();
361   EXPECT_EQ(peek.peeked_data, "test");
362   EXPECT_FALSE(peek.fin_next);
363   EXPECT_FALSE(peek.all_data_received);
364 
365   std::string buffer;
366   quiche::ReadStream::ReadResult read = stream->Read(&buffer);
367   EXPECT_EQ(read.bytes_read, 4);
368   EXPECT_FALSE(read.fin);
369   EXPECT_EQ(buffer, "test");
370   EXPECT_EQ(stream->ReadableBytes(), 0u);
371 }
372 
373 class MockStreamVisitorWithDestructor : public MockStreamVisitor {
374  public:
~MockStreamVisitorWithDestructor()375   ~MockStreamVisitorWithDestructor() { OnDelete(); }
376 
377   MOCK_METHOD(void, OnDelete, (), ());
378 };
379 
SetupVisitor(Stream & stream)380 MockStreamVisitorWithDestructor* SetupVisitor(Stream& stream) {
381   auto visitor = std::make_unique<MockStreamVisitorWithDestructor>();
382   MockStreamVisitorWithDestructor* result = visitor.get();
383   stream.SetVisitor(std::move(visitor));
384   return result;
385 }
386 
TEST_F(EncapsulatedWebTransportTest,ImmediateRead)387 TEST_F(EncapsulatedWebTransportTest, ImmediateRead) {
388   std::unique_ptr<EncapsulatedSession> session =
389       CreateTransport(Perspective::kClient);
390   DefaultHandshakeForClient(*session);
391   EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable());
392   ProcessIncomingCapsule(
393       quiche::WebTransportStreamDataCapsule{1, "abcd", false});
394   Stream* stream = session->AcceptIncomingBidirectionalStream();
395   ASSERT_TRUE(stream != nullptr);
396   EXPECT_EQ(stream->ReadableBytes(), 4u);
397 
398   MockStreamVisitor* visitor = SetupVisitor(*stream);
399   EXPECT_CALL(*visitor, OnCanRead()).WillOnce([&] {
400     std::string output;
401     (void)stream->Read(&output);
402     EXPECT_EQ(output, "abcdef");
403   });
404   ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, "ef", false});
405 }
406 
TEST_F(EncapsulatedWebTransportTest,FinPeek)407 TEST_F(EncapsulatedWebTransportTest, FinPeek) {
408   std::unique_ptr<EncapsulatedSession> session =
409       CreateTransport(Perspective::kClient);
410   DefaultHandshakeForClient(*session);
411   EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable());
412   ProcessIncomingCapsule(
413       quiche::WebTransportStreamDataCapsule{1, "abcd", false});
414   Stream* stream = session->AcceptIncomingBidirectionalStream();
415   ASSERT_TRUE(stream != nullptr);
416   EXPECT_EQ(stream->ReadableBytes(), 4u);
417 
418   ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, "ef", true});
419 
420   quiche::ReadStream::PeekResult peek = stream->PeekNextReadableRegion();
421   EXPECT_EQ(peek.peeked_data, "abcd");
422   EXPECT_FALSE(peek.fin_next);
423   EXPECT_TRUE(peek.all_data_received);
424 
425   EXPECT_FALSE(stream->SkipBytes(2));
426   peek = stream->PeekNextReadableRegion();
427   EXPECT_FALSE(peek.fin_next);
428   EXPECT_TRUE(peek.all_data_received);
429 
430   EXPECT_FALSE(stream->SkipBytes(2));
431   peek = stream->PeekNextReadableRegion();
432   EXPECT_EQ(peek.peeked_data, "ef");
433   EXPECT_TRUE(peek.fin_next);
434   EXPECT_TRUE(peek.all_data_received);
435 
436   EXPECT_TRUE(stream->SkipBytes(2));
437 }
438 
TEST_F(EncapsulatedWebTransportTest,FinRead)439 TEST_F(EncapsulatedWebTransportTest, FinRead) {
440   std::unique_ptr<EncapsulatedSession> session =
441       CreateTransport(Perspective::kClient);
442   DefaultHandshakeForClient(*session);
443   EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable());
444   ProcessIncomingCapsule(
445       quiche::WebTransportStreamDataCapsule{1, "abcdef", true});
446   Stream* stream = session->AcceptIncomingBidirectionalStream();
447   ASSERT_TRUE(stream != nullptr);
448   EXPECT_EQ(stream->ReadableBytes(), 6u);
449 
450   std::array<char, 3> buffer;
451   quiche::ReadStream::ReadResult read = stream->Read(absl::MakeSpan(buffer));
452   EXPECT_THAT(buffer, ElementsAre('a', 'b', 'c'));
453   EXPECT_EQ(read.bytes_read, 3);
454   EXPECT_FALSE(read.fin);
455 
456   read = stream->Read(absl::MakeSpan(buffer));
457   EXPECT_THAT(buffer, ElementsAre('d', 'e', 'f'));
458   EXPECT_EQ(read.bytes_read, 3);
459   EXPECT_TRUE(read.fin);
460 }
461 
TEST_F(EncapsulatedWebTransportTest,LargeRead)462 TEST_F(EncapsulatedWebTransportTest, LargeRead) {
463   std::unique_ptr<EncapsulatedSession> session =
464       CreateTransport(Perspective::kClient);
465   DefaultHandshakeForClient(*session);
466   EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable());
467   ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{
468       1, std::string(64 * 1024, 'a'), true});
469   Stream* stream = session->AcceptIncomingBidirectionalStream();
470   ASSERT_TRUE(stream != nullptr);
471   EXPECT_EQ(stream->ReadableBytes(), 65536u);
472 
473   for (int i = 0; i < 64; i++) {
474     std::array<char, 1024> buffer;
475     quiche::ReadStream::ReadResult read = stream->Read(absl::MakeSpan(buffer));
476     EXPECT_EQ(read.bytes_read, 1024);
477     EXPECT_EQ(read.fin, i == 63);
478   }
479 }
480 
TEST_F(EncapsulatedWebTransportTest,DoubleFinReceived)481 TEST_F(EncapsulatedWebTransportTest, DoubleFinReceived) {
482   std::unique_ptr<EncapsulatedSession> session =
483       CreateTransport(Perspective::kClient);
484   DefaultHandshakeForClient(*session);
485   EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable());
486   ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, "abc", true});
487   Stream* stream = session->AcceptIncomingBidirectionalStream();
488   ASSERT_TRUE(stream != nullptr);
489 
490   EXPECT_CALL(fatal_error_callback_, Call(_))
491       .WillOnce([](absl::string_view error) {
492         EXPECT_THAT(error, HasSubstr("has already received a FIN"));
493       });
494   ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, "def", true});
495 }
496 
TEST_F(EncapsulatedWebTransportTest,CanWriteUnidiBidi)497 TEST_F(EncapsulatedWebTransportTest, CanWriteUnidiBidi) {
498   std::unique_ptr<EncapsulatedSession> session =
499       CreateTransport(Perspective::kClient);
500   DefaultHandshakeForClient(*session);
501   EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable());
502   EXPECT_CALL(*visitor_, OnIncomingUnidirectionalStreamAvailable());
503   ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, "abc", true});
504   ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{3, "abc", true});
505 
506   Stream* stream = session->AcceptIncomingBidirectionalStream();
507   ASSERT_TRUE(stream != nullptr);
508   EXPECT_TRUE(stream->CanWrite());
509 
510   stream = session->AcceptIncomingUnidirectionalStream();
511   ASSERT_TRUE(stream != nullptr);
512   EXPECT_FALSE(stream->CanWrite());
513 
514   stream = session->OpenOutgoingBidirectionalStream();
515   ASSERT_TRUE(stream != nullptr);
516   EXPECT_TRUE(stream->CanWrite());
517 
518   stream = session->OpenOutgoingUnidirectionalStream();
519   ASSERT_TRUE(stream != nullptr);
520   EXPECT_TRUE(stream->CanWrite());
521 }
522 
TEST_F(EncapsulatedWebTransportTest,ReadOnlyGarbageCollection)523 TEST_F(EncapsulatedWebTransportTest, ReadOnlyGarbageCollection) {
524   std::unique_ptr<EncapsulatedSession> session =
525       CreateTransport(Perspective::kClient);
526   DefaultHandshakeForClient(*session);
527   EXPECT_CALL(*visitor_, OnIncomingUnidirectionalStreamAvailable());
528   ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{3, "abc", true});
529 
530   Stream* stream = session->AcceptIncomingUnidirectionalStream();
531   ASSERT_TRUE(stream != nullptr);
532   EXPECT_TRUE(stream->SkipBytes(3));
533 
534   MockStreamVisitorWithDestructor* visitor = SetupVisitor(*stream);
535   bool deleted = false;
536   EXPECT_CALL(*visitor, OnDelete()).WillOnce([&] { deleted = true; });
537   session->GarbageCollectStreams();
538   EXPECT_TRUE(deleted);
539 }
540 
TEST_F(EncapsulatedWebTransportTest,WriteOnlyGarbageCollection)541 TEST_F(EncapsulatedWebTransportTest, WriteOnlyGarbageCollection) {
542   std::unique_ptr<EncapsulatedSession> session =
543       CreateTransport(Perspective::kClient);
544   DefaultHandshakeForClient(*session);
545 
546   Stream* stream = session->OpenOutgoingUnidirectionalStream();
547   ASSERT_TRUE(stream != nullptr);
548 
549   MockStreamVisitorWithDestructor* visitor = SetupVisitor(*stream);
550   bool deleted = false;
551   EXPECT_CALL(*visitor, OnDelete()).WillOnce([&] { deleted = true; });
552   EXPECT_CALL(*this, OnCapsule(_)).WillOnce(Return(true));
553 
554   quiche::StreamWriteOptions options;
555   options.set_send_fin(true);
556   EXPECT_THAT(stream->Writev(absl::Span<const absl::string_view>(), options),
557               StatusIs(absl::StatusCode::kOk));
558   session->GarbageCollectStreams();
559   EXPECT_TRUE(deleted);
560 }
561 
TEST_F(EncapsulatedWebTransportTest,SimpleWrite)562 TEST_F(EncapsulatedWebTransportTest, SimpleWrite) {
563   std::unique_ptr<EncapsulatedSession> session =
564       CreateTransport(Perspective::kClient);
565   DefaultHandshakeForClient(*session);
566   EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable());
567   ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, "", true});
568   Stream* stream = session->AcceptIncomingBidirectionalStream();
569   ASSERT_TRUE(stream != nullptr);
570 
571   EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
572     EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STREAM);
573     EXPECT_EQ(capsule.web_transport_stream_data().stream_id, 1u);
574     EXPECT_EQ(capsule.web_transport_stream_data().fin, false);
575     EXPECT_EQ(capsule.web_transport_stream_data().data, "test");
576     return true;
577   });
578   absl::Status status = quiche::WriteIntoStream(*stream, "test");
579   EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
580 }
581 
TEST_F(EncapsulatedWebTransportTest,WriteWithFin)582 TEST_F(EncapsulatedWebTransportTest, WriteWithFin) {
583   std::unique_ptr<EncapsulatedSession> session =
584       CreateTransport(Perspective::kClient);
585   DefaultHandshakeForClient(*session);
586   Stream* stream = session->OpenOutgoingUnidirectionalStream();
587   ASSERT_TRUE(stream != nullptr);
588 
589   EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
590     EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STREAM_WITH_FIN);
591     EXPECT_EQ(capsule.web_transport_stream_data().stream_id, 2u);
592     EXPECT_EQ(capsule.web_transport_stream_data().fin, true);
593     EXPECT_EQ(capsule.web_transport_stream_data().data, "test");
594     return true;
595   });
596   quiche::StreamWriteOptions options;
597   options.set_send_fin(true);
598   EXPECT_TRUE(stream->CanWrite());
599   absl::Status status = quiche::WriteIntoStream(*stream, "test", options);
600   EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
601   EXPECT_FALSE(stream->CanWrite());
602 }
603 
TEST_F(EncapsulatedWebTransportTest,FinOnlyWrite)604 TEST_F(EncapsulatedWebTransportTest, FinOnlyWrite) {
605   std::unique_ptr<EncapsulatedSession> session =
606       CreateTransport(Perspective::kClient);
607   DefaultHandshakeForClient(*session);
608   Stream* stream = session->OpenOutgoingUnidirectionalStream();
609   ASSERT_TRUE(stream != nullptr);
610 
611   EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
612     EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STREAM_WITH_FIN);
613     EXPECT_EQ(capsule.web_transport_stream_data().stream_id, 2u);
614     EXPECT_EQ(capsule.web_transport_stream_data().fin, true);
615     EXPECT_EQ(capsule.web_transport_stream_data().data, "");
616     return true;
617   });
618   quiche::StreamWriteOptions options;
619   options.set_send_fin(true);
620   EXPECT_TRUE(stream->CanWrite());
621   absl::Status status =
622       stream->Writev(absl::Span<const absl::string_view>(), options);
623   EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
624   EXPECT_FALSE(stream->CanWrite());
625 }
626 
TEST_F(EncapsulatedWebTransportTest,BufferedWriteThenUnbuffer)627 TEST_F(EncapsulatedWebTransportTest, BufferedWriteThenUnbuffer) {
628   std::unique_ptr<EncapsulatedSession> session =
629       CreateTransport(Perspective::kClient);
630   DefaultHandshakeForClient(*session);
631   Stream* stream = session->OpenOutgoingUnidirectionalStream();
632   ASSERT_TRUE(stream != nullptr);
633 
634   EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(false));
635   absl::Status status = quiche::WriteIntoStream(*stream, "abc");
636   EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
637 
638   // While the stream cannot be written right now, we should be still able to
639   // buffer data into it.
640   EXPECT_TRUE(stream->CanWrite());
641   EXPECT_CALL(writer_, CanWrite()).WillRepeatedly(Return(true));
642   status = quiche::WriteIntoStream(*stream, "def");
643   EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
644 
645   EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
646     EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STREAM);
647     EXPECT_EQ(capsule.web_transport_stream_data().stream_id, 2u);
648     EXPECT_EQ(capsule.web_transport_stream_data().data, "abcdef");
649     return true;
650   });
651   session_->OnCanWrite();
652 }
653 
TEST_F(EncapsulatedWebTransportTest,BufferedWriteThenFlush)654 TEST_F(EncapsulatedWebTransportTest, BufferedWriteThenFlush) {
655   std::unique_ptr<EncapsulatedSession> session =
656       CreateTransport(Perspective::kClient);
657   DefaultHandshakeForClient(*session);
658   Stream* stream = session->OpenOutgoingUnidirectionalStream();
659   ASSERT_TRUE(stream != nullptr);
660 
661   EXPECT_CALL(writer_, CanWrite()).Times(2).WillRepeatedly(Return(false));
662   absl::Status status = quiche::WriteIntoStream(*stream, "abc");
663   EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
664   status = quiche::WriteIntoStream(*stream, "def");
665   EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
666 
667   EXPECT_CALL(writer_, CanWrite()).WillRepeatedly(Return(true));
668   EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
669     EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STREAM);
670     EXPECT_EQ(capsule.web_transport_stream_data().stream_id, 2u);
671     EXPECT_EQ(capsule.web_transport_stream_data().data, "abcdef");
672     return true;
673   });
674   session_->OnCanWrite();
675 }
676 
TEST_F(EncapsulatedWebTransportTest,BufferedStreamBlocksAnother)677 TEST_F(EncapsulatedWebTransportTest, BufferedStreamBlocksAnother) {
678   std::unique_ptr<EncapsulatedSession> session =
679       CreateTransport(Perspective::kClient);
680   DefaultHandshakeForClient(*session);
681   Stream* stream1 = session->OpenOutgoingUnidirectionalStream();
682   Stream* stream2 = session->OpenOutgoingUnidirectionalStream();
683   ASSERT_TRUE(stream1 != nullptr);
684   ASSERT_TRUE(stream2 != nullptr);
685 
686   EXPECT_CALL(*this, OnCapsule(_)).Times(0);
687   EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(false));
688   absl::Status status = quiche::WriteIntoStream(*stream1, "abc");
689   EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
690   // ShouldYield will return false here, causing the write to get buffered.
691   EXPECT_CALL(writer_, CanWrite()).WillRepeatedly(Return(true));
692   status = quiche::WriteIntoStream(*stream2, "abc");
693   EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
694 
695   std::vector<StreamId> writes;
696   EXPECT_CALL(*this, OnCapsule(_)).WillRepeatedly([&](const Capsule& capsule) {
697     EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STREAM);
698     writes.push_back(capsule.web_transport_stream_data().stream_id);
699     return true;
700   });
701   session_->OnCanWrite();
702   EXPECT_THAT(writes, ElementsAre(2, 6));
703 }
704 
TEST_F(EncapsulatedWebTransportTest,SendReset)705 TEST_F(EncapsulatedWebTransportTest, SendReset) {
706   std::unique_ptr<EncapsulatedSession> session =
707       CreateTransport(Perspective::kClient);
708   DefaultHandshakeForClient(*session);
709   Stream* stream = session->OpenOutgoingUnidirectionalStream();
710   ASSERT_TRUE(stream != nullptr);
711 
712   MockStreamVisitorWithDestructor* visitor = SetupVisitor(*stream);
713   EXPECT_CALL(*this, OnCapsule(_)).WillOnce([&](const Capsule& capsule) {
714     EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_RESET_STREAM);
715     EXPECT_EQ(capsule.web_transport_reset_stream().stream_id, 2u);
716     EXPECT_EQ(capsule.web_transport_reset_stream().error_code, 1234u);
717     return true;
718   });
719   stream->ResetWithUserCode(1234u);
720 
721   bool deleted = false;
722   EXPECT_CALL(*visitor, OnDelete()).WillOnce([&] { deleted = true; });
723   session->GarbageCollectStreams();
724   EXPECT_TRUE(deleted);
725 }
726 
TEST_F(EncapsulatedWebTransportTest,ReceiveReset)727 TEST_F(EncapsulatedWebTransportTest, ReceiveReset) {
728   std::unique_ptr<EncapsulatedSession> session =
729       CreateTransport(Perspective::kClient);
730   DefaultHandshakeForClient(*session);
731   EXPECT_CALL(*visitor_, OnIncomingUnidirectionalStreamAvailable());
732   ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{3, "", true});
733   Stream* stream = session->AcceptIncomingUnidirectionalStream();
734   ASSERT_TRUE(stream != nullptr);
735 
736   MockStreamVisitorWithDestructor* visitor = SetupVisitor(*stream);
737   EXPECT_CALL(*visitor, OnResetStreamReceived(1234u));
738   EXPECT_TRUE(session->GetStreamById(3) != nullptr);
739   ProcessIncomingCapsule(quiche::WebTransportResetStreamCapsule{3u, 1234u});
740   // Reading from the underlying transport automatically triggers garbage
741   // collection.
742   EXPECT_TRUE(session->GetStreamById(3) == nullptr);
743 }
744 
TEST_F(EncapsulatedWebTransportTest,SendStopSending)745 TEST_F(EncapsulatedWebTransportTest, SendStopSending) {
746   std::unique_ptr<EncapsulatedSession> session =
747       CreateTransport(Perspective::kClient);
748   DefaultHandshakeForClient(*session);
749   EXPECT_CALL(*visitor_, OnIncomingUnidirectionalStreamAvailable());
750   ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{3, "", true});
751   Stream* stream = session->AcceptIncomingUnidirectionalStream();
752   ASSERT_TRUE(stream != nullptr);
753 
754   MockStreamVisitorWithDestructor* visitor = SetupVisitor(*stream);
755   EXPECT_CALL(*this, OnCapsule(_)).WillOnce([&](const Capsule& capsule) {
756     EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STOP_SENDING);
757     EXPECT_EQ(capsule.web_transport_stop_sending().stream_id, 3u);
758     EXPECT_EQ(capsule.web_transport_stop_sending().error_code, 1234u);
759     return true;
760   });
761   stream->SendStopSending(1234u);
762 
763   bool deleted = false;
764   EXPECT_CALL(*visitor, OnDelete()).WillOnce([&] { deleted = true; });
765   session->GarbageCollectStreams();
766   EXPECT_TRUE(deleted);
767 }
768 
TEST_F(EncapsulatedWebTransportTest,ReceiveStopSending)769 TEST_F(EncapsulatedWebTransportTest, ReceiveStopSending) {
770   std::unique_ptr<EncapsulatedSession> session =
771       CreateTransport(Perspective::kClient);
772   DefaultHandshakeForClient(*session);
773   Stream* stream = session->OpenOutgoingUnidirectionalStream();
774   ASSERT_TRUE(stream != nullptr);
775 
776   MockStreamVisitorWithDestructor* visitor = SetupVisitor(*stream);
777   EXPECT_CALL(*visitor, OnStopSendingReceived(1234u));
778   EXPECT_TRUE(session->GetStreamById(2) != nullptr);
779   ProcessIncomingCapsule(quiche::WebTransportStopSendingCapsule{2u, 1234u});
780   // Reading from the underlying transport automatically triggers garbage
781   // collection.
782   EXPECT_TRUE(session->GetStreamById(2) == nullptr);
783 }
784 
785 }  // namespace
786 }  // namespace webtransport::test
787