1 // Copyright (c) 2023 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef QUICHE_WEB_TRANSPORT_ENCAPSULATED_ENCAPSULATED_WEB_TRANSPORT_H_
6 #define QUICHE_WEB_TRANSPORT_ENCAPSULATED_ENCAPSULATED_WEB_TRANSPORT_H_
7 
8 #include <cstddef>
9 #include <cstdint>
10 #include <memory>
11 #include <optional>
12 #include <string>
13 #include <utility>
14 #include <vector>
15 
16 #include "absl/base/attributes.h"
17 #include "absl/container/node_hash_map.h"
18 #include "absl/status/status.h"
19 #include "absl/strings/string_view.h"
20 #include "absl/time/time.h"
21 #include "absl/types/span.h"
22 #include "quiche/common/capsule.h"
23 #include "quiche/common/http/http_header_block.h"
24 #include "quiche/common/platform/api/quiche_export.h"
25 #include "quiche/common/quiche_buffer_allocator.h"
26 #include "quiche/common/quiche_callbacks.h"
27 #include "quiche/common/quiche_circular_deque.h"
28 #include "quiche/common/quiche_stream.h"
29 #include "quiche/common/simple_buffer_allocator.h"
30 #include "quiche/web_transport/web_transport.h"
31 #include "quiche/web_transport/web_transport_priority_scheduler.h"
32 
33 namespace webtransport {
34 
IsUnidirectionalId(StreamId id)35 constexpr bool IsUnidirectionalId(StreamId id) { return id & 0b10; }
IsBidirectionalId(StreamId id)36 constexpr bool IsBidirectionalId(StreamId id) {
37   return !IsUnidirectionalId(id);
38 }
IsIdOpenedBy(StreamId id,Perspective perspective)39 constexpr bool IsIdOpenedBy(StreamId id, Perspective perspective) {
40   return (id & 0b01) ^ (perspective == Perspective::kClient);
41 }
42 
43 using FatalErrorCallback = quiche::SingleUseCallback<void(absl::string_view)>;
44 
45 // Implementation of the WebTransport over HTTP/2 protocol; works over any
46 // arbitrary bidirectional bytestream that can be prefixed with HTTP headers.
47 // Specification: https://datatracker.ietf.org/doc/draft-ietf-webtrans-http2/
48 class QUICHE_EXPORT EncapsulatedSession
49     : public webtransport::Session,
50       public quiche::WriteStreamVisitor,
51       public quiche::ReadStreamVisitor,
52       public quiche::CapsuleParser::Visitor {
53  public:
54   // The state machine of the transport.
55   enum State {
56     // The transport object has been created, but
57     // InitializeClient/InitializeServer has not been called yet.
58     kUninitialized,
59     // The client has sent its own headers, but haven't received a response yet.
60     kWaitingForHeaders,
61     // Both the client and the server headers have been processed.
62     kSessionOpen,
63     // The session close has been requested, but the CLOSE capsule hasn't been
64     // sent yet.
65     kSessionClosing,
66     // The session has been closed; no further data will be exchanged.
67     kSessionClosed,
68   };
69 
70   // The `fatal_error_callback` implies that any state related to the session
71   // should be torn down after it's been called.
72   EncapsulatedSession(Perspective perspective,
73                       FatalErrorCallback fatal_error_callback);
74 
75   // WebTransport uses HTTP headers in a similar way to how QUIC uses SETTINGS;
76   // thus, the headers are necessary to initialize the session.
77   void InitializeClient(std::unique_ptr<SessionVisitor> visitor,
78                         quiche::HttpHeaderBlock& outgoing_headers,
79                         quiche::WriteStream* writer,
80                         quiche::ReadStream* reader);
81   void InitializeServer(std::unique_ptr<SessionVisitor> visitor,
82                         const quiche::HttpHeaderBlock& incoming_headers,
83                         quiche::HttpHeaderBlock& outgoing_headers,
84                         quiche::WriteStream* writer,
85                         quiche::ReadStream* reader);
86   void ProcessIncomingServerHeaders(const quiche::HttpHeaderBlock& headers);
87 
88   // webtransport::Session implementation.
89   void CloseSession(SessionErrorCode error_code,
90                     absl::string_view error_message) override;
91   Stream* AcceptIncomingBidirectionalStream() override;
92   Stream* AcceptIncomingUnidirectionalStream() override;
93   bool CanOpenNextOutgoingBidirectionalStream() override;
94   bool CanOpenNextOutgoingUnidirectionalStream() override;
95   Stream* OpenOutgoingBidirectionalStream() override;
96   Stream* OpenOutgoingUnidirectionalStream() override;
97   DatagramStatus SendOrQueueDatagram(absl::string_view datagram) override;
98   uint64_t GetMaxDatagramSize() const override;
99   void SetDatagramMaxTimeInQueue(absl::Duration max_time_in_queue) override;
100   Stream* GetStreamById(StreamId id) override;
101   DatagramStats GetDatagramStats() override;
102   SessionStats GetSessionStats() override;
103   void NotifySessionDraining() override;
104   void SetOnDraining(quiche::SingleUseCallback<void()> callback) override;
105 
106   // quiche::WriteStreamVisitor implementation.
107   void OnCanWrite() override;
108   // quiche::ReadStreamVisitor implementation.
109   void OnCanRead() override;
110   // quiche::CapsuleParser::Visitor implementation.
111   bool OnCapsule(const quiche::Capsule& capsule) override;
112   void OnCapsuleParseFailure(absl::string_view error_message) override;
113 
state()114   State state() const { return state_; }
115 
116   // Cleans up the state for all of the streams that have been closed.  QUIC
117   // uses timers to safely delete closed streams while minimizing the risk that
118   // something on stack holds an active pointer to them; WebTransport over
119   // HTTP/2 does not have any timers in it, making that approach inapplicable
120   // here. This class does automatically run garbage collection at the end of
121   // every OnCanRead() call (since it's a top-level entrypoint that is likely to
122   // come directly from I/O handler), but if the application does not happen to
123   // read data frequently, manual calls to this function may be requried.
124   void GarbageCollectStreams();
125 
126  private:
127   // If the amount of data buffered in the socket exceeds the amount specified
128   // here, CanWrite() will start returning false.
129   static constexpr size_t kDefaultMaxBufferedStreamData = 16 * 1024;
130 
131   class InnerStream : public Stream {
132    public:
133     InnerStream(EncapsulatedSession* session, StreamId id);
134     InnerStream(const InnerStream&) = delete;
135     InnerStream(InnerStream&&) = delete;
136     InnerStream& operator=(const InnerStream&) = delete;
137     InnerStream& operator=(InnerStream&&) = delete;
138 
139     // ReadStream implementation.
140     ABSL_MUST_USE_RESULT ReadResult Read(absl::Span<char> output) override;
141     ABSL_MUST_USE_RESULT ReadResult Read(std::string* output) override;
142     size_t ReadableBytes() const override;
143     PeekResult PeekNextReadableRegion() const override;
144     bool SkipBytes(size_t bytes) override;
145 
146     // WriteStream implementation.
147     absl::Status Writev(absl::Span<const absl::string_view> data,
148                         const quiche::StreamWriteOptions& options) override;
149     bool CanWrite() const override;
150 
151     // TerminableStream implementation.
152     void AbruptlyTerminate(absl::Status error) override;
153 
154     // Stream implementation.
GetStreamId()155     StreamId GetStreamId() const override { return id_; }
visitor()156     StreamVisitor* visitor() override { return visitor_.get(); }
SetVisitor(std::unique_ptr<StreamVisitor> visitor)157     void SetVisitor(std::unique_ptr<StreamVisitor> visitor) override {
158       visitor_ = std::move(visitor);
159     }
160 
161     void ResetWithUserCode(StreamErrorCode error) override;
162     void SendStopSending(StreamErrorCode error) override;
163 
ResetDueToInternalError()164     void ResetDueToInternalError() override { ResetWithUserCode(0); }
MaybeResetDueToStreamObjectGone()165     void MaybeResetDueToStreamObjectGone() override { ResetWithUserCode(0); }
166 
167     void CloseReadSide(std::optional<StreamErrorCode> error);
168     void CloseWriteSide(std::optional<StreamErrorCode> error);
CanBeGarbageCollected()169     bool CanBeGarbageCollected() const {
170       return read_side_closed_ && write_side_closed_;
171     }
172 
HasPendingWrite()173     bool HasPendingWrite() const { return !pending_write_.empty(); }
174     void FlushPendingWrite();
175 
176     void ProcessCapsule(const quiche::Capsule& capsule);
177 
178    private:
179     // Struct for storing data that can potentially either stored inside the
180     // object or inside some other object on the stack. Here is roughly how this
181     // works:
182     //   1. A read is enqueued with `data` pointing to a temporary buffer, and
183     //      `storage` being empty.
184     //   2. Visitor::OnCanRead() is called, potentially causing the user to
185     //      consume the data from the temporary buffer directly.
186     //   3. If user does not consume data immediately, it's copied to `storage`
187     //      (and the pointer to `data` is updated) so that it can be read later.
188     struct IncomingRead {
189       absl::string_view data;
190       std::string storage;
191 
sizeIncomingRead192       size_t size() const { return data.size(); }
193     };
194 
195     // Tries to send `data`; may send less if limited by flow control.
196     [[nodiscard]] size_t WriteInner(absl::Span<const absl::string_view> data,
197                                     bool fin);
198 
199     EncapsulatedSession* session_;
200     StreamId id_;
201     std::unique_ptr<StreamVisitor> visitor_;
202     quiche::QuicheCircularDeque<IncomingRead> incoming_reads_;
203     std::string pending_write_;
204     bool read_side_closed_;
205     bool write_side_closed_;
206     bool reset_frame_sent_ = false;
207     bool stop_sending_sent_ = false;
208     bool fin_received_ = false;
209     bool fin_consumed_ = false;
210     bool fin_buffered_ = false;
211   };
212 
213   struct BufferedClose {
214     SessionErrorCode error_code = 0;
215     std::string error_message;
216   };
217 
218   Perspective perspective_;
219   State state_ = kUninitialized;
220   std::unique_ptr<SessionVisitor> visitor_ = nullptr;
221   FatalErrorCallback fatal_error_callback_;
222   quiche::SingleUseCallback<void()> draining_callback_;
223   quiche::WriteStream* writer_ = nullptr;  // Not owned.
224   quiche::ReadStream* reader_ = nullptr;   // Not owned.
225   quiche::QuicheBufferAllocator* allocator_ =
226       quiche::SimpleBufferAllocator::Get();
227   quiche::CapsuleParser capsule_parser_;
228 
229   size_t max_stream_data_buffered_ = kDefaultMaxBufferedStreamData;
230 
231   PriorityScheduler scheduler_;
232   absl::node_hash_map<StreamId, InnerStream>
233       streams_;  // Streams unregister themselves with scheduler on deletion,
234                  // and thus have to be above it.
235   quiche::QuicheCircularDeque<StreamId> incoming_bidirectional_streams_;
236   quiche::QuicheCircularDeque<StreamId> incoming_unidirectional_streams_;
237   std::vector<StreamId> streams_to_garbage_collect_;
238   StreamId next_outgoing_bidi_stream_;
239   StreamId next_outgoing_unidi_stream_;
240 
241   bool session_close_notified_ = false;
242   bool fin_sent_ = false;
243 
244   BufferedClose buffered_session_close_;
245   quiche::QuicheCircularDeque<quiche::QuicheBuffer> control_capsule_queue_;
246 
247   void OpenSession();
248   absl::Status SendFin(absl::string_view data);
249   void OnSessionClosed(SessionErrorCode error_code,
250                        const std::string& error_message);
251   void OnFatalError(absl::string_view error_message);
252   void OnWriteError(absl::Status error);
253 
IsOutgoing(StreamId id)254   bool IsOutgoing(StreamId id) { return IsIdOpenedBy(id, perspective_); }
IsIncoming(StreamId id)255   bool IsIncoming(StreamId id) { return !IsOutgoing(id); }
256 
257   template <typename CapsuleType>
SendControlCapsule(CapsuleType capsule)258   void SendControlCapsule(CapsuleType capsule) {
259     control_capsule_queue_.push_back(quiche::SerializeCapsule(
260         quiche::Capsule(std::move(capsule)), allocator_));
261     OnCanWrite();
262   }
263 
264   Stream* AcceptIncomingStream(quiche::QuicheCircularDeque<StreamId>& queue);
265   Stream* OpenOutgoingStream(StreamId& counter);
266   void ProcessStreamCapsule(const quiche::Capsule& capsule, StreamId stream_id);
267 };
268 
269 }  // namespace webtransport
270 
271 #endif  // QUICHE_WEB_TRANSPORT_ENCAPSULATED_ENCAPSULATED_WEB_TRANSPORT_H_
272