1 // Copyright (c) 2023 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "quiche/web_transport/encapsulated/encapsulated_web_transport.h"
6
7 #include <stdbool.h>
8
9 #include <algorithm>
10 #include <array>
11 #include <cstddef>
12 #include <cstdint>
13 #include <cstring>
14 #include <iterator>
15 #include <memory>
16 #include <optional>
17 #include <string>
18 #include <tuple>
19 #include <utility>
20 #include <vector>
21
22 #include "absl/algorithm/container.h"
23 #include "absl/container/node_hash_map.h"
24 #include "absl/status/status.h"
25 #include "absl/status/statusor.h"
26 #include "absl/strings/str_cat.h"
27 #include "absl/strings/string_view.h"
28 #include "absl/time/time.h"
29 #include "absl/types/span.h"
30 #include "quiche/common/capsule.h"
31 #include "quiche/common/http/http_header_block.h"
32 #include "quiche/common/platform/api/quiche_bug_tracker.h"
33 #include "quiche/common/platform/api/quiche_logging.h"
34 #include "quiche/common/quiche_buffer_allocator.h"
35 #include "quiche/common/quiche_callbacks.h"
36 #include "quiche/common/quiche_circular_deque.h"
37 #include "quiche/common/quiche_status_utils.h"
38 #include "quiche/common/quiche_stream.h"
39 #include "quiche/web_transport/web_transport.h"
40
41 namespace webtransport {
42
43 namespace {
44
45 using ::quiche::Capsule;
46 using ::quiche::CapsuleType;
47 using ::quiche::CloseWebTransportSessionCapsule;
48
49 // This is arbitrary, since we don't have any real MTU restriction when running
50 // over TCP.
51 constexpr uint64_t kEncapsulatedMaxDatagramSize = 9000;
52
53 constexpr StreamPriority kDefaultPriority = StreamPriority{0, 0};
54
55 } // namespace
56
EncapsulatedSession(Perspective perspective,FatalErrorCallback fatal_error_callback)57 EncapsulatedSession::EncapsulatedSession(
58 Perspective perspective, FatalErrorCallback fatal_error_callback)
59 : perspective_(perspective),
60 fatal_error_callback_(std::move(fatal_error_callback)),
61 capsule_parser_(this),
62 next_outgoing_bidi_stream_(perspective == Perspective::kClient ? 0 : 1),
63 next_outgoing_unidi_stream_(perspective == Perspective::kClient ? 2 : 3) {
64 QUICHE_DCHECK(IsIdOpenedBy(next_outgoing_bidi_stream_, perspective));
65 QUICHE_DCHECK(IsIdOpenedBy(next_outgoing_unidi_stream_, perspective));
66 }
67
InitializeClient(std::unique_ptr<SessionVisitor> visitor,quiche::HttpHeaderBlock &,quiche::WriteStream * writer,quiche::ReadStream * reader)68 void EncapsulatedSession::InitializeClient(
69 std::unique_ptr<SessionVisitor> visitor,
70 quiche::HttpHeaderBlock& /*outgoing_headers*/, quiche::WriteStream* writer,
71 quiche::ReadStream* reader) {
72 if (state_ != kUninitialized) {
73 OnFatalError("Called InitializeClient() in an invalid state");
74 return;
75 }
76 if (perspective_ != Perspective::kClient) {
77 OnFatalError("Called InitializeClient() on a server session");
78 return;
79 }
80
81 visitor_ = std::move(visitor);
82 writer_ = writer;
83 reader_ = reader;
84 state_ = kWaitingForHeaders;
85 }
86
InitializeServer(std::unique_ptr<SessionVisitor> visitor,const quiche::HttpHeaderBlock &,quiche::HttpHeaderBlock &,quiche::WriteStream * writer,quiche::ReadStream * reader)87 void EncapsulatedSession::InitializeServer(
88 std::unique_ptr<SessionVisitor> visitor,
89 const quiche::HttpHeaderBlock& /*incoming_headers*/,
90 quiche::HttpHeaderBlock& /*outgoing_headers*/, quiche::WriteStream* writer,
91 quiche::ReadStream* reader) {
92 if (state_ != kUninitialized) {
93 OnFatalError("Called InitializeServer() in an invalid state");
94 return;
95 }
96 if (perspective_ != Perspective::kServer) {
97 OnFatalError("Called InitializeServer() on a client session");
98 return;
99 }
100
101 visitor_ = std::move(visitor);
102 writer_ = writer;
103 reader_ = reader;
104 OpenSession();
105 }
ProcessIncomingServerHeaders(const quiche::HttpHeaderBlock &)106 void EncapsulatedSession::ProcessIncomingServerHeaders(
107 const quiche::HttpHeaderBlock& /*headers*/) {
108 if (state_ != kWaitingForHeaders) {
109 OnFatalError("Called ProcessIncomingServerHeaders() in an invalid state");
110 return;
111 }
112 OpenSession();
113 }
114
CloseSession(SessionErrorCode error_code,absl::string_view error_message)115 void EncapsulatedSession::CloseSession(SessionErrorCode error_code,
116 absl::string_view error_message) {
117 switch (state_) {
118 case kUninitialized:
119 case kWaitingForHeaders:
120 OnFatalError(absl::StrCat(
121 "Attempted to close a session before it opened with error 0x",
122 absl::Hex(error_code), ": ", error_message));
123 return;
124 case kSessionClosing:
125 case kSessionClosed:
126 OnFatalError(absl::StrCat(
127 "Attempted to close a session that is already closed with error 0x",
128 absl::Hex(error_code), ": ", error_message));
129 return;
130 case kSessionOpen:
131 break;
132 }
133 state_ = kSessionClosing;
134 buffered_session_close_ =
135 BufferedClose{error_code, std::string(error_message)};
136 OnCanWrite();
137 }
138
AcceptIncomingStream(quiche::QuicheCircularDeque<StreamId> & queue)139 Stream* EncapsulatedSession::AcceptIncomingStream(
140 quiche::QuicheCircularDeque<StreamId>& queue) {
141 while (!queue.empty()) {
142 StreamId id = queue.front();
143 queue.pop_front();
144 Stream* stream = GetStreamById(id);
145 if (stream == nullptr) {
146 // Stream got reset and garbage collected before the peer ever had a
147 // chance to look at it.
148 continue;
149 }
150 return stream;
151 }
152 return nullptr;
153 }
154
AcceptIncomingBidirectionalStream()155 Stream* EncapsulatedSession::AcceptIncomingBidirectionalStream() {
156 return AcceptIncomingStream(incoming_bidirectional_streams_);
157 }
AcceptIncomingUnidirectionalStream()158 Stream* EncapsulatedSession::AcceptIncomingUnidirectionalStream() {
159 return AcceptIncomingStream(incoming_unidirectional_streams_);
160 }
CanOpenNextOutgoingBidirectionalStream()161 bool EncapsulatedSession::CanOpenNextOutgoingBidirectionalStream() {
162 // TODO: implement flow control.
163 return true;
164 }
CanOpenNextOutgoingUnidirectionalStream()165 bool EncapsulatedSession::CanOpenNextOutgoingUnidirectionalStream() {
166 // TODO: implement flow control.
167 return true;
168 }
OpenOutgoingStream(StreamId & counter)169 Stream* EncapsulatedSession::OpenOutgoingStream(StreamId& counter) {
170 StreamId stream_id = counter;
171 counter += 4;
172 auto [it, inserted] = streams_.emplace(
173 std::piecewise_construct, std::forward_as_tuple(stream_id),
174 std::forward_as_tuple(this, stream_id));
175 QUICHE_DCHECK(inserted);
176 return &it->second;
177 }
OpenOutgoingBidirectionalStream()178 Stream* EncapsulatedSession::OpenOutgoingBidirectionalStream() {
179 if (!CanOpenNextOutgoingBidirectionalStream()) {
180 return nullptr;
181 }
182 return OpenOutgoingStream(next_outgoing_bidi_stream_);
183 }
OpenOutgoingUnidirectionalStream()184 Stream* EncapsulatedSession::OpenOutgoingUnidirectionalStream() {
185 if (!CanOpenNextOutgoingUnidirectionalStream()) {
186 return nullptr;
187 }
188 return OpenOutgoingStream(next_outgoing_unidi_stream_);
189 }
190
GetStreamById(StreamId id)191 Stream* EncapsulatedSession::GetStreamById(StreamId id) {
192 auto it = streams_.find(id);
193 if (it == streams_.end()) {
194 return nullptr;
195 }
196 return &it->second;
197 }
198
GetDatagramStats()199 DatagramStats EncapsulatedSession::GetDatagramStats() {
200 DatagramStats stats;
201 stats.expired_outgoing = 0;
202 stats.lost_outgoing = 0;
203 return stats;
204 }
205
GetSessionStats()206 SessionStats EncapsulatedSession::GetSessionStats() {
207 // We could potentially get stats via tcp_info and similar mechanisms, but
208 // that would require us knowing what the underlying socket is.
209 return SessionStats();
210 }
211
NotifySessionDraining()212 void EncapsulatedSession::NotifySessionDraining() {
213 SendControlCapsule(quiche::DrainWebTransportSessionCapsule());
214 OnCanWrite();
215 }
SetOnDraining(quiche::SingleUseCallback<void ()> callback)216 void EncapsulatedSession::SetOnDraining(
217 quiche::SingleUseCallback<void()> callback) {
218 draining_callback_ = std::move(callback);
219 }
220
SendOrQueueDatagram(absl::string_view datagram)221 DatagramStatus EncapsulatedSession::SendOrQueueDatagram(
222 absl::string_view datagram) {
223 if (datagram.size() > GetMaxDatagramSize()) {
224 return DatagramStatus{
225 DatagramStatusCode::kTooBig,
226 absl::StrCat("Datagram is ", datagram.size(),
227 " bytes long, while the specified maximum size is ",
228 GetMaxDatagramSize())};
229 }
230
231 bool write_blocked;
232 switch (state_) {
233 case kUninitialized:
234 write_blocked = true;
235 break;
236 // We can send datagrams before receiving any headers from the peer, since
237 // datagrams are not subject to queueing.
238 case kWaitingForHeaders:
239 case kSessionOpen:
240 write_blocked = !writer_->CanWrite();
241 break;
242 case kSessionClosing:
243 case kSessionClosed:
244 return DatagramStatus{DatagramStatusCode::kInternalError,
245 "Writing into an already closed session"};
246 }
247
248 if (write_blocked) {
249 // TODO: this *may* be useful to split into a separate queue.
250 control_capsule_queue_.push_back(
251 quiche::SerializeCapsule(Capsule::Datagram(datagram), allocator_));
252 return DatagramStatus{DatagramStatusCode::kSuccess, ""};
253 }
254
255 // We could always write via OnCanWrite() above, but the optimistic path below
256 // allows us to avoid a copy.
257 quiche::QuicheBuffer buffer =
258 quiche::SerializeDatagramCapsuleHeader(datagram.size(), allocator_);
259 std::array spans = {buffer.AsStringView(), datagram};
260 absl::Status write_status =
261 writer_->Writev(absl::MakeConstSpan(spans), quiche::StreamWriteOptions());
262 if (!write_status.ok()) {
263 OnWriteError(write_status);
264 return DatagramStatus{
265 DatagramStatusCode::kInternalError,
266 absl::StrCat("Write error for datagram: ", write_status.ToString())};
267 }
268 return DatagramStatus{DatagramStatusCode::kSuccess, ""};
269 }
270
GetMaxDatagramSize() const271 uint64_t EncapsulatedSession::GetMaxDatagramSize() const {
272 return kEncapsulatedMaxDatagramSize;
273 }
274
SetDatagramMaxTimeInQueue(absl::Duration)275 void EncapsulatedSession::SetDatagramMaxTimeInQueue(
276 absl::Duration /*max_time_in_queue*/) {
277 // TODO(b/264263113): implement this (requires having a mockable clock).
278 }
279
OnCanWrite()280 void EncapsulatedSession::OnCanWrite() {
281 if (state_ == kUninitialized || !writer_) {
282 OnFatalError("Trying to write before the session is initialized");
283 return;
284 }
285 if (state_ == kSessionClosed) {
286 OnFatalError("Trying to write before the session is closed");
287 return;
288 }
289
290 if (state_ == kSessionClosing) {
291 if (writer_->CanWrite()) {
292 CloseWebTransportSessionCapsule capsule{
293 buffered_session_close_.error_code,
294 buffered_session_close_.error_message};
295 quiche::QuicheBuffer buffer =
296 quiche::SerializeCapsule(Capsule(std::move(capsule)), allocator_);
297 absl::Status write_status = SendFin(buffer.AsStringView());
298 if (!write_status.ok()) {
299 OnWriteError(quiche::AppendToStatus(write_status,
300 " while writing WT_CLOSE_SESSION"));
301 return;
302 }
303 OnSessionClosed(buffered_session_close_.error_code,
304 buffered_session_close_.error_message);
305 }
306 return;
307 }
308
309 while (writer_->CanWrite() && !control_capsule_queue_.empty()) {
310 absl::Status write_status = quiche::WriteIntoStream(
311 *writer_, control_capsule_queue_.front().AsStringView());
312 if (!write_status.ok()) {
313 OnWriteError(write_status);
314 return;
315 }
316 control_capsule_queue_.pop_front();
317 }
318
319 while (writer_->CanWrite()) {
320 absl::StatusOr<StreamId> next_id = scheduler_.PopFront();
321 if (!next_id.ok()) {
322 QUICHE_DCHECK_EQ(next_id.status().code(), absl::StatusCode::kNotFound);
323 return;
324 }
325 auto it = streams_.find(*next_id);
326 if (it == streams_.end()) {
327 QUICHE_BUG(WT_H2_NextStreamNotInTheMap);
328 OnFatalError("Next scheduled stream is not in the map");
329 return;
330 }
331 QUICHE_DCHECK(it->second.HasPendingWrite());
332 it->second.FlushPendingWrite();
333 }
334 }
335
OnCanRead()336 void EncapsulatedSession::OnCanRead() {
337 if (state_ == kSessionClosed || state_ == kSessionClosing) {
338 return;
339 }
340 bool has_fin = quiche::ProcessAllReadableRegions(
341 *reader_, [&](absl::string_view fragment) {
342 capsule_parser_.IngestCapsuleFragment(fragment);
343 });
344 if (has_fin) {
345 capsule_parser_.ErrorIfThereIsRemainingBufferedData();
346 OnSessionClosed(0, "");
347 }
348 if (state_ == kSessionOpen) {
349 GarbageCollectStreams();
350 }
351 }
352
OnCapsule(const quiche::Capsule & capsule)353 bool EncapsulatedSession::OnCapsule(const quiche::Capsule& capsule) {
354 switch (capsule.capsule_type()) {
355 case CapsuleType::DATAGRAM:
356 visitor_->OnDatagramReceived(
357 capsule.datagram_capsule().http_datagram_payload);
358 break;
359 case CapsuleType::DRAIN_WEBTRANSPORT_SESSION:
360 if (draining_callback_) {
361 std::move(draining_callback_)();
362 }
363 break;
364 case CapsuleType::CLOSE_WEBTRANSPORT_SESSION:
365 OnSessionClosed(
366 capsule.close_web_transport_session_capsule().error_code,
367 std::string(
368 capsule.close_web_transport_session_capsule().error_message));
369 break;
370 case CapsuleType::WT_STREAM:
371 case CapsuleType::WT_STREAM_WITH_FIN:
372 ProcessStreamCapsule(capsule,
373 capsule.web_transport_stream_data().stream_id);
374 break;
375 case CapsuleType::WT_RESET_STREAM:
376 ProcessStreamCapsule(capsule,
377 capsule.web_transport_reset_stream().stream_id);
378 break;
379 case CapsuleType::WT_STOP_SENDING:
380 ProcessStreamCapsule(capsule,
381 capsule.web_transport_stop_sending().stream_id);
382 break;
383 default:
384 break;
385 }
386 return state_ != kSessionClosed;
387 }
388
OnCapsuleParseFailure(absl::string_view error_message)389 void EncapsulatedSession::OnCapsuleParseFailure(
390 absl::string_view error_message) {
391 if (state_ == kSessionClosed) {
392 return;
393 }
394 OnFatalError(absl::StrCat("Stream parse error: ", error_message));
395 }
396
ProcessStreamCapsule(const quiche::Capsule & capsule,StreamId stream_id)397 void EncapsulatedSession::ProcessStreamCapsule(const quiche::Capsule& capsule,
398 StreamId stream_id) {
399 bool new_stream_created = false;
400 auto it = streams_.find(stream_id);
401 if (it == streams_.end()) {
402 if (IsOutgoing(stream_id)) {
403 // Ignore this frame, as it is possible that it refers to an outgoing
404 // stream that has been closed.
405 return;
406 }
407 // TODO: check flow control here.
408 it = streams_.emplace_hint(it, std::piecewise_construct,
409 std::forward_as_tuple(stream_id),
410 std::forward_as_tuple(this, stream_id));
411 new_stream_created = true;
412 }
413 InnerStream& stream = it->second;
414 stream.ProcessCapsule(capsule);
415 if (new_stream_created) {
416 if (IsBidirectionalId(stream_id)) {
417 incoming_bidirectional_streams_.push_back(stream_id);
418 visitor_->OnIncomingBidirectionalStreamAvailable();
419 } else {
420 incoming_unidirectional_streams_.push_back(stream_id);
421 visitor_->OnIncomingUnidirectionalStreamAvailable();
422 }
423 }
424 }
425
ProcessCapsule(const quiche::Capsule & capsule)426 void EncapsulatedSession::InnerStream::ProcessCapsule(
427 const quiche::Capsule& capsule) {
428 switch (capsule.capsule_type()) {
429 case CapsuleType::WT_STREAM:
430 case CapsuleType::WT_STREAM_WITH_FIN: {
431 if (fin_received_) {
432 session_->OnFatalError(
433 "Received stream data for a stream that has already received a "
434 "FIN");
435 return;
436 }
437 if (read_side_closed_) {
438 // It is possible that we sent STOP_SENDING but it has not been received
439 // yet. Ignore.
440 return;
441 }
442 fin_received_ = capsule.capsule_type() == CapsuleType::WT_STREAM_WITH_FIN;
443 const quiche::WebTransportStreamDataCapsule& data =
444 capsule.web_transport_stream_data();
445 if (!data.data.empty()) {
446 incoming_reads_.push_back(IncomingRead{data.data, std::string()});
447 }
448 // Fast path: if the visitor consumes all of the incoming reads, we don't
449 // need to copy data from the capsule parser.
450 if (visitor_ != nullptr) {
451 visitor_->OnCanRead();
452 }
453 // Slow path: copy all data that the visitor have not consumed.
454 for (IncomingRead& read : incoming_reads_) {
455 QUICHE_DCHECK(!read.data.empty());
456 if (read.storage.empty()) {
457 read.storage = std::string(read.data);
458 read.data = read.storage;
459 }
460 }
461 return;
462 }
463 case CapsuleType::WT_RESET_STREAM:
464 CloseReadSide(capsule.web_transport_reset_stream().error_code);
465 return;
466 case CapsuleType::WT_STOP_SENDING:
467 CloseWriteSide(capsule.web_transport_stop_sending().error_code);
468 return;
469 default:
470 QUICHE_BUG(WT_H2_ProcessStreamCapsule_Unknown)
471 << "Unexpected capsule dispatched to InnerStream: " << capsule;
472 session_->OnFatalError(
473 "Internal error: Unexpected capsule dispatched to InnerStream");
474 return;
475 }
476 }
477
OpenSession()478 void EncapsulatedSession::OpenSession() {
479 state_ = kSessionOpen;
480 visitor_->OnSessionReady();
481 OnCanWrite();
482 OnCanRead();
483 }
484
SendFin(absl::string_view data)485 absl::Status EncapsulatedSession::SendFin(absl::string_view data) {
486 QUICHE_DCHECK(!fin_sent_);
487 fin_sent_ = true;
488 quiche::StreamWriteOptions options;
489 options.set_send_fin(true);
490 return quiche::WriteIntoStream(*writer_, data, options);
491 }
492
OnSessionClosed(SessionErrorCode error_code,const std::string & error_message)493 void EncapsulatedSession::OnSessionClosed(SessionErrorCode error_code,
494 const std::string& error_message) {
495 if (!fin_sent_) {
496 absl::Status status = SendFin("");
497 if (!status.ok()) {
498 OnWriteError(status);
499 return;
500 }
501 }
502
503 if (session_close_notified_) {
504 QUICHE_DCHECK_EQ(state_, kSessionClosed);
505 return;
506 }
507 state_ = kSessionClosed;
508 session_close_notified_ = true;
509
510 if (visitor_ != nullptr) {
511 visitor_->OnSessionClosed(error_code, error_message);
512 }
513 }
514
OnFatalError(absl::string_view error_message)515 void EncapsulatedSession::OnFatalError(absl::string_view error_message) {
516 QUICHE_DLOG(ERROR) << "Fatal error in encapsulated WebTransport: "
517 << error_message;
518 state_ = kSessionClosed;
519 if (fatal_error_callback_) {
520 std::move(fatal_error_callback_)(error_message);
521 fatal_error_callback_ = nullptr;
522 }
523 }
524
OnWriteError(absl::Status error)525 void EncapsulatedSession::OnWriteError(absl::Status error) {
526 OnFatalError(absl::StrCat(
527 error, " while trying to write encapsulated WebTransport data"));
528 }
529
InnerStream(EncapsulatedSession * session,StreamId id)530 EncapsulatedSession::InnerStream::InnerStream(EncapsulatedSession* session,
531 StreamId id)
532 : session_(session),
533 id_(id),
534 read_side_closed_(IsUnidirectionalId(id) &&
535 IsIdOpenedBy(id, session->perspective_)),
536 write_side_closed_(IsUnidirectionalId(id) &&
537 !IsIdOpenedBy(id, session->perspective_)) {
538 if (!write_side_closed_) {
539 absl::Status status = session_->scheduler_.Register(id_, kDefaultPriority);
540 if (!status.ok()) {
541 QUICHE_BUG(WT_H2_FailedToRegisterNewStream) << status;
542 session_->OnFatalError(
543 "Failed to register new stream with the scheduler");
544 return;
545 }
546 }
547 }
548
Read(absl::Span<char> output)549 quiche::ReadStream::ReadResult EncapsulatedSession::InnerStream::Read(
550 absl::Span<char> output) {
551 const size_t total_size = output.size();
552 for (const IncomingRead& read : incoming_reads_) {
553 size_t size_to_read = std::min(read.size(), output.size());
554 if (size_to_read == 0) {
555 break;
556 }
557 memcpy(output.data(), read.data.data(), size_to_read);
558 output = output.subspan(size_to_read);
559 }
560 bool fin_consumed = SkipBytes(total_size);
561 return ReadResult{total_size, fin_consumed};
562 }
Read(std::string * output)563 quiche::ReadStream::ReadResult EncapsulatedSession::InnerStream::Read(
564 std::string* output) {
565 const size_t total_size = ReadableBytes();
566 const size_t initial_offset = output->size();
567 output->resize(initial_offset + total_size);
568 return Read(absl::Span<char>(&((*output)[initial_offset]), total_size));
569 }
ReadableBytes() const570 size_t EncapsulatedSession::InnerStream::ReadableBytes() const {
571 size_t total_size = 0;
572 for (const IncomingRead& read : incoming_reads_) {
573 total_size += read.size();
574 }
575 return total_size;
576 }
577 quiche::ReadStream::PeekResult
PeekNextReadableRegion() const578 EncapsulatedSession::InnerStream::PeekNextReadableRegion() const {
579 if (incoming_reads_.empty()) {
580 return PeekResult{absl::string_view(), fin_received_, fin_received_};
581 }
582 return PeekResult{incoming_reads_.front().data,
583 fin_received_ && incoming_reads_.size() == 1,
584 fin_received_};
585 }
586
SkipBytes(size_t bytes)587 bool EncapsulatedSession::InnerStream::SkipBytes(size_t bytes) {
588 size_t remaining = bytes;
589 while (remaining > 0) {
590 if (incoming_reads_.empty()) {
591 QUICHE_BUG(WT_H2_SkipBytes_toomuch)
592 << "Requested to skip " << remaining
593 << " bytes that are not present in the read buffer.";
594 return false;
595 }
596 IncomingRead& current = incoming_reads_.front();
597 if (remaining < current.size()) {
598 current.data = current.data.substr(remaining);
599 return false;
600 }
601 remaining -= current.size();
602 incoming_reads_.pop_front();
603 }
604 if (incoming_reads_.empty() && fin_received_) {
605 fin_consumed_ = true;
606 CloseReadSide(std::nullopt);
607 return true;
608 }
609 return false;
610 }
611
// Writes `data` (optionally followed by a FIN) to the stream.
//
// Fails with FailedPrecondition if the write side is closed, a FIN is already
// buffered, or CanWrite() is false. If the session writer is blocked, the
// scheduler asks this stream to yield, or earlier data is still pending, the
// data is buffered and the stream is scheduled for a later flush. Otherwise
// the data is written immediately. Scheduler failures are reported as fatal
// session errors and returned as Internal errors.
absl::Status EncapsulatedSession::InnerStream::Writev(
    const absl::Span<const absl::string_view> data,
    const quiche::StreamWriteOptions& options) {
  if (write_side_closed_) {
    return absl::FailedPreconditionError(
        "Trying to write into an already-closed stream");
  }
  if (fin_buffered_) {
    return absl::FailedPreconditionError("FIN already buffered");
  }
  if (!CanWrite()) {
    return absl::FailedPreconditionError(
        "Trying to write into a stream when CanWrite() = false");
  }

  // Appends every chunk of `data` to the pending-write buffer.
  const auto buffer_data = [this, data]() {
    for (absl::string_view chunk : data) {
      absl::StrAppend(&pending_write_, chunk);
    }
  };

  const absl::StatusOr<bool> should_yield =
      session_->scheduler_.ShouldYield(id_);
  if (!should_yield.ok()) {
    QUICHE_BUG(WT_H2_Writev_NotRegistered) << should_yield.status();
    session_->OnFatalError("Stream not registered with the scheduler");
    return absl::InternalError("Stream not registered with the scheduler");
  }

  const bool can_write_now = session_->writer_->CanWrite() &&
                             !*should_yield && pending_write_.empty();
  if (!can_write_now) {
    fin_buffered_ = options.send_fin();
    buffer_data();
    const absl::Status schedule_status = session_->scheduler_.Schedule(id_);
    if (schedule_status.ok()) {
      return absl::OkStatus();
    }
    QUICHE_BUG(WT_H2_Writev_CantSchedule) << schedule_status;
    session_->OnFatalError("Could not schedule a write-blocked stream");
    return absl::InternalError("Could not schedule a write-blocked stream");
  }

  const size_t bytes_written = WriteInner(data, options.send_fin());
  // TODO: handle partial writes when flow control requires those.
  QUICHE_DCHECK(bytes_written == 0 ||
                bytes_written == quiche::TotalStringViewSpanSize(data));
  if (bytes_written == 0) {
    buffer_data();
  }

  if (options.send_fin()) {
    CloseWriteSide(std::nullopt);
  }
  return absl::OkStatus();
}
665
CanWrite() const666 bool EncapsulatedSession::InnerStream::CanWrite() const {
667 return session_->state_ != EncapsulatedSession::kSessionClosed &&
668 !write_side_closed_ &&
669 (pending_write_.size() <= session_->max_stream_data_buffered_);
670 }
671
FlushPendingWrite()672 void EncapsulatedSession::InnerStream::FlushPendingWrite() {
673 QUICHE_DCHECK(!write_side_closed_);
674 QUICHE_DCHECK(session_->writer_->CanWrite());
675 QUICHE_DCHECK(!pending_write_.empty());
676 absl::string_view to_write = pending_write_;
677 size_t bytes_written =
678 WriteInner(absl::MakeSpan(&to_write, 1), fin_buffered_);
679 if (bytes_written < to_write.size()) {
680 pending_write_ = pending_write_.substr(bytes_written);
681 return;
682 }
683 pending_write_.clear();
684 if (fin_buffered_) {
685 CloseWriteSide(std::nullopt);
686 }
687 if (!write_side_closed_ && visitor_ != nullptr) {
688 visitor_->OnCanWrite();
689 }
690 }
691
WriteInner(absl::Span<const absl::string_view> data,bool fin)692 size_t EncapsulatedSession::InnerStream::WriteInner(
693 absl::Span<const absl::string_view> data, bool fin) {
694 size_t total_size = quiche::TotalStringViewSpanSize(data);
695 if (total_size == 0 && !fin) {
696 session_->OnFatalError("Attempted to make an empty write with fin=false");
697 return 0;
698 }
699 quiche::QuicheBuffer header =
700 quiche::SerializeWebTransportStreamCapsuleHeader(id_, fin, total_size,
701 session_->allocator_);
702 std::vector<absl::string_view> views_to_write;
703 views_to_write.reserve(data.size() + 1);
704 views_to_write.push_back(header.AsStringView());
705 absl::c_copy(data, std::back_inserter(views_to_write));
706 absl::Status write_status = session_->writer_->Writev(
707 views_to_write, quiche::kDefaultStreamWriteOptions);
708 if (!write_status.ok()) {
709 session_->OnWriteError(write_status);
710 return 0;
711 }
712 return total_size;
713 }
714
AbruptlyTerminate(absl::Status error)715 void EncapsulatedSession::InnerStream::AbruptlyTerminate(absl::Status error) {
716 QUICHE_DLOG(INFO) << "Abruptly terminating the stream due to error: "
717 << error;
718 ResetDueToInternalError();
719 }
720
ResetWithUserCode(StreamErrorCode error)721 void EncapsulatedSession::InnerStream::ResetWithUserCode(
722 StreamErrorCode error) {
723 if (reset_frame_sent_) {
724 return;
725 }
726 reset_frame_sent_ = true;
727
728 session_->SendControlCapsule(
729 quiche::WebTransportResetStreamCapsule{id_, error});
730 CloseWriteSide(std::nullopt);
731 }
732
SendStopSending(StreamErrorCode error)733 void EncapsulatedSession::InnerStream::SendStopSending(StreamErrorCode error) {
734 if (stop_sending_sent_) {
735 return;
736 }
737 stop_sending_sent_ = true;
738
739 session_->SendControlCapsule(
740 quiche::WebTransportStopSendingCapsule{id_, error});
741 CloseReadSide(std::nullopt);
742 }
743
CloseReadSide(std::optional<StreamErrorCode> error)744 void EncapsulatedSession::InnerStream::CloseReadSide(
745 std::optional<StreamErrorCode> error) {
746 if (read_side_closed_) {
747 return;
748 }
749 read_side_closed_ = true;
750 incoming_reads_.clear();
751 if (error.has_value() && visitor_ != nullptr) {
752 visitor_->OnResetStreamReceived(*error);
753 }
754 if (CanBeGarbageCollected()) {
755 session_->streams_to_garbage_collect_.push_back(id_);
756 }
757 }
758
CloseWriteSide(std::optional<StreamErrorCode> error)759 void EncapsulatedSession::InnerStream::CloseWriteSide(
760 std::optional<StreamErrorCode> error) {
761 if (write_side_closed_) {
762 return;
763 }
764 write_side_closed_ = true;
765 pending_write_.clear();
766 absl::Status status = session_->scheduler_.Unregister(id_);
767 if (!status.ok()) {
768 session_->OnFatalError("Failed to unregister closed stream");
769 return;
770 }
771 if (error.has_value() && visitor_ != nullptr) {
772 visitor_->OnStopSendingReceived(*error);
773 }
774 if (CanBeGarbageCollected()) {
775 session_->streams_to_garbage_collect_.push_back(id_);
776 }
777 }
778
GarbageCollectStreams()779 void EncapsulatedSession::GarbageCollectStreams() {
780 for (StreamId id : streams_to_garbage_collect_) {
781 streams_.erase(id);
782 }
783 streams_to_garbage_collect_.clear();
784 }
785
786 } // namespace webtransport
787