xref: /aosp_15_r20/external/cronet/net/third_party/quiche/src/quiche/quic/core/http/web_transport_stream_adapter.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2021 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/quic/core/http/web_transport_stream_adapter.h"
6 
7 #include <cstddef>
8 #include <string>
9 #include <vector>
10 
11 #include "absl/status/status.h"
12 #include "absl/status/statusor.h"
13 #include "absl/strings/string_view.h"
14 #include "absl/types/span.h"
15 #include "quiche/quic/core/http/web_transport_http3.h"
16 #include "quiche/quic/core/quic_error_codes.h"
17 #include "quiche/quic/core/quic_session.h"
18 #include "quiche/quic/core/quic_stream.h"
19 #include "quiche/quic/core/quic_stream_sequencer.h"
20 #include "quiche/quic/core/quic_types.h"
21 #include "quiche/quic/core/web_transport_interface.h"
22 #include "quiche/quic/platform/api/quic_bug_tracker.h"
23 #include "quiche/quic/platform/api/quic_flags.h"
24 #include "quiche/quic/platform/api/quic_logging.h"
25 #include "quiche/common/quiche_mem_slice_storage.h"
26 #include "quiche/common/quiche_stream.h"
27 #include "quiche/web_transport/web_transport.h"
28 
29 namespace quic {
30 
WebTransportStreamAdapter(QuicSession * session,QuicStream * stream,QuicStreamSequencer * sequencer)31 WebTransportStreamAdapter::WebTransportStreamAdapter(
32     QuicSession* session, QuicStream* stream, QuicStreamSequencer* sequencer)
33     : session_(session), stream_(stream), sequencer_(sequencer) {}
34 
Read(absl::Span<char> buffer)35 WebTransportStream::ReadResult WebTransportStreamAdapter::Read(
36     absl::Span<char> buffer) {
37   iovec iov;
38   iov.iov_base = buffer.data();
39   iov.iov_len = buffer.size();
40   const size_t result = sequencer_->Readv(&iov, 1);
41   if (!fin_read_ && sequencer_->IsClosed()) {
42     fin_read_ = true;
43     stream_->OnFinRead();
44   }
45   return ReadResult{result, sequencer_->IsClosed()};
46 }
47 
Read(std::string * output)48 WebTransportStream::ReadResult WebTransportStreamAdapter::Read(
49     std::string* output) {
50   const size_t old_size = output->size();
51   const size_t bytes_to_read = ReadableBytes();
52   output->resize(old_size + bytes_to_read);
53   ReadResult result =
54       Read(absl::Span<char>(&(*output)[old_size], bytes_to_read));
55   QUICHE_DCHECK_EQ(bytes_to_read, result.bytes_read);
56   output->resize(old_size + result.bytes_read);
57   return result;
58 }
59 
Writev(absl::Span<const absl::string_view> data,const quiche::StreamWriteOptions & options)60 absl::Status WebTransportStreamAdapter::Writev(
61     absl::Span<const absl::string_view> data,
62     const quiche::StreamWriteOptions& options) {
63   if (data.empty() && !options.send_fin()) {
64     return absl::InvalidArgumentError(
65         "Writev() called without any data or a FIN");
66   }
67   const absl::Status initial_check_status = CheckBeforeStreamWrite();
68   if (!initial_check_status.ok() &&
69       !(initial_check_status.code() == absl::StatusCode::kUnavailable &&
70         options.buffer_unconditionally())) {
71     return initial_check_status;
72   }
73 
74   std::vector<iovec> iovecs;
75   size_t total_size = 0;
76   iovecs.resize(data.size());
77   for (size_t i = 0; i < data.size(); i++) {
78     // QuicheMemSliceStorage only reads iovec, thus this is safe.
79     iovecs[i].iov_base = const_cast<char*>(data[i].data());
80     iovecs[i].iov_len = data[i].size();
81     total_size += data[i].size();
82   }
83   quiche::QuicheMemSliceStorage storage(
84       iovecs.data(), iovecs.size(),
85       session_->connection()->helper()->GetStreamSendBufferAllocator(),
86       GetQuicFlag(quic_send_buffer_max_data_slice_size));
87   QuicConsumedData consumed = stream_->WriteMemSlices(
88       storage.ToSpan(), /*fin=*/options.send_fin(),
89       /*buffer_uncondtionally=*/options.buffer_unconditionally());
90 
91   if (consumed.bytes_consumed == total_size) {
92     return absl::OkStatus();
93   }
94   if (consumed.bytes_consumed == 0) {
95     return absl::UnavailableError("Stream write-blocked");
96   }
97   // WebTransportStream::Write() is an all-or-nothing write API.  To achieve
98   // that property, it relies on WriteMemSlices() being an all-or-nothing API.
99   // If WriteMemSlices() fails to provide that guarantee, we have no way to
100   // communicate a partial write to the caller, and thus it's safer to just
101   // close the connection.
102   constexpr absl::string_view kErrorMessage =
103       "WriteMemSlices() unexpectedly partially consumed the input data";
104   QUIC_BUG(WebTransportStreamAdapter partial write)
105       << kErrorMessage << ", provided: " << total_size
106       << ", written: " << consumed.bytes_consumed;
107   stream_->OnUnrecoverableError(QUIC_INTERNAL_ERROR,
108                                 std::string(kErrorMessage));
109   return absl::InternalError(kErrorMessage);
110 }
111 
CheckBeforeStreamWrite() const112 absl::Status WebTransportStreamAdapter::CheckBeforeStreamWrite() const {
113   if (stream_->write_side_closed() || stream_->fin_buffered()) {
114     return absl::FailedPreconditionError("Stream write side is closed");
115   }
116   if (!stream_->CanWriteNewData()) {
117     return absl::UnavailableError("Stream write-blocked");
118   }
119   return absl::OkStatus();
120 }
121 
CanWrite() const122 bool WebTransportStreamAdapter::CanWrite() const {
123   return CheckBeforeStreamWrite().ok();
124 }
125 
AbruptlyTerminate(absl::Status error)126 void WebTransportStreamAdapter::AbruptlyTerminate(absl::Status error) {
127   QUIC_DLOG(WARNING) << (session_->perspective() == Perspective::IS_CLIENT
128                              ? "Client: "
129                              : "Server: ")
130                      << "Abruptly terminating stream " << stream_->id()
131                      << " due to the following error: " << error;
132   ResetDueToInternalError();
133 }
134 
ReadableBytes() const135 size_t WebTransportStreamAdapter::ReadableBytes() const {
136   return sequencer_->ReadableBytes();
137 }
138 
139 quiche::ReadStream::PeekResult
PeekNextReadableRegion() const140 WebTransportStreamAdapter::PeekNextReadableRegion() const {
141   iovec iov;
142   PeekResult result;
143   if (sequencer_->GetReadableRegion(&iov)) {
144     result.peeked_data =
145         absl::string_view(static_cast<const char*>(iov.iov_base), iov.iov_len);
146   }
147   result.fin_next = sequencer_->IsClosed();
148   result.all_data_received = sequencer_->IsAllDataAvailable();
149   return result;
150 }
151 
SkipBytes(size_t bytes)152 bool WebTransportStreamAdapter::SkipBytes(size_t bytes) {
153   if (stream_->read_side_closed()) {
154     // Useful when the stream has been reset in between Peek() and Skip().
155     return true;
156   }
157   sequencer_->MarkConsumed(bytes);
158   if (!fin_read_ && sequencer_->IsClosed()) {
159     fin_read_ = true;
160     stream_->OnFinRead();
161   }
162   return sequencer_->IsClosed();
163 }
164 
OnDataAvailable()165 void WebTransportStreamAdapter::OnDataAvailable() {
166   if (visitor_ == nullptr) {
167     return;
168   }
169   const bool fin_readable = sequencer_->IsClosed() && !fin_read_;
170   if (ReadableBytes() == 0 && !fin_readable) {
171     return;
172   }
173   visitor_->OnCanRead();
174 }
175 
OnCanWriteNewData()176 void WebTransportStreamAdapter::OnCanWriteNewData() {
177   // Ensure the origin check has been completed, as the stream can be notified
178   // about being writable before that.
179   if (!CanWrite()) {
180     return;
181   }
182   if (visitor_ != nullptr) {
183     visitor_->OnCanWrite();
184   }
185 }
186 
ResetWithUserCode(WebTransportStreamError error)187 void WebTransportStreamAdapter::ResetWithUserCode(
188     WebTransportStreamError error) {
189   stream_->ResetWriteSide(QuicResetStreamError(
190       QUIC_STREAM_CANCELLED, WebTransportErrorToHttp3(error)));
191 }
192 
SendStopSending(WebTransportStreamError error)193 void WebTransportStreamAdapter::SendStopSending(WebTransportStreamError error) {
194   stream_->SendStopSending(QuicResetStreamError(
195       QUIC_STREAM_CANCELLED, WebTransportErrorToHttp3(error)));
196 }
197 
198 }  // namespace quic
199