1 // Copyright 2023 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef QUICHE_QUIC_MOQT_MOQT_SUBSCRIPTION_H_ 6 #define QUICHE_QUIC_MOQT_MOQT_SUBSCRIPTION_H_ 7 8 #include <cstdint> 9 #include <optional> 10 #include <vector> 11 12 #include "absl/strings/string_view.h" 13 #include "quiche/quic/moqt/moqt_messages.h" 14 #include "quiche/quic/moqt/moqt_subscribe_windows.h" 15 16 namespace moqt { 17 18 // A track to which the peer might subscribe. 19 class LocalTrack { 20 public: 21 class Visitor { 22 public: 23 virtual ~Visitor() = default; 24 25 // Requests that application re-publish objects from {start_group, 26 // start_object} to the latest object. If the return value is nullopt, the 27 // subscribe is valid and the application will deliver the object and 28 // the session will send SUBSCRIBE_OK. If the return has a value, the value 29 // is the error message (the session will send SUBSCRIBE_ERROR). Via this 30 // API, the application decides if a partially fulfillable 31 // SUBSCRIBE results in an error or not. 32 virtual std::optional<absl::string_view> OnSubscribeForPast( 33 const SubscribeWindow& window) = 0; 34 }; 35 // |visitor| must not be nullptr. LocalTrack(const FullTrackName & full_track_name,MoqtForwardingPreference forwarding_preference,Visitor * visitor)36 LocalTrack(const FullTrackName& full_track_name, 37 MoqtForwardingPreference forwarding_preference, Visitor* visitor) 38 : full_track_name_(full_track_name), 39 forwarding_preference_(forwarding_preference), 40 windows_(forwarding_preference), 41 visitor_(visitor) {} 42 // Creates a LocalTrack that does not start at sequence (0,0) LocalTrack(const FullTrackName & full_track_name,MoqtForwardingPreference forwarding_preference,Visitor * visitor,FullSequence next_sequence)43 LocalTrack(const FullTrackName& full_track_name, 44 MoqtForwardingPreference forwarding_preference, Visitor* visitor, 45 FullSequence next_sequence) 46 : full_track_name_(full_track_name), 47 forwarding_preference_(forwarding_preference), 48 windows_(forwarding_preference), 49 next_sequence_(next_sequence), 50 visitor_(visitor) {} 51 full_track_name()52 const FullTrackName& full_track_name() const { return full_track_name_; } 53 track_alias()54 std::optional<uint64_t> track_alias() const { return track_alias_; } set_track_alias(uint64_t track_alias)55 void set_track_alias(uint64_t track_alias) { track_alias_ = track_alias; } 56 visitor()57 Visitor* visitor() { return visitor_; } 58 59 // Returns the subscribe windows that want the object defined by (|group|, 60 // |object|). ShouldSend(FullSequence sequence)61 std::vector<SubscribeWindow*> ShouldSend(FullSequence sequence) { 62 return windows_.SequenceIsSubscribed(sequence); 63 } 64 AddWindow(uint64_t subscribe_id,uint64_t start_group,uint64_t start_object)65 void AddWindow(uint64_t subscribe_id, uint64_t start_group, 66 uint64_t start_object) { 67 windows_.AddWindow(subscribe_id, start_group, start_object); 68 } 69 AddWindow(uint64_t subscribe_id,uint64_t start_group,uint64_t start_object,uint64_t end_group,uint64_t end_object)70 void AddWindow(uint64_t subscribe_id, uint64_t start_group, 71 uint64_t start_object, uint64_t end_group, 72 uint64_t end_object) { 73 windows_.AddWindow(subscribe_id, start_group, start_object, end_group, 74 end_object); 75 } 76 DeleteWindow(uint64_t subscribe_id)77 void DeleteWindow(uint64_t subscribe_id) { 78 windows_.RemoveWindow(subscribe_id); 79 } 80 81 // Returns the largest observed sequence, but increments the object sequence 82 // by one. next_sequence()83 const FullSequence& next_sequence() const { return next_sequence_; } 84 85 // Updates next_sequence_ if |sequence| is larger. SentSequence(FullSequence sequence)86 void SentSequence(FullSequence sequence) { 87 if (next_sequence_ <= sequence) { 88 next_sequence_ = {sequence.group, sequence.object + 1}; 89 } 90 } 91 HasSubscriber()92 bool HasSubscriber() const { return !windows_.IsEmpty(); } 93 GetWindow(uint64_t subscribe_id)94 SubscribeWindow* GetWindow(uint64_t subscribe_id) { 95 return windows_.GetWindow(subscribe_id); 96 } 97 forwarding_preference()98 MoqtForwardingPreference forwarding_preference() const { 99 return forwarding_preference_; 100 } 101 102 private: 103 // This only needs to track subscriptions to current and future objects; 104 // requests for objects in the past are forwarded to the application. 105 const FullTrackName full_track_name_; 106 // The forwarding preference for the track. 107 MoqtForwardingPreference forwarding_preference_; 108 // Let the first SUBSCRIBE determine the track alias. 109 std::optional<uint64_t> track_alias_; 110 // The sequence numbers from this track to which the peer is subscribed. 111 MoqtSubscribeWindows windows_; 112 // By recording the highest observed sequence number, MoQT can interpret 113 // relative sequence numbers in SUBSCRIBEs. 114 FullSequence next_sequence_ = {0, 0}; 115 Visitor* visitor_; 116 }; 117 118 // A track on the peer to which the session has subscribed. 119 class RemoteTrack { 120 public: 121 class Visitor { 122 public: 123 virtual ~Visitor() = default; 124 // Called when the session receives a response to the SUBSCRIBE, unless it's 125 // a SUBSCRIBE_ERROR with a new track_alias. In that case, the session will 126 // automatically retry. 127 virtual void OnReply( 128 const FullTrackName& full_track_name, 129 std::optional<absl::string_view> error_reason_phrase) = 0; 130 virtual void OnObjectFragment( 131 const FullTrackName& full_track_name, uint64_t group_sequence, 132 uint64_t object_sequence, uint64_t object_send_order, 133 MoqtForwardingPreference forwarding_preference, 134 absl::string_view object, bool end_of_message) = 0; 135 // TODO(martinduke): Add final sequence numbers 136 }; RemoteTrack(const FullTrackName & full_track_name,uint64_t track_alias,Visitor * visitor)137 RemoteTrack(const FullTrackName& full_track_name, uint64_t track_alias, 138 Visitor* visitor) 139 : full_track_name_(full_track_name), 140 track_alias_(track_alias), 141 visitor_(visitor) {} 142 full_track_name()143 const FullTrackName& full_track_name() { return full_track_name_; } 144 track_alias()145 uint64_t track_alias() const { return track_alias_; } 146 visitor()147 Visitor* visitor() { return visitor_; } 148 149 // When called while processing the first object in the track, sets the 150 // forwarding preference to the value indicated by the incoming encoding. 151 // Otherwise, returns true if the incoming object does not violate the rule 152 // that the preference is consistent. CheckForwardingPreference(MoqtForwardingPreference preference)153 bool CheckForwardingPreference(MoqtForwardingPreference preference) { 154 if (forwarding_preference_.has_value()) { 155 return forwarding_preference_.value() == preference; 156 } 157 forwarding_preference_ = preference; 158 return true; 159 } 160 161 private: 162 // TODO: There is no accounting for the number of outstanding subscribes, 163 // because we can't match track names to individual subscribes. 164 const FullTrackName full_track_name_; 165 const uint64_t track_alias_; 166 Visitor* visitor_; 167 std::optional<MoqtForwardingPreference> forwarding_preference_; 168 }; 169 170 } // namespace moqt 171 172 #endif // QUICHE_QUIC_MOQT_MOQT_SUBSCRIPTION_H_ 173