1 // Copyright 2023 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #ifndef QUICHE_QUIC_MOQT_MOQT_SESSION_H_
6 #define QUICHE_QUIC_MOQT_MOQT_SESSION_H_
7
8 #include <cstdint>
9 #include <optional>
10 #include <string>
11 #include <utility>
12
13 #include "absl/container/flat_hash_map.h"
14 #include "absl/container/flat_hash_set.h"
15 #include "absl/strings/string_view.h"
16 #include "quiche/quic/core/quic_types.h"
17 #include "quiche/quic/moqt/moqt_framer.h"
18 #include "quiche/quic/moqt/moqt_messages.h"
19 #include "quiche/quic/moqt/moqt_parser.h"
20 #include "quiche/quic/moqt/moqt_track.h"
21 #include "quiche/common/platform/api/quiche_export.h"
22 #include "quiche/common/quiche_buffer_allocator.h"
23 #include "quiche/common/quiche_callbacks.h"
24 #include "quiche/common/simple_buffer_allocator.h"
25 #include "quiche/web_transport/web_transport.h"
26
27 namespace moqt {
28
29 namespace test {
30 class MoqtSessionPeer;
31 }
32
33 using MoqtSessionEstablishedCallback = quiche::SingleUseCallback<void()>;
34 using MoqtSessionTerminatedCallback =
35 quiche::SingleUseCallback<void(absl::string_view error_message)>;
36 using MoqtSessionDeletedCallback = quiche::SingleUseCallback<void()>;
37 // If |error_message| is nullopt, the ANNOUNCE was successful.
38 using MoqtOutgoingAnnounceCallback = quiche::SingleUseCallback<void(
39 absl::string_view track_namespace,
40 std::optional<MoqtAnnounceErrorReason> error)>;
41 using MoqtIncomingAnnounceCallback =
42 quiche::MultiUseCallback<std::optional<MoqtAnnounceErrorReason>(
43 absl::string_view track_namespace)>;
44
DefaultIncomingAnnounceCallback(absl::string_view)45 inline std::optional<MoqtAnnounceErrorReason> DefaultIncomingAnnounceCallback(
46 absl::string_view /*track_namespace*/) {
47 return std::optional(MoqtAnnounceErrorReason{
48 MoqtAnnounceErrorCode::kAnnounceNotSupported,
49 "This endpoint does not accept incoming ANNOUNCE messages"});
50 };
51
52 // Callbacks for session-level events.
53 struct MoqtSessionCallbacks {
54 MoqtSessionEstablishedCallback session_established_callback = +[] {};
55 MoqtSessionTerminatedCallback session_terminated_callback =
56 +[](absl::string_view) {};
57 MoqtSessionDeletedCallback session_deleted_callback = +[] {};
58
59 MoqtIncomingAnnounceCallback incoming_announce_callback =
60 DefaultIncomingAnnounceCallback;
61 };
62
63 class QUICHE_EXPORT MoqtSession : public webtransport::SessionVisitor {
64 public:
65 MoqtSession(webtransport::Session* session, MoqtSessionParameters parameters,
66 MoqtSessionCallbacks callbacks = MoqtSessionCallbacks())
session_(session)67 : session_(session),
68 parameters_(parameters),
69 callbacks_(std::move(callbacks)),
70 framer_(quiche::SimpleBufferAllocator::Get(),
71 parameters.using_webtrans) {}
~MoqtSession()72 ~MoqtSession() { std::move(callbacks_.session_deleted_callback)(); }
73
74 // webtransport::SessionVisitor implementation.
75 void OnSessionReady() override;
76 void OnSessionClosed(webtransport::SessionErrorCode,
77 const std::string&) override;
78 void OnIncomingBidirectionalStreamAvailable() override;
79 void OnIncomingUnidirectionalStreamAvailable() override;
80 void OnDatagramReceived(absl::string_view datagram) override;
OnCanCreateNewOutgoingBidirectionalStream()81 void OnCanCreateNewOutgoingBidirectionalStream() override {}
OnCanCreateNewOutgoingUnidirectionalStream()82 void OnCanCreateNewOutgoingUnidirectionalStream() override {}
83
84 void Error(MoqtError code, absl::string_view error);
85
perspective()86 quic::Perspective perspective() const { return parameters_.perspective; }
87
88 // Add to the list of tracks that can be subscribed to. Call this before
89 // Announce() so that subscriptions can be processed correctly. If |visitor|
90 // is nullptr, then incoming SUBSCRIBE for objects in the path will receive
91 // SUBSCRIBE_OK, but never actually get the objects.
92 void AddLocalTrack(const FullTrackName& full_track_name,
93 MoqtForwardingPreference forwarding_preference,
94 LocalTrack::Visitor* visitor);
95 // Send an ANNOUNCE message for |track_namespace|, and call
96 // |announce_callback| when the response arrives. Will fail immediately if
97 // there is already an unresolved ANNOUNCE for that namespace.
98 void Announce(absl::string_view track_namespace,
99 MoqtOutgoingAnnounceCallback announce_callback);
100 bool HasSubscribers(const FullTrackName& full_track_name) const;
101
102 // Returns true if SUBSCRIBE was sent. If there is already a subscription to
103 // the track, the message will still be sent. However, the visitor will be
104 // ignored.
105 bool SubscribeAbsolute(absl::string_view track_namespace,
106 absl::string_view name, uint64_t start_group,
107 uint64_t start_object, RemoteTrack::Visitor* visitor,
108 absl::string_view auth_info = "");
109 bool SubscribeAbsolute(absl::string_view track_namespace,
110 absl::string_view name, uint64_t start_group,
111 uint64_t start_object, uint64_t end_group,
112 uint64_t end_object, RemoteTrack::Visitor* visitor,
113 absl::string_view auth_info = "");
114 bool SubscribeRelative(absl::string_view track_namespace,
115 absl::string_view name, int64_t start_group,
116 int64_t start_object, RemoteTrack::Visitor* visitor,
117 absl::string_view auth_info = "");
118 bool SubscribeCurrentGroup(absl::string_view track_namespace,
119 absl::string_view name,
120 RemoteTrack::Visitor* visitor,
121 absl::string_view auth_info = "");
122
123 // Returns false if it could not open a stream when necessary, or if the
124 // track does not exist (there was no call to AddLocalTrack). Will still
125 // return false is some streams succeed.
126 // Also returns false if |payload_length| exists but is shorter than
127 // |payload|.
128 // |payload.length() >= |payload_length|, because the application can deliver
129 // partial objects.
130 bool PublishObject(const FullTrackName& full_track_name, uint64_t group_id,
131 uint64_t object_id, uint64_t object_send_order,
132 absl::string_view payload, bool end_of_stream);
133 // TODO: Add an API to FIN the stream for a particular track/group/object.
134 // TODO: Add an API to send partial objects.
135
callbacks()136 MoqtSessionCallbacks& callbacks() { return callbacks_; }
137
138 private:
139 friend class test::MoqtSessionPeer;
140 class QUICHE_EXPORT Stream : public webtransport::StreamVisitor,
141 public MoqtParserVisitor {
142 public:
Stream(MoqtSession * session,webtransport::Stream * stream)143 Stream(MoqtSession* session, webtransport::Stream* stream)
144 : session_(session),
145 stream_(stream),
146 parser_(session->parameters_.using_webtrans, *this) {}
Stream(MoqtSession * session,webtransport::Stream * stream,bool is_control_stream)147 Stream(MoqtSession* session, webtransport::Stream* stream,
148 bool is_control_stream)
149 : session_(session),
150 stream_(stream),
151 parser_(session->parameters_.using_webtrans, *this),
152 is_control_stream_(is_control_stream) {}
153
154 // webtransport::StreamVisitor implementation.
155 void OnCanRead() override;
156 void OnCanWrite() override;
157 void OnResetStreamReceived(webtransport::StreamErrorCode error) override;
158 void OnStopSendingReceived(webtransport::StreamErrorCode error) override;
OnWriteSideInDataRecvdState()159 void OnWriteSideInDataRecvdState() override {}
160
161 // MoqtParserVisitor implementation.
162 // TODO: Handle a stream FIN.
163 void OnObjectMessage(const MoqtObject& message, absl::string_view payload,
164 bool end_of_message) override;
165 void OnClientSetupMessage(const MoqtClientSetup& message) override;
166 void OnServerSetupMessage(const MoqtServerSetup& message) override;
167 void OnSubscribeMessage(const MoqtSubscribe& message) override;
168 void OnSubscribeOkMessage(const MoqtSubscribeOk& message) override;
169 void OnSubscribeErrorMessage(const MoqtSubscribeError& message) override;
170 void OnUnsubscribeMessage(const MoqtUnsubscribe& message) override;
OnSubscribeDoneMessage(const MoqtSubscribeDone &)171 void OnSubscribeDoneMessage(const MoqtSubscribeDone& /*message*/) override {
172 }
173 void OnAnnounceMessage(const MoqtAnnounce& message) override;
174 void OnAnnounceOkMessage(const MoqtAnnounceOk& message) override;
175 void OnAnnounceErrorMessage(const MoqtAnnounceError& message) override;
OnUnannounceMessage(const MoqtUnannounce &)176 void OnUnannounceMessage(const MoqtUnannounce& /*message*/) override {}
OnGoAwayMessage(const MoqtGoAway &)177 void OnGoAwayMessage(const MoqtGoAway& /*message*/) override {}
178 void OnParsingError(MoqtError error_code,
179 absl::string_view reason) override;
180
perspective()181 quic::Perspective perspective() const {
182 return session_->parameters_.perspective;
183 }
184
stream()185 webtransport::Stream* stream() const { return stream_; }
186
187 // Sends a control message, or buffers it if there is insufficient flow
188 // control credit.
189 void SendOrBufferMessage(quiche::QuicheBuffer message, bool fin = false);
190
191 private:
192 friend class test::MoqtSessionPeer;
193 void SendSubscribeError(const MoqtSubscribe& message,
194 SubscribeErrorCode error_code,
195 absl::string_view reason_phrase,
196 uint64_t track_alias);
197 bool CheckIfIsControlStream();
198
199 MoqtSession* session_;
200 webtransport::Stream* stream_;
201 MoqtParser parser_;
202 // nullopt means "incoming stream, and we don't know if it's the control
203 // stream or a data stream yet".
204 std::optional<bool> is_control_stream_;
205 std::string partial_object_;
206 };
207
208 // Returns the pointer to the control stream, or nullptr if none is present.
209 Stream* GetControlStream();
210 // Sends a message on the control stream; QUICHE_DCHECKs if no control stream
211 // is present.
212 void SendControlMessage(quiche::QuicheBuffer message);
213
214 // Returns false if the SUBSCRIBE isn't sent.
215 bool Subscribe(MoqtSubscribe& message, RemoteTrack::Visitor* visitor);
216 // converts two MoqtLocations into absolute sequences.
217 std::optional<FullSequence> LocationToAbsoluteNumber(
218 const LocalTrack& track,
219 const std::optional<MoqtSubscribeLocation>& group,
220 const std::optional<MoqtSubscribeLocation>& object);
221 // Returns the stream ID if successful, nullopt if not.
222 // TODO: Add a callback if stream creation is delayed.
223 std::optional<webtransport::StreamId> OpenUnidirectionalStream();
224
225 // Get FullTrackName and visitor for a subscribe_id and track_alias. Returns
226 // nullptr if not present.
227 std::pair<FullTrackName, RemoteTrack::Visitor*> TrackPropertiesFromAlias(
228 const MoqtObject& message);
229
230 webtransport::Session* session_;
231 MoqtSessionParameters parameters_;
232 MoqtSessionCallbacks callbacks_;
233 MoqtFramer framer_;
234
235 std::optional<webtransport::StreamId> control_stream_;
236 std::string error_;
237
238 // All the tracks the session is subscribed to, indexed by track_alias.
239 // Multiple subscribes to the same track are recorded in a single
240 // subscription.
241 absl::flat_hash_map<uint64_t, RemoteTrack> remote_tracks_;
242 // Look up aliases for remote tracks by name
243 absl::flat_hash_map<FullTrackName, uint64_t> remote_track_aliases_;
244 uint64_t next_remote_track_alias_ = 0;
245
246 // All the tracks the peer can subscribe to.
247 absl::flat_hash_map<FullTrackName, LocalTrack> local_tracks_;
248 // This is only used to check for track_alias collisions.
249 absl::flat_hash_set<uint64_t> used_track_aliases_;
250 uint64_t next_local_track_alias_ = 0;
251
252 // Indexed by subscribe_id.
253 struct ActiveSubscribe {
254 MoqtSubscribe message;
255 RemoteTrack::Visitor* visitor;
256 // The forwarding preference of the first received object, which all
257 // subsequent objects must match.
258 std::optional<MoqtForwardingPreference> forwarding_preference;
259 // If true, an object has arrived for the subscription before SUBSCRIBE_OK
260 // arrived.
261 bool received_object = false;
262 };
263 // Outgoing SUBSCRIBEs that have not received SUBSCRIBE_OK or SUBSCRIBE_ERROR.
264 absl::flat_hash_map<uint64_t, ActiveSubscribe> active_subscribes_;
265 uint64_t next_subscribe_id_ = 0;
266
267 // Indexed by track namespace.
268 absl::flat_hash_map<std::string, MoqtOutgoingAnnounceCallback>
269 pending_outgoing_announces_;
270
271 // The role the peer advertised in its SETUP message. Initialize it to avoid
272 // an uninitialized value if no SETUP arrives or it arrives with no Role
273 // parameter, and other checks have changed/been disabled.
274 MoqtRole peer_role_ = MoqtRole::kPubSub;
275 };
276
277 } // namespace moqt
278
279 #endif // QUICHE_QUIC_MOQT_MOQT_SESSION_H_
280