1 // Copyright (c) 2023 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #ifndef QUICHE_WEB_TRANSPORT_ENCAPSULATED_ENCAPSULATED_WEB_TRANSPORT_H_
6 #define QUICHE_WEB_TRANSPORT_ENCAPSULATED_ENCAPSULATED_WEB_TRANSPORT_H_
7
8 #include <cstddef>
9 #include <cstdint>
10 #include <memory>
11 #include <optional>
12 #include <string>
13 #include <utility>
14 #include <vector>
15
16 #include "absl/base/attributes.h"
17 #include "absl/container/node_hash_map.h"
18 #include "absl/status/status.h"
19 #include "absl/strings/string_view.h"
20 #include "absl/time/time.h"
21 #include "absl/types/span.h"
22 #include "quiche/common/capsule.h"
23 #include "quiche/common/http/http_header_block.h"
24 #include "quiche/common/platform/api/quiche_export.h"
25 #include "quiche/common/quiche_buffer_allocator.h"
26 #include "quiche/common/quiche_callbacks.h"
27 #include "quiche/common/quiche_circular_deque.h"
28 #include "quiche/common/quiche_stream.h"
29 #include "quiche/common/simple_buffer_allocator.h"
30 #include "quiche/web_transport/web_transport.h"
31 #include "quiche/web_transport/web_transport_priority_scheduler.h"
32
33 namespace webtransport {
34
IsUnidirectionalId(StreamId id)35 constexpr bool IsUnidirectionalId(StreamId id) { return id & 0b10; }
IsBidirectionalId(StreamId id)36 constexpr bool IsBidirectionalId(StreamId id) {
37 return !IsUnidirectionalId(id);
38 }
IsIdOpenedBy(StreamId id,Perspective perspective)39 constexpr bool IsIdOpenedBy(StreamId id, Perspective perspective) {
40 return (id & 0b01) ^ (perspective == Perspective::kClient);
41 }
42
43 using FatalErrorCallback = quiche::SingleUseCallback<void(absl::string_view)>;
44
45 // Implementation of the WebTransport over HTTP/2 protocol; works over any
46 // arbitrary bidirectional bytestream that can be prefixed with HTTP headers.
47 // Specification: https://datatracker.ietf.org/doc/draft-ietf-webtrans-http2/
48 class QUICHE_EXPORT EncapsulatedSession
49 : public webtransport::Session,
50 public quiche::WriteStreamVisitor,
51 public quiche::ReadStreamVisitor,
52 public quiche::CapsuleParser::Visitor {
53 public:
54 // The state machine of the transport.
55 enum State {
56 // The transport object has been created, but
57 // InitializeClient/InitializeServer has not been called yet.
58 kUninitialized,
59 // The client has sent its own headers, but haven't received a response yet.
60 kWaitingForHeaders,
61 // Both the client and the server headers have been processed.
62 kSessionOpen,
63 // The session close has been requested, but the CLOSE capsule hasn't been
64 // sent yet.
65 kSessionClosing,
66 // The session has been closed; no further data will be exchanged.
67 kSessionClosed,
68 };
69
70 // The `fatal_error_callback` implies that any state related to the session
71 // should be torn down after it's been called.
72 EncapsulatedSession(Perspective perspective,
73 FatalErrorCallback fatal_error_callback);
74
75 // WebTransport uses HTTP headers in a similar way to how QUIC uses SETTINGS;
76 // thus, the headers are necessary to initialize the session.
77 void InitializeClient(std::unique_ptr<SessionVisitor> visitor,
78 quiche::HttpHeaderBlock& outgoing_headers,
79 quiche::WriteStream* writer,
80 quiche::ReadStream* reader);
81 void InitializeServer(std::unique_ptr<SessionVisitor> visitor,
82 const quiche::HttpHeaderBlock& incoming_headers,
83 quiche::HttpHeaderBlock& outgoing_headers,
84 quiche::WriteStream* writer,
85 quiche::ReadStream* reader);
86 void ProcessIncomingServerHeaders(const quiche::HttpHeaderBlock& headers);
87
88 // webtransport::Session implementation.
89 void CloseSession(SessionErrorCode error_code,
90 absl::string_view error_message) override;
91 Stream* AcceptIncomingBidirectionalStream() override;
92 Stream* AcceptIncomingUnidirectionalStream() override;
93 bool CanOpenNextOutgoingBidirectionalStream() override;
94 bool CanOpenNextOutgoingUnidirectionalStream() override;
95 Stream* OpenOutgoingBidirectionalStream() override;
96 Stream* OpenOutgoingUnidirectionalStream() override;
97 DatagramStatus SendOrQueueDatagram(absl::string_view datagram) override;
98 uint64_t GetMaxDatagramSize() const override;
99 void SetDatagramMaxTimeInQueue(absl::Duration max_time_in_queue) override;
100 Stream* GetStreamById(StreamId id) override;
101 DatagramStats GetDatagramStats() override;
102 SessionStats GetSessionStats() override;
103 void NotifySessionDraining() override;
104 void SetOnDraining(quiche::SingleUseCallback<void()> callback) override;
105
106 // quiche::WriteStreamVisitor implementation.
107 void OnCanWrite() override;
108 // quiche::ReadStreamVisitor implementation.
109 void OnCanRead() override;
110 // quiche::CapsuleParser::Visitor implementation.
111 bool OnCapsule(const quiche::Capsule& capsule) override;
112 void OnCapsuleParseFailure(absl::string_view error_message) override;
113
state()114 State state() const { return state_; }
115
116 // Cleans up the state for all of the streams that have been closed. QUIC
117 // uses timers to safely delete closed streams while minimizing the risk that
118 // something on stack holds an active pointer to them; WebTransport over
119 // HTTP/2 does not have any timers in it, making that approach inapplicable
120 // here. This class does automatically run garbage collection at the end of
121 // every OnCanRead() call (since it's a top-level entrypoint that is likely to
122 // come directly from I/O handler), but if the application does not happen to
123 // read data frequently, manual calls to this function may be requried.
124 void GarbageCollectStreams();
125
126 private:
127 // If the amount of data buffered in the socket exceeds the amount specified
128 // here, CanWrite() will start returning false.
129 static constexpr size_t kDefaultMaxBufferedStreamData = 16 * 1024;
130
131 class InnerStream : public Stream {
132 public:
133 InnerStream(EncapsulatedSession* session, StreamId id);
134 InnerStream(const InnerStream&) = delete;
135 InnerStream(InnerStream&&) = delete;
136 InnerStream& operator=(const InnerStream&) = delete;
137 InnerStream& operator=(InnerStream&&) = delete;
138
139 // ReadStream implementation.
140 ABSL_MUST_USE_RESULT ReadResult Read(absl::Span<char> output) override;
141 ABSL_MUST_USE_RESULT ReadResult Read(std::string* output) override;
142 size_t ReadableBytes() const override;
143 PeekResult PeekNextReadableRegion() const override;
144 bool SkipBytes(size_t bytes) override;
145
146 // WriteStream implementation.
147 absl::Status Writev(absl::Span<const absl::string_view> data,
148 const quiche::StreamWriteOptions& options) override;
149 bool CanWrite() const override;
150
151 // TerminableStream implementation.
152 void AbruptlyTerminate(absl::Status error) override;
153
154 // Stream implementation.
GetStreamId()155 StreamId GetStreamId() const override { return id_; }
visitor()156 StreamVisitor* visitor() override { return visitor_.get(); }
SetVisitor(std::unique_ptr<StreamVisitor> visitor)157 void SetVisitor(std::unique_ptr<StreamVisitor> visitor) override {
158 visitor_ = std::move(visitor);
159 }
160
161 void ResetWithUserCode(StreamErrorCode error) override;
162 void SendStopSending(StreamErrorCode error) override;
163
ResetDueToInternalError()164 void ResetDueToInternalError() override { ResetWithUserCode(0); }
MaybeResetDueToStreamObjectGone()165 void MaybeResetDueToStreamObjectGone() override { ResetWithUserCode(0); }
166
167 void CloseReadSide(std::optional<StreamErrorCode> error);
168 void CloseWriteSide(std::optional<StreamErrorCode> error);
CanBeGarbageCollected()169 bool CanBeGarbageCollected() const {
170 return read_side_closed_ && write_side_closed_;
171 }
172
HasPendingWrite()173 bool HasPendingWrite() const { return !pending_write_.empty(); }
174 void FlushPendingWrite();
175
176 void ProcessCapsule(const quiche::Capsule& capsule);
177
178 private:
179 // Struct for storing data that can potentially either stored inside the
180 // object or inside some other object on the stack. Here is roughly how this
181 // works:
182 // 1. A read is enqueued with `data` pointing to a temporary buffer, and
183 // `storage` being empty.
184 // 2. Visitor::OnCanRead() is called, potentially causing the user to
185 // consume the data from the temporary buffer directly.
186 // 3. If user does not consume data immediately, it's copied to `storage`
187 // (and the pointer to `data` is updated) so that it can be read later.
188 struct IncomingRead {
189 absl::string_view data;
190 std::string storage;
191
sizeIncomingRead192 size_t size() const { return data.size(); }
193 };
194
195 // Tries to send `data`; may send less if limited by flow control.
196 [[nodiscard]] size_t WriteInner(absl::Span<const absl::string_view> data,
197 bool fin);
198
199 EncapsulatedSession* session_;
200 StreamId id_;
201 std::unique_ptr<StreamVisitor> visitor_;
202 quiche::QuicheCircularDeque<IncomingRead> incoming_reads_;
203 std::string pending_write_;
204 bool read_side_closed_;
205 bool write_side_closed_;
206 bool reset_frame_sent_ = false;
207 bool stop_sending_sent_ = false;
208 bool fin_received_ = false;
209 bool fin_consumed_ = false;
210 bool fin_buffered_ = false;
211 };
212
213 struct BufferedClose {
214 SessionErrorCode error_code = 0;
215 std::string error_message;
216 };
217
218 Perspective perspective_;
219 State state_ = kUninitialized;
220 std::unique_ptr<SessionVisitor> visitor_ = nullptr;
221 FatalErrorCallback fatal_error_callback_;
222 quiche::SingleUseCallback<void()> draining_callback_;
223 quiche::WriteStream* writer_ = nullptr; // Not owned.
224 quiche::ReadStream* reader_ = nullptr; // Not owned.
225 quiche::QuicheBufferAllocator* allocator_ =
226 quiche::SimpleBufferAllocator::Get();
227 quiche::CapsuleParser capsule_parser_;
228
229 size_t max_stream_data_buffered_ = kDefaultMaxBufferedStreamData;
230
231 PriorityScheduler scheduler_;
232 absl::node_hash_map<StreamId, InnerStream>
233 streams_; // Streams unregister themselves with scheduler on deletion,
234 // and thus have to be above it.
235 quiche::QuicheCircularDeque<StreamId> incoming_bidirectional_streams_;
236 quiche::QuicheCircularDeque<StreamId> incoming_unidirectional_streams_;
237 std::vector<StreamId> streams_to_garbage_collect_;
238 StreamId next_outgoing_bidi_stream_;
239 StreamId next_outgoing_unidi_stream_;
240
241 bool session_close_notified_ = false;
242 bool fin_sent_ = false;
243
244 BufferedClose buffered_session_close_;
245 quiche::QuicheCircularDeque<quiche::QuicheBuffer> control_capsule_queue_;
246
247 void OpenSession();
248 absl::Status SendFin(absl::string_view data);
249 void OnSessionClosed(SessionErrorCode error_code,
250 const std::string& error_message);
251 void OnFatalError(absl::string_view error_message);
252 void OnWriteError(absl::Status error);
253
IsOutgoing(StreamId id)254 bool IsOutgoing(StreamId id) { return IsIdOpenedBy(id, perspective_); }
IsIncoming(StreamId id)255 bool IsIncoming(StreamId id) { return !IsOutgoing(id); }
256
257 template <typename CapsuleType>
SendControlCapsule(CapsuleType capsule)258 void SendControlCapsule(CapsuleType capsule) {
259 control_capsule_queue_.push_back(quiche::SerializeCapsule(
260 quiche::Capsule(std::move(capsule)), allocator_));
261 OnCanWrite();
262 }
263
264 Stream* AcceptIncomingStream(quiche::QuicheCircularDeque<StreamId>& queue);
265 Stream* OpenOutgoingStream(StreamId& counter);
266 void ProcessStreamCapsule(const quiche::Capsule& capsule, StreamId stream_id);
267 };
268
269 } // namespace webtransport
270
271 #endif // QUICHE_WEB_TRANSPORT_ENCAPSULATED_ENCAPSULATED_WEB_TRANSPORT_H_
272