xref: /aosp_15_r20/external/cronet/net/third_party/quiche/src/quiche/quic/moqt/moqt_session.h (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2023 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef QUICHE_QUIC_MOQT_MOQT_SESSION_H_
6 #define QUICHE_QUIC_MOQT_MOQT_SESSION_H_
7 
8 #include <cstdint>
9 #include <optional>
10 #include <string>
11 #include <utility>
12 
13 #include "absl/container/flat_hash_map.h"
14 #include "absl/container/flat_hash_set.h"
15 #include "absl/strings/string_view.h"
16 #include "quiche/quic/core/quic_types.h"
17 #include "quiche/quic/moqt/moqt_framer.h"
18 #include "quiche/quic/moqt/moqt_messages.h"
19 #include "quiche/quic/moqt/moqt_parser.h"
20 #include "quiche/quic/moqt/moqt_track.h"
21 #include "quiche/common/platform/api/quiche_export.h"
22 #include "quiche/common/quiche_buffer_allocator.h"
23 #include "quiche/common/quiche_callbacks.h"
24 #include "quiche/common/simple_buffer_allocator.h"
25 #include "quiche/web_transport/web_transport.h"
26 
27 namespace moqt {
28 
namespace test {
// Forward declaration only: MoqtSession and its nested Stream class name this
// type in `friend` declarations further down in this header.
class MoqtSessionPeer;
}  // namespace test
32 
33 using MoqtSessionEstablishedCallback = quiche::SingleUseCallback<void()>;
34 using MoqtSessionTerminatedCallback =
35     quiche::SingleUseCallback<void(absl::string_view error_message)>;
36 using MoqtSessionDeletedCallback = quiche::SingleUseCallback<void()>;
37 // If |error_message| is nullopt, the ANNOUNCE was successful.
38 using MoqtOutgoingAnnounceCallback = quiche::SingleUseCallback<void(
39     absl::string_view track_namespace,
40     std::optional<MoqtAnnounceErrorReason> error)>;
41 using MoqtIncomingAnnounceCallback =
42     quiche::MultiUseCallback<std::optional<MoqtAnnounceErrorReason>(
43         absl::string_view track_namespace)>;
44 
DefaultIncomingAnnounceCallback(absl::string_view)45 inline std::optional<MoqtAnnounceErrorReason> DefaultIncomingAnnounceCallback(
46     absl::string_view /*track_namespace*/) {
47   return std::optional(MoqtAnnounceErrorReason{
48       MoqtAnnounceErrorCode::kAnnounceNotSupported,
49       "This endpoint does not accept incoming ANNOUNCE messages"});
50 };
51 
52 // Callbacks for session-level events.
53 struct MoqtSessionCallbacks {
54   MoqtSessionEstablishedCallback session_established_callback = +[] {};
55   MoqtSessionTerminatedCallback session_terminated_callback =
56       +[](absl::string_view) {};
57   MoqtSessionDeletedCallback session_deleted_callback = +[] {};
58 
59   MoqtIncomingAnnounceCallback incoming_announce_callback =
60       DefaultIncomingAnnounceCallback;
61 };
62 
63 class QUICHE_EXPORT MoqtSession : public webtransport::SessionVisitor {
64  public:
65   MoqtSession(webtransport::Session* session, MoqtSessionParameters parameters,
66               MoqtSessionCallbacks callbacks = MoqtSessionCallbacks())
session_(session)67       : session_(session),
68         parameters_(parameters),
69         callbacks_(std::move(callbacks)),
70         framer_(quiche::SimpleBufferAllocator::Get(),
71                 parameters.using_webtrans) {}
~MoqtSession()72   ~MoqtSession() { std::move(callbacks_.session_deleted_callback)(); }
73 
74   // webtransport::SessionVisitor implementation.
75   void OnSessionReady() override;
76   void OnSessionClosed(webtransport::SessionErrorCode,
77                        const std::string&) override;
78   void OnIncomingBidirectionalStreamAvailable() override;
79   void OnIncomingUnidirectionalStreamAvailable() override;
80   void OnDatagramReceived(absl::string_view datagram) override;
OnCanCreateNewOutgoingBidirectionalStream()81   void OnCanCreateNewOutgoingBidirectionalStream() override {}
OnCanCreateNewOutgoingUnidirectionalStream()82   void OnCanCreateNewOutgoingUnidirectionalStream() override {}
83 
84   void Error(MoqtError code, absl::string_view error);
85 
perspective()86   quic::Perspective perspective() const { return parameters_.perspective; }
87 
88   // Add to the list of tracks that can be subscribed to. Call this before
89   // Announce() so that subscriptions can be processed correctly. If |visitor|
90   // is nullptr, then incoming SUBSCRIBE for objects in the path will receive
91   // SUBSCRIBE_OK, but never actually get the objects.
92   void AddLocalTrack(const FullTrackName& full_track_name,
93                      MoqtForwardingPreference forwarding_preference,
94                      LocalTrack::Visitor* visitor);
95   // Send an ANNOUNCE message for |track_namespace|, and call
96   // |announce_callback| when the response arrives. Will fail immediately if
97   // there is already an unresolved ANNOUNCE for that namespace.
98   void Announce(absl::string_view track_namespace,
99                 MoqtOutgoingAnnounceCallback announce_callback);
100   bool HasSubscribers(const FullTrackName& full_track_name) const;
101 
102   // Returns true if SUBSCRIBE was sent. If there is already a subscription to
103   // the track, the message will still be sent. However, the visitor will be
104   // ignored.
105   bool SubscribeAbsolute(absl::string_view track_namespace,
106                          absl::string_view name, uint64_t start_group,
107                          uint64_t start_object, RemoteTrack::Visitor* visitor,
108                          absl::string_view auth_info = "");
109   bool SubscribeAbsolute(absl::string_view track_namespace,
110                          absl::string_view name, uint64_t start_group,
111                          uint64_t start_object, uint64_t end_group,
112                          uint64_t end_object, RemoteTrack::Visitor* visitor,
113                          absl::string_view auth_info = "");
114   bool SubscribeRelative(absl::string_view track_namespace,
115                          absl::string_view name, int64_t start_group,
116                          int64_t start_object, RemoteTrack::Visitor* visitor,
117                          absl::string_view auth_info = "");
118   bool SubscribeCurrentGroup(absl::string_view track_namespace,
119                              absl::string_view name,
120                              RemoteTrack::Visitor* visitor,
121                              absl::string_view auth_info = "");
122 
123   // Returns false if it could not open a stream when necessary, or if the
124   // track does not exist (there was no call to AddLocalTrack). Will still
125   // return false is some streams succeed.
126   // Also returns false if |payload_length| exists but is shorter than
127   // |payload|.
128   // |payload.length() >= |payload_length|, because the application can deliver
129   // partial objects.
130   bool PublishObject(const FullTrackName& full_track_name, uint64_t group_id,
131                      uint64_t object_id, uint64_t object_send_order,
132                      absl::string_view payload, bool end_of_stream);
133   // TODO: Add an API to FIN the stream for a particular track/group/object.
134   // TODO: Add an API to send partial objects.
135 
callbacks()136   MoqtSessionCallbacks& callbacks() { return callbacks_; }
137 
138  private:
139   friend class test::MoqtSessionPeer;
140   class QUICHE_EXPORT Stream : public webtransport::StreamVisitor,
141                                public MoqtParserVisitor {
142    public:
Stream(MoqtSession * session,webtransport::Stream * stream)143     Stream(MoqtSession* session, webtransport::Stream* stream)
144         : session_(session),
145           stream_(stream),
146           parser_(session->parameters_.using_webtrans, *this) {}
Stream(MoqtSession * session,webtransport::Stream * stream,bool is_control_stream)147     Stream(MoqtSession* session, webtransport::Stream* stream,
148            bool is_control_stream)
149         : session_(session),
150           stream_(stream),
151           parser_(session->parameters_.using_webtrans, *this),
152           is_control_stream_(is_control_stream) {}
153 
154     // webtransport::StreamVisitor implementation.
155     void OnCanRead() override;
156     void OnCanWrite() override;
157     void OnResetStreamReceived(webtransport::StreamErrorCode error) override;
158     void OnStopSendingReceived(webtransport::StreamErrorCode error) override;
OnWriteSideInDataRecvdState()159     void OnWriteSideInDataRecvdState() override {}
160 
161     // MoqtParserVisitor implementation.
162     // TODO: Handle a stream FIN.
163     void OnObjectMessage(const MoqtObject& message, absl::string_view payload,
164                          bool end_of_message) override;
165     void OnClientSetupMessage(const MoqtClientSetup& message) override;
166     void OnServerSetupMessage(const MoqtServerSetup& message) override;
167     void OnSubscribeMessage(const MoqtSubscribe& message) override;
168     void OnSubscribeOkMessage(const MoqtSubscribeOk& message) override;
169     void OnSubscribeErrorMessage(const MoqtSubscribeError& message) override;
170     void OnUnsubscribeMessage(const MoqtUnsubscribe& message) override;
OnSubscribeDoneMessage(const MoqtSubscribeDone &)171     void OnSubscribeDoneMessage(const MoqtSubscribeDone& /*message*/) override {
172     }
173     void OnAnnounceMessage(const MoqtAnnounce& message) override;
174     void OnAnnounceOkMessage(const MoqtAnnounceOk& message) override;
175     void OnAnnounceErrorMessage(const MoqtAnnounceError& message) override;
OnUnannounceMessage(const MoqtUnannounce &)176     void OnUnannounceMessage(const MoqtUnannounce& /*message*/) override {}
OnGoAwayMessage(const MoqtGoAway &)177     void OnGoAwayMessage(const MoqtGoAway& /*message*/) override {}
178     void OnParsingError(MoqtError error_code,
179                         absl::string_view reason) override;
180 
perspective()181     quic::Perspective perspective() const {
182       return session_->parameters_.perspective;
183     }
184 
stream()185     webtransport::Stream* stream() const { return stream_; }
186 
187     // Sends a control message, or buffers it if there is insufficient flow
188     // control credit.
189     void SendOrBufferMessage(quiche::QuicheBuffer message, bool fin = false);
190 
191    private:
192     friend class test::MoqtSessionPeer;
193     void SendSubscribeError(const MoqtSubscribe& message,
194                             SubscribeErrorCode error_code,
195                             absl::string_view reason_phrase,
196                             uint64_t track_alias);
197     bool CheckIfIsControlStream();
198 
199     MoqtSession* session_;
200     webtransport::Stream* stream_;
201     MoqtParser parser_;
202     // nullopt means "incoming stream, and we don't know if it's the control
203     // stream or a data stream yet".
204     std::optional<bool> is_control_stream_;
205     std::string partial_object_;
206   };
207 
208   // Returns the pointer to the control stream, or nullptr if none is present.
209   Stream* GetControlStream();
210   // Sends a message on the control stream; QUICHE_DCHECKs if no control stream
211   // is present.
212   void SendControlMessage(quiche::QuicheBuffer message);
213 
214   // Returns false if the SUBSCRIBE isn't sent.
215   bool Subscribe(MoqtSubscribe& message, RemoteTrack::Visitor* visitor);
216   // converts two MoqtLocations into absolute sequences.
217   std::optional<FullSequence> LocationToAbsoluteNumber(
218       const LocalTrack& track,
219       const std::optional<MoqtSubscribeLocation>& group,
220       const std::optional<MoqtSubscribeLocation>& object);
221   // Returns the stream ID if successful, nullopt if not.
222   // TODO: Add a callback if stream creation is delayed.
223   std::optional<webtransport::StreamId> OpenUnidirectionalStream();
224 
225   // Get FullTrackName and visitor for a subscribe_id and track_alias. Returns
226   // nullptr if not present.
227   std::pair<FullTrackName, RemoteTrack::Visitor*> TrackPropertiesFromAlias(
228       const MoqtObject& message);
229 
230   webtransport::Session* session_;
231   MoqtSessionParameters parameters_;
232   MoqtSessionCallbacks callbacks_;
233   MoqtFramer framer_;
234 
235   std::optional<webtransport::StreamId> control_stream_;
236   std::string error_;
237 
238   // All the tracks the session is subscribed to, indexed by track_alias.
239   // Multiple subscribes to the same track are recorded in a single
240   // subscription.
241   absl::flat_hash_map<uint64_t, RemoteTrack> remote_tracks_;
242   // Look up aliases for remote tracks by name
243   absl::flat_hash_map<FullTrackName, uint64_t> remote_track_aliases_;
244   uint64_t next_remote_track_alias_ = 0;
245 
246   // All the tracks the peer can subscribe to.
247   absl::flat_hash_map<FullTrackName, LocalTrack> local_tracks_;
248   // This is only used to check for track_alias collisions.
249   absl::flat_hash_set<uint64_t> used_track_aliases_;
250   uint64_t next_local_track_alias_ = 0;
251 
252   // Indexed by subscribe_id.
253   struct ActiveSubscribe {
254     MoqtSubscribe message;
255     RemoteTrack::Visitor* visitor;
256     // The forwarding preference of the first received object, which all
257     // subsequent objects must match.
258     std::optional<MoqtForwardingPreference> forwarding_preference;
259     // If true, an object has arrived for the subscription before SUBSCRIBE_OK
260     // arrived.
261     bool received_object = false;
262   };
263   // Outgoing SUBSCRIBEs that have not received SUBSCRIBE_OK or SUBSCRIBE_ERROR.
264   absl::flat_hash_map<uint64_t, ActiveSubscribe> active_subscribes_;
265   uint64_t next_subscribe_id_ = 0;
266 
267   // Indexed by track namespace.
268   absl::flat_hash_map<std::string, MoqtOutgoingAnnounceCallback>
269       pending_outgoing_announces_;
270 
271   // The role the peer advertised in its SETUP message. Initialize it to avoid
272   // an uninitialized value if no SETUP arrives or it arrives with no Role
273   // parameter, and other checks have changed/been disabled.
274   MoqtRole peer_role_ = MoqtRole::kPubSub;
275 };
276 
277 }  // namespace moqt
278 
279 #endif  // QUICHE_QUIC_MOQT_MOQT_SESSION_H_
280