1 // Copyright (c) 2023 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/web_transport/encapsulated/encapsulated_web_transport.h"
6 
7 #include <stdbool.h>
8 
9 #include <algorithm>
10 #include <array>
11 #include <cstddef>
12 #include <cstdint>
13 #include <cstring>
14 #include <iterator>
15 #include <memory>
16 #include <optional>
17 #include <string>
18 #include <tuple>
19 #include <utility>
20 #include <vector>
21 
22 #include "absl/algorithm/container.h"
23 #include "absl/container/node_hash_map.h"
24 #include "absl/status/status.h"
25 #include "absl/status/statusor.h"
26 #include "absl/strings/str_cat.h"
27 #include "absl/strings/string_view.h"
28 #include "absl/time/time.h"
29 #include "absl/types/span.h"
30 #include "quiche/common/capsule.h"
31 #include "quiche/common/http/http_header_block.h"
32 #include "quiche/common/platform/api/quiche_bug_tracker.h"
33 #include "quiche/common/platform/api/quiche_logging.h"
34 #include "quiche/common/quiche_buffer_allocator.h"
35 #include "quiche/common/quiche_callbacks.h"
36 #include "quiche/common/quiche_circular_deque.h"
37 #include "quiche/common/quiche_status_utils.h"
38 #include "quiche/common/quiche_stream.h"
39 #include "quiche/web_transport/web_transport.h"
40 
41 namespace webtransport {
42 
43 namespace {
44 
45 using ::quiche::Capsule;
46 using ::quiche::CapsuleType;
47 using ::quiche::CloseWebTransportSessionCapsule;
48 
49 // This is arbitrary, since we don't have any real MTU restriction when running
50 // over TCP.
51 constexpr uint64_t kEncapsulatedMaxDatagramSize = 9000;
52 
53 constexpr StreamPriority kDefaultPriority = StreamPriority{0, 0};
54 
55 }  // namespace
56 
EncapsulatedSession(Perspective perspective,FatalErrorCallback fatal_error_callback)57 EncapsulatedSession::EncapsulatedSession(
58     Perspective perspective, FatalErrorCallback fatal_error_callback)
59     : perspective_(perspective),
60       fatal_error_callback_(std::move(fatal_error_callback)),
61       capsule_parser_(this),
62       next_outgoing_bidi_stream_(perspective == Perspective::kClient ? 0 : 1),
63       next_outgoing_unidi_stream_(perspective == Perspective::kClient ? 2 : 3) {
64   QUICHE_DCHECK(IsIdOpenedBy(next_outgoing_bidi_stream_, perspective));
65   QUICHE_DCHECK(IsIdOpenedBy(next_outgoing_unidi_stream_, perspective));
66 }
67 
InitializeClient(std::unique_ptr<SessionVisitor> visitor,quiche::HttpHeaderBlock &,quiche::WriteStream * writer,quiche::ReadStream * reader)68 void EncapsulatedSession::InitializeClient(
69     std::unique_ptr<SessionVisitor> visitor,
70     quiche::HttpHeaderBlock& /*outgoing_headers*/, quiche::WriteStream* writer,
71     quiche::ReadStream* reader) {
72   if (state_ != kUninitialized) {
73     OnFatalError("Called InitializeClient() in an invalid state");
74     return;
75   }
76   if (perspective_ != Perspective::kClient) {
77     OnFatalError("Called InitializeClient() on a server session");
78     return;
79   }
80 
81   visitor_ = std::move(visitor);
82   writer_ = writer;
83   reader_ = reader;
84   state_ = kWaitingForHeaders;
85 }
86 
InitializeServer(std::unique_ptr<SessionVisitor> visitor,const quiche::HttpHeaderBlock &,quiche::HttpHeaderBlock &,quiche::WriteStream * writer,quiche::ReadStream * reader)87 void EncapsulatedSession::InitializeServer(
88     std::unique_ptr<SessionVisitor> visitor,
89     const quiche::HttpHeaderBlock& /*incoming_headers*/,
90     quiche::HttpHeaderBlock& /*outgoing_headers*/, quiche::WriteStream* writer,
91     quiche::ReadStream* reader) {
92   if (state_ != kUninitialized) {
93     OnFatalError("Called InitializeServer() in an invalid state");
94     return;
95   }
96   if (perspective_ != Perspective::kServer) {
97     OnFatalError("Called InitializeServer() on a client session");
98     return;
99   }
100 
101   visitor_ = std::move(visitor);
102   writer_ = writer;
103   reader_ = reader;
104   OpenSession();
105 }
ProcessIncomingServerHeaders(const quiche::HttpHeaderBlock &)106 void EncapsulatedSession::ProcessIncomingServerHeaders(
107     const quiche::HttpHeaderBlock& /*headers*/) {
108   if (state_ != kWaitingForHeaders) {
109     OnFatalError("Called ProcessIncomingServerHeaders() in an invalid state");
110     return;
111   }
112   OpenSession();
113 }
114 
CloseSession(SessionErrorCode error_code,absl::string_view error_message)115 void EncapsulatedSession::CloseSession(SessionErrorCode error_code,
116                                        absl::string_view error_message) {
117   switch (state_) {
118     case kUninitialized:
119     case kWaitingForHeaders:
120       OnFatalError(absl::StrCat(
121           "Attempted to close a session before it opened with error 0x",
122           absl::Hex(error_code), ": ", error_message));
123       return;
124     case kSessionClosing:
125     case kSessionClosed:
126       OnFatalError(absl::StrCat(
127           "Attempted to close a session that is already closed with error 0x",
128           absl::Hex(error_code), ": ", error_message));
129       return;
130     case kSessionOpen:
131       break;
132   }
133   state_ = kSessionClosing;
134   buffered_session_close_ =
135       BufferedClose{error_code, std::string(error_message)};
136   OnCanWrite();
137 }
138 
AcceptIncomingStream(quiche::QuicheCircularDeque<StreamId> & queue)139 Stream* EncapsulatedSession::AcceptIncomingStream(
140     quiche::QuicheCircularDeque<StreamId>& queue) {
141   while (!queue.empty()) {
142     StreamId id = queue.front();
143     queue.pop_front();
144     Stream* stream = GetStreamById(id);
145     if (stream == nullptr) {
146       // Stream got reset and garbage collected before the peer ever had a
147       // chance to look at it.
148       continue;
149     }
150     return stream;
151   }
152   return nullptr;
153 }
154 
AcceptIncomingBidirectionalStream()155 Stream* EncapsulatedSession::AcceptIncomingBidirectionalStream() {
156   return AcceptIncomingStream(incoming_bidirectional_streams_);
157 }
AcceptIncomingUnidirectionalStream()158 Stream* EncapsulatedSession::AcceptIncomingUnidirectionalStream() {
159   return AcceptIncomingStream(incoming_unidirectional_streams_);
160 }
CanOpenNextOutgoingBidirectionalStream()161 bool EncapsulatedSession::CanOpenNextOutgoingBidirectionalStream() {
162   // TODO: implement flow control.
163   return true;
164 }
CanOpenNextOutgoingUnidirectionalStream()165 bool EncapsulatedSession::CanOpenNextOutgoingUnidirectionalStream() {
166   // TODO: implement flow control.
167   return true;
168 }
OpenOutgoingStream(StreamId & counter)169 Stream* EncapsulatedSession::OpenOutgoingStream(StreamId& counter) {
170   StreamId stream_id = counter;
171   counter += 4;
172   auto [it, inserted] = streams_.emplace(
173       std::piecewise_construct, std::forward_as_tuple(stream_id),
174       std::forward_as_tuple(this, stream_id));
175   QUICHE_DCHECK(inserted);
176   return &it->second;
177 }
OpenOutgoingBidirectionalStream()178 Stream* EncapsulatedSession::OpenOutgoingBidirectionalStream() {
179   if (!CanOpenNextOutgoingBidirectionalStream()) {
180     return nullptr;
181   }
182   return OpenOutgoingStream(next_outgoing_bidi_stream_);
183 }
OpenOutgoingUnidirectionalStream()184 Stream* EncapsulatedSession::OpenOutgoingUnidirectionalStream() {
185   if (!CanOpenNextOutgoingUnidirectionalStream()) {
186     return nullptr;
187   }
188   return OpenOutgoingStream(next_outgoing_unidi_stream_);
189 }
190 
GetStreamById(StreamId id)191 Stream* EncapsulatedSession::GetStreamById(StreamId id) {
192   auto it = streams_.find(id);
193   if (it == streams_.end()) {
194     return nullptr;
195   }
196   return &it->second;
197 }
198 
GetDatagramStats()199 DatagramStats EncapsulatedSession::GetDatagramStats() {
200   DatagramStats stats;
201   stats.expired_outgoing = 0;
202   stats.lost_outgoing = 0;
203   return stats;
204 }
205 
GetSessionStats()206 SessionStats EncapsulatedSession::GetSessionStats() {
207   // We could potentially get stats via tcp_info and similar mechanisms, but
208   // that would require us knowing what the underlying socket is.
209   return SessionStats();
210 }
211 
NotifySessionDraining()212 void EncapsulatedSession::NotifySessionDraining() {
213   SendControlCapsule(quiche::DrainWebTransportSessionCapsule());
214   OnCanWrite();
215 }
SetOnDraining(quiche::SingleUseCallback<void ()> callback)216 void EncapsulatedSession::SetOnDraining(
217     quiche::SingleUseCallback<void()> callback) {
218   draining_callback_ = std::move(callback);
219 }
220 
SendOrQueueDatagram(absl::string_view datagram)221 DatagramStatus EncapsulatedSession::SendOrQueueDatagram(
222     absl::string_view datagram) {
223   if (datagram.size() > GetMaxDatagramSize()) {
224     return DatagramStatus{
225         DatagramStatusCode::kTooBig,
226         absl::StrCat("Datagram is ", datagram.size(),
227                      " bytes long, while the specified maximum size is ",
228                      GetMaxDatagramSize())};
229   }
230 
231   bool write_blocked;
232   switch (state_) {
233     case kUninitialized:
234       write_blocked = true;
235       break;
236     // We can send datagrams before receiving any headers from the peer, since
237     // datagrams are not subject to queueing.
238     case kWaitingForHeaders:
239     case kSessionOpen:
240       write_blocked = !writer_->CanWrite();
241       break;
242     case kSessionClosing:
243     case kSessionClosed:
244       return DatagramStatus{DatagramStatusCode::kInternalError,
245                             "Writing into an already closed session"};
246   }
247 
248   if (write_blocked) {
249     // TODO: this *may* be useful to split into a separate queue.
250     control_capsule_queue_.push_back(
251         quiche::SerializeCapsule(Capsule::Datagram(datagram), allocator_));
252     return DatagramStatus{DatagramStatusCode::kSuccess, ""};
253   }
254 
255   // We could always write via OnCanWrite() above, but the optimistic path below
256   // allows us to avoid a copy.
257   quiche::QuicheBuffer buffer =
258       quiche::SerializeDatagramCapsuleHeader(datagram.size(), allocator_);
259   std::array spans = {buffer.AsStringView(), datagram};
260   absl::Status write_status =
261       writer_->Writev(absl::MakeConstSpan(spans), quiche::StreamWriteOptions());
262   if (!write_status.ok()) {
263     OnWriteError(write_status);
264     return DatagramStatus{
265         DatagramStatusCode::kInternalError,
266         absl::StrCat("Write error for datagram: ", write_status.ToString())};
267   }
268   return DatagramStatus{DatagramStatusCode::kSuccess, ""};
269 }
270 
GetMaxDatagramSize() const271 uint64_t EncapsulatedSession::GetMaxDatagramSize() const {
272   return kEncapsulatedMaxDatagramSize;
273 }
274 
SetDatagramMaxTimeInQueue(absl::Duration)275 void EncapsulatedSession::SetDatagramMaxTimeInQueue(
276     absl::Duration /*max_time_in_queue*/) {
277   // TODO(b/264263113): implement this (requires having a mockable clock).
278 }
279 
OnCanWrite()280 void EncapsulatedSession::OnCanWrite() {
281   if (state_ == kUninitialized || !writer_) {
282     OnFatalError("Trying to write before the session is initialized");
283     return;
284   }
285   if (state_ == kSessionClosed) {
286     OnFatalError("Trying to write before the session is closed");
287     return;
288   }
289 
290   if (state_ == kSessionClosing) {
291     if (writer_->CanWrite()) {
292       CloseWebTransportSessionCapsule capsule{
293           buffered_session_close_.error_code,
294           buffered_session_close_.error_message};
295       quiche::QuicheBuffer buffer =
296           quiche::SerializeCapsule(Capsule(std::move(capsule)), allocator_);
297       absl::Status write_status = SendFin(buffer.AsStringView());
298       if (!write_status.ok()) {
299         OnWriteError(quiche::AppendToStatus(write_status,
300                                             " while writing WT_CLOSE_SESSION"));
301         return;
302       }
303       OnSessionClosed(buffered_session_close_.error_code,
304                       buffered_session_close_.error_message);
305     }
306     return;
307   }
308 
309   while (writer_->CanWrite() && !control_capsule_queue_.empty()) {
310     absl::Status write_status = quiche::WriteIntoStream(
311         *writer_, control_capsule_queue_.front().AsStringView());
312     if (!write_status.ok()) {
313       OnWriteError(write_status);
314       return;
315     }
316     control_capsule_queue_.pop_front();
317   }
318 
319   while (writer_->CanWrite()) {
320     absl::StatusOr<StreamId> next_id = scheduler_.PopFront();
321     if (!next_id.ok()) {
322       QUICHE_DCHECK_EQ(next_id.status().code(), absl::StatusCode::kNotFound);
323       return;
324     }
325     auto it = streams_.find(*next_id);
326     if (it == streams_.end()) {
327       QUICHE_BUG(WT_H2_NextStreamNotInTheMap);
328       OnFatalError("Next scheduled stream is not in the map");
329       return;
330     }
331     QUICHE_DCHECK(it->second.HasPendingWrite());
332     it->second.FlushPendingWrite();
333   }
334 }
335 
OnCanRead()336 void EncapsulatedSession::OnCanRead() {
337   if (state_ == kSessionClosed || state_ == kSessionClosing) {
338     return;
339   }
340   bool has_fin = quiche::ProcessAllReadableRegions(
341       *reader_, [&](absl::string_view fragment) {
342         capsule_parser_.IngestCapsuleFragment(fragment);
343       });
344   if (has_fin) {
345     capsule_parser_.ErrorIfThereIsRemainingBufferedData();
346     OnSessionClosed(0, "");
347   }
348   if (state_ == kSessionOpen) {
349     GarbageCollectStreams();
350   }
351 }
352 
OnCapsule(const quiche::Capsule & capsule)353 bool EncapsulatedSession::OnCapsule(const quiche::Capsule& capsule) {
354   switch (capsule.capsule_type()) {
355     case CapsuleType::DATAGRAM:
356       visitor_->OnDatagramReceived(
357           capsule.datagram_capsule().http_datagram_payload);
358       break;
359     case CapsuleType::DRAIN_WEBTRANSPORT_SESSION:
360       if (draining_callback_) {
361         std::move(draining_callback_)();
362       }
363       break;
364     case CapsuleType::CLOSE_WEBTRANSPORT_SESSION:
365       OnSessionClosed(
366           capsule.close_web_transport_session_capsule().error_code,
367           std::string(
368               capsule.close_web_transport_session_capsule().error_message));
369       break;
370     case CapsuleType::WT_STREAM:
371     case CapsuleType::WT_STREAM_WITH_FIN:
372       ProcessStreamCapsule(capsule,
373                            capsule.web_transport_stream_data().stream_id);
374       break;
375     case CapsuleType::WT_RESET_STREAM:
376       ProcessStreamCapsule(capsule,
377                            capsule.web_transport_reset_stream().stream_id);
378       break;
379     case CapsuleType::WT_STOP_SENDING:
380       ProcessStreamCapsule(capsule,
381                            capsule.web_transport_stop_sending().stream_id);
382       break;
383     default:
384       break;
385   }
386   return state_ != kSessionClosed;
387 }
388 
OnCapsuleParseFailure(absl::string_view error_message)389 void EncapsulatedSession::OnCapsuleParseFailure(
390     absl::string_view error_message) {
391   if (state_ == kSessionClosed) {
392     return;
393   }
394   OnFatalError(absl::StrCat("Stream parse error: ", error_message));
395 }
396 
ProcessStreamCapsule(const quiche::Capsule & capsule,StreamId stream_id)397 void EncapsulatedSession::ProcessStreamCapsule(const quiche::Capsule& capsule,
398                                                StreamId stream_id) {
399   bool new_stream_created = false;
400   auto it = streams_.find(stream_id);
401   if (it == streams_.end()) {
402     if (IsOutgoing(stream_id)) {
403       // Ignore this frame, as it is possible that it refers to an outgoing
404       // stream that has been closed.
405       return;
406     }
407     // TODO: check flow control here.
408     it = streams_.emplace_hint(it, std::piecewise_construct,
409                                std::forward_as_tuple(stream_id),
410                                std::forward_as_tuple(this, stream_id));
411     new_stream_created = true;
412   }
413   InnerStream& stream = it->second;
414   stream.ProcessCapsule(capsule);
415   if (new_stream_created) {
416     if (IsBidirectionalId(stream_id)) {
417       incoming_bidirectional_streams_.push_back(stream_id);
418       visitor_->OnIncomingBidirectionalStreamAvailable();
419     } else {
420       incoming_unidirectional_streams_.push_back(stream_id);
421       visitor_->OnIncomingUnidirectionalStreamAvailable();
422     }
423   }
424 }
425 
ProcessCapsule(const quiche::Capsule & capsule)426 void EncapsulatedSession::InnerStream::ProcessCapsule(
427     const quiche::Capsule& capsule) {
428   switch (capsule.capsule_type()) {
429     case CapsuleType::WT_STREAM:
430     case CapsuleType::WT_STREAM_WITH_FIN: {
431       if (fin_received_) {
432         session_->OnFatalError(
433             "Received stream data for a stream that has already received a "
434             "FIN");
435         return;
436       }
437       if (read_side_closed_) {
438         // It is possible that we sent STOP_SENDING but it has not been received
439         // yet. Ignore.
440         return;
441       }
442       fin_received_ = capsule.capsule_type() == CapsuleType::WT_STREAM_WITH_FIN;
443       const quiche::WebTransportStreamDataCapsule& data =
444           capsule.web_transport_stream_data();
445       if (!data.data.empty()) {
446         incoming_reads_.push_back(IncomingRead{data.data, std::string()});
447       }
448       // Fast path: if the visitor consumes all of the incoming reads, we don't
449       // need to copy data from the capsule parser.
450       if (visitor_ != nullptr) {
451         visitor_->OnCanRead();
452       }
453       // Slow path: copy all data that the visitor have not consumed.
454       for (IncomingRead& read : incoming_reads_) {
455         QUICHE_DCHECK(!read.data.empty());
456         if (read.storage.empty()) {
457           read.storage = std::string(read.data);
458           read.data = read.storage;
459         }
460       }
461       return;
462     }
463     case CapsuleType::WT_RESET_STREAM:
464       CloseReadSide(capsule.web_transport_reset_stream().error_code);
465       return;
466     case CapsuleType::WT_STOP_SENDING:
467       CloseWriteSide(capsule.web_transport_stop_sending().error_code);
468       return;
469     default:
470       QUICHE_BUG(WT_H2_ProcessStreamCapsule_Unknown)
471           << "Unexpected capsule dispatched to InnerStream: " << capsule;
472       session_->OnFatalError(
473           "Internal error: Unexpected capsule dispatched to InnerStream");
474       return;
475   }
476 }
477 
OpenSession()478 void EncapsulatedSession::OpenSession() {
479   state_ = kSessionOpen;
480   visitor_->OnSessionReady();
481   OnCanWrite();
482   OnCanRead();
483 }
484 
SendFin(absl::string_view data)485 absl::Status EncapsulatedSession::SendFin(absl::string_view data) {
486   QUICHE_DCHECK(!fin_sent_);
487   fin_sent_ = true;
488   quiche::StreamWriteOptions options;
489   options.set_send_fin(true);
490   return quiche::WriteIntoStream(*writer_, data, options);
491 }
492 
OnSessionClosed(SessionErrorCode error_code,const std::string & error_message)493 void EncapsulatedSession::OnSessionClosed(SessionErrorCode error_code,
494                                           const std::string& error_message) {
495   if (!fin_sent_) {
496     absl::Status status = SendFin("");
497     if (!status.ok()) {
498       OnWriteError(status);
499       return;
500     }
501   }
502 
503   if (session_close_notified_) {
504     QUICHE_DCHECK_EQ(state_, kSessionClosed);
505     return;
506   }
507   state_ = kSessionClosed;
508   session_close_notified_ = true;
509 
510   if (visitor_ != nullptr) {
511     visitor_->OnSessionClosed(error_code, error_message);
512   }
513 }
514 
OnFatalError(absl::string_view error_message)515 void EncapsulatedSession::OnFatalError(absl::string_view error_message) {
516   QUICHE_DLOG(ERROR) << "Fatal error in encapsulated WebTransport: "
517                      << error_message;
518   state_ = kSessionClosed;
519   if (fatal_error_callback_) {
520     std::move(fatal_error_callback_)(error_message);
521     fatal_error_callback_ = nullptr;
522   }
523 }
524 
OnWriteError(absl::Status error)525 void EncapsulatedSession::OnWriteError(absl::Status error) {
526   OnFatalError(absl::StrCat(
527       error, " while trying to write encapsulated WebTransport data"));
528 }
529 
InnerStream(EncapsulatedSession * session,StreamId id)530 EncapsulatedSession::InnerStream::InnerStream(EncapsulatedSession* session,
531                                               StreamId id)
532     : session_(session),
533       id_(id),
534       read_side_closed_(IsUnidirectionalId(id) &&
535                         IsIdOpenedBy(id, session->perspective_)),
536       write_side_closed_(IsUnidirectionalId(id) &&
537                          !IsIdOpenedBy(id, session->perspective_)) {
538   if (!write_side_closed_) {
539     absl::Status status = session_->scheduler_.Register(id_, kDefaultPriority);
540     if (!status.ok()) {
541       QUICHE_BUG(WT_H2_FailedToRegisterNewStream) << status;
542       session_->OnFatalError(
543           "Failed to register new stream with the scheduler");
544       return;
545     }
546   }
547 }
548 
Read(absl::Span<char> output)549 quiche::ReadStream::ReadResult EncapsulatedSession::InnerStream::Read(
550     absl::Span<char> output) {
551   const size_t total_size = output.size();
552   for (const IncomingRead& read : incoming_reads_) {
553     size_t size_to_read = std::min(read.size(), output.size());
554     if (size_to_read == 0) {
555       break;
556     }
557     memcpy(output.data(), read.data.data(), size_to_read);
558     output = output.subspan(size_to_read);
559   }
560   bool fin_consumed = SkipBytes(total_size);
561   return ReadResult{total_size, fin_consumed};
562 }
Read(std::string * output)563 quiche::ReadStream::ReadResult EncapsulatedSession::InnerStream::Read(
564     std::string* output) {
565   const size_t total_size = ReadableBytes();
566   const size_t initial_offset = output->size();
567   output->resize(initial_offset + total_size);
568   return Read(absl::Span<char>(&((*output)[initial_offset]), total_size));
569 }
ReadableBytes() const570 size_t EncapsulatedSession::InnerStream::ReadableBytes() const {
571   size_t total_size = 0;
572   for (const IncomingRead& read : incoming_reads_) {
573     total_size += read.size();
574   }
575   return total_size;
576 }
577 quiche::ReadStream::PeekResult
PeekNextReadableRegion() const578 EncapsulatedSession::InnerStream::PeekNextReadableRegion() const {
579   if (incoming_reads_.empty()) {
580     return PeekResult{absl::string_view(), fin_received_, fin_received_};
581   }
582   return PeekResult{incoming_reads_.front().data,
583                     fin_received_ && incoming_reads_.size() == 1,
584                     fin_received_};
585 }
586 
SkipBytes(size_t bytes)587 bool EncapsulatedSession::InnerStream::SkipBytes(size_t bytes) {
588   size_t remaining = bytes;
589   while (remaining > 0) {
590     if (incoming_reads_.empty()) {
591       QUICHE_BUG(WT_H2_SkipBytes_toomuch)
592           << "Requested to skip " << remaining
593           << " bytes that are not present in the read buffer.";
594       return false;
595     }
596     IncomingRead& current = incoming_reads_.front();
597     if (remaining < current.size()) {
598       current.data = current.data.substr(remaining);
599       return false;
600     }
601     remaining -= current.size();
602     incoming_reads_.pop_front();
603   }
604   if (incoming_reads_.empty() && fin_received_) {
605     fin_consumed_ = true;
606     CloseReadSide(std::nullopt);
607     return true;
608   }
609   return false;
610 }
611 
Writev(const absl::Span<const absl::string_view> data,const quiche::StreamWriteOptions & options)612 absl::Status EncapsulatedSession::InnerStream::Writev(
613     const absl::Span<const absl::string_view> data,
614     const quiche::StreamWriteOptions& options) {
615   if (write_side_closed_) {
616     return absl::FailedPreconditionError(
617         "Trying to write into an already-closed stream");
618   }
619   if (fin_buffered_) {
620     return absl::FailedPreconditionError("FIN already buffered");
621   }
622   if (!CanWrite()) {
623     return absl::FailedPreconditionError(
624         "Trying to write into a stream when CanWrite() = false");
625   }
626 
627   const absl::StatusOr<bool> should_yield =
628       session_->scheduler_.ShouldYield(id_);
629   if (!should_yield.ok()) {
630     QUICHE_BUG(WT_H2_Writev_NotRegistered) << should_yield.status();
631     session_->OnFatalError("Stream not registered with the scheduler");
632     return absl::InternalError("Stream not registered with the scheduler");
633   }
634   const bool write_blocked = !session_->writer_->CanWrite() || *should_yield ||
635                              !pending_write_.empty();
636   if (write_blocked) {
637     fin_buffered_ = options.send_fin();
638     for (absl::string_view chunk : data) {
639       absl::StrAppend(&pending_write_, chunk);
640     }
641     absl::Status status = session_->scheduler_.Schedule(id_);
642     if (!status.ok()) {
643       QUICHE_BUG(WT_H2_Writev_CantSchedule) << status;
644       session_->OnFatalError("Could not schedule a write-blocked stream");
645       return absl::InternalError("Could not schedule a write-blocked stream");
646     }
647     return absl::OkStatus();
648   }
649 
650   size_t bytes_written = WriteInner(data, options.send_fin());
651   // TODO: handle partial writes when flow control requires those.
652   QUICHE_DCHECK(bytes_written == 0 ||
653                 bytes_written == quiche::TotalStringViewSpanSize(data));
654   if (bytes_written == 0) {
655     for (absl::string_view chunk : data) {
656       absl::StrAppend(&pending_write_, chunk);
657     }
658   }
659 
660   if (options.send_fin()) {
661     CloseWriteSide(std::nullopt);
662   }
663   return absl::OkStatus();
664 }
665 
CanWrite() const666 bool EncapsulatedSession::InnerStream::CanWrite() const {
667   return session_->state_ != EncapsulatedSession::kSessionClosed &&
668          !write_side_closed_ &&
669          (pending_write_.size() <= session_->max_stream_data_buffered_);
670 }
671 
FlushPendingWrite()672 void EncapsulatedSession::InnerStream::FlushPendingWrite() {
673   QUICHE_DCHECK(!write_side_closed_);
674   QUICHE_DCHECK(session_->writer_->CanWrite());
675   QUICHE_DCHECK(!pending_write_.empty());
676   absl::string_view to_write = pending_write_;
677   size_t bytes_written =
678       WriteInner(absl::MakeSpan(&to_write, 1), fin_buffered_);
679   if (bytes_written < to_write.size()) {
680     pending_write_ = pending_write_.substr(bytes_written);
681     return;
682   }
683   pending_write_.clear();
684   if (fin_buffered_) {
685     CloseWriteSide(std::nullopt);
686   }
687   if (!write_side_closed_ && visitor_ != nullptr) {
688     visitor_->OnCanWrite();
689   }
690 }
691 
WriteInner(absl::Span<const absl::string_view> data,bool fin)692 size_t EncapsulatedSession::InnerStream::WriteInner(
693     absl::Span<const absl::string_view> data, bool fin) {
694   size_t total_size = quiche::TotalStringViewSpanSize(data);
695   if (total_size == 0 && !fin) {
696     session_->OnFatalError("Attempted to make an empty write with fin=false");
697     return 0;
698   }
699   quiche::QuicheBuffer header =
700       quiche::SerializeWebTransportStreamCapsuleHeader(id_, fin, total_size,
701                                                        session_->allocator_);
702   std::vector<absl::string_view> views_to_write;
703   views_to_write.reserve(data.size() + 1);
704   views_to_write.push_back(header.AsStringView());
705   absl::c_copy(data, std::back_inserter(views_to_write));
706   absl::Status write_status = session_->writer_->Writev(
707       views_to_write, quiche::kDefaultStreamWriteOptions);
708   if (!write_status.ok()) {
709     session_->OnWriteError(write_status);
710     return 0;
711   }
712   return total_size;
713 }
714 
AbruptlyTerminate(absl::Status error)715 void EncapsulatedSession::InnerStream::AbruptlyTerminate(absl::Status error) {
716   QUICHE_DLOG(INFO) << "Abruptly terminating the stream due to error: "
717                     << error;
718   ResetDueToInternalError();
719 }
720 
ResetWithUserCode(StreamErrorCode error)721 void EncapsulatedSession::InnerStream::ResetWithUserCode(
722     StreamErrorCode error) {
723   if (reset_frame_sent_) {
724     return;
725   }
726   reset_frame_sent_ = true;
727 
728   session_->SendControlCapsule(
729       quiche::WebTransportResetStreamCapsule{id_, error});
730   CloseWriteSide(std::nullopt);
731 }
732 
SendStopSending(StreamErrorCode error)733 void EncapsulatedSession::InnerStream::SendStopSending(StreamErrorCode error) {
734   if (stop_sending_sent_) {
735     return;
736   }
737   stop_sending_sent_ = true;
738 
739   session_->SendControlCapsule(
740       quiche::WebTransportStopSendingCapsule{id_, error});
741   CloseReadSide(std::nullopt);
742 }
743 
CloseReadSide(std::optional<StreamErrorCode> error)744 void EncapsulatedSession::InnerStream::CloseReadSide(
745     std::optional<StreamErrorCode> error) {
746   if (read_side_closed_) {
747     return;
748   }
749   read_side_closed_ = true;
750   incoming_reads_.clear();
751   if (error.has_value() && visitor_ != nullptr) {
752     visitor_->OnResetStreamReceived(*error);
753   }
754   if (CanBeGarbageCollected()) {
755     session_->streams_to_garbage_collect_.push_back(id_);
756   }
757 }
758 
CloseWriteSide(std::optional<StreamErrorCode> error)759 void EncapsulatedSession::InnerStream::CloseWriteSide(
760     std::optional<StreamErrorCode> error) {
761   if (write_side_closed_) {
762     return;
763   }
764   write_side_closed_ = true;
765   pending_write_.clear();
766   absl::Status status = session_->scheduler_.Unregister(id_);
767   if (!status.ok()) {
768     session_->OnFatalError("Failed to unregister closed stream");
769     return;
770   }
771   if (error.has_value() && visitor_ != nullptr) {
772     visitor_->OnStopSendingReceived(*error);
773   }
774   if (CanBeGarbageCollected()) {
775     session_->streams_to_garbage_collect_.push_back(id_);
776   }
777 }
778 
GarbageCollectStreams()779 void EncapsulatedSession::GarbageCollectStreams() {
780   for (StreamId id : streams_to_garbage_collect_) {
781     streams_.erase(id);
782   }
783   streams_to_garbage_collect_.clear();
784 }
785 
786 }  // namespace webtransport
787