xref: /aosp_15_r20/external/cronet/net/third_party/quiche/src/quiche/quic/moqt/moqt_parser.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright (c) 2023 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/quic/moqt/moqt_parser.h"
6 
7 #include <cstddef>
8 #include <cstdint>
9 #include <cstring>
10 #include <optional>
11 #include <string>
12 
13 #include "absl/cleanup/cleanup.h"
14 #include "absl/strings/str_cat.h"
15 #include "absl/strings/string_view.h"
16 #include "quiche/quic/core/quic_data_reader.h"
17 #include "quiche/quic/core/quic_time.h"
18 #include "quiche/quic/moqt/moqt_messages.h"
19 #include "quiche/common/platform/api/quiche_logging.h"
20 
21 namespace moqt {
22 
23 // The buffering philosophy is complicated, to minimize copying. Here is an
24 // overview:
25 // If the entire message body is present (except for OBJECT payload), it is
26 // parsed and delivered. If not, the partial body is buffered. (requiring a
27 // copy).
28 // Any OBJECT payload is always delivered to the application without copying.
29 // If something has been buffered, when more data arrives copy just enough of it
30 // to finish parsing that thing, then resume normal processing.
ProcessData(absl::string_view data,bool fin)31 void MoqtParser::ProcessData(absl::string_view data, bool fin) {
32   if (no_more_data_) {
33     ParseError("Data after end of stream");
34   }
35   if (processing_) {
36     return;
37   }
38   processing_ = true;
39   auto on_return = absl::MakeCleanup([&] { processing_ = false; });
40   // Check for early fin
41   if (fin) {
42     no_more_data_ = true;
43     if (ObjectPayloadInProgress() &&
44         payload_length_remaining_ > data.length()) {
45       ParseError("End of stream before complete OBJECT PAYLOAD");
46       return;
47     }
48     if (!buffered_message_.empty() && data.empty()) {
49       ParseError("End of stream before complete message");
50       return;
51     }
52   }
53   std::optional<quic::QuicDataReader> reader = std::nullopt;
54   size_t original_buffer_size = buffered_message_.size();
55   // There are three cases: the parser has already delivered an OBJECT header
56   // and is now delivering payload; part of a message is in the buffer; or
57   // no message is in progress.
58   if (ObjectPayloadInProgress()) {
59     // This is additional payload for an OBJECT.
60     QUICHE_DCHECK(buffered_message_.empty());
61     if (!object_metadata_->payload_length.has_value()) {
62       // Deliver the data and exit.
63       visitor_.OnObjectMessage(*object_metadata_, data, fin);
64       if (fin) {
65         object_metadata_.reset();
66       }
67       return;
68     }
69     if (data.length() < payload_length_remaining_) {
70       // Does not finish the payload; deliver and exit.
71       visitor_.OnObjectMessage(*object_metadata_, data, false);
72       payload_length_remaining_ -= data.length();
73       return;
74     }
75     // Finishes the payload. Deliver and continue.
76     reader.emplace(data);
77     visitor_.OnObjectMessage(*object_metadata_,
78                              data.substr(0, payload_length_remaining_), true);
79     reader->Seek(payload_length_remaining_);
80     payload_length_remaining_ = 0;  // Expect a new object.
81   } else if (!buffered_message_.empty()) {
82     absl::StrAppend(&buffered_message_, data);
83     reader.emplace(buffered_message_);
84   } else {
85     // No message in progress.
86     reader.emplace(data);
87   }
88   size_t total_processed = 0;
89   while (!reader->IsDoneReading()) {
90     size_t message_len = ProcessMessage(reader->PeekRemainingPayload(), fin);
91     if (message_len == 0) {
92       if (reader->BytesRemaining() > kMaxMessageHeaderSize) {
93         ParseError(MoqtError::kInternalError,
94                    "Cannot parse non-OBJECT messages > 2KB");
95         return;
96       }
97       if (fin) {
98         ParseError("FIN after incomplete message");
99         return;
100       }
101       if (buffered_message_.empty()) {
102         // If the buffer is not empty, |data| has already been copied there.
103         absl::StrAppend(&buffered_message_, reader->PeekRemainingPayload());
104       }
105       break;
106     }
107     // A message was successfully processed.
108     total_processed += message_len;
109     reader->Seek(message_len);
110   }
111   if (original_buffer_size > 0) {
112     buffered_message_.erase(0, total_processed);
113   }
114 }
115 
116 // static
ProcessDatagram(absl::string_view data,MoqtObject & object_metadata)117 absl::string_view MoqtParser::ProcessDatagram(absl::string_view data,
118                                               MoqtObject& object_metadata) {
119   uint64_t value;
120   quic::QuicDataReader reader(data);
121   if (!reader.ReadVarInt62(&value)) {
122     return absl::string_view();
123   }
124   if (static_cast<MoqtMessageType>(value) != MoqtMessageType::kObjectDatagram) {
125     return absl::string_view();
126   }
127   size_t processed_data = ParseObjectHeader(reader, object_metadata,
128                                             MoqtMessageType::kObjectDatagram);
129   if (processed_data == 0) {  // Incomplete header
130     return absl::string_view();
131   }
132   return reader.PeekRemainingPayload();
133 }
134 
ProcessMessage(absl::string_view data,bool fin)135 size_t MoqtParser::ProcessMessage(absl::string_view data, bool fin) {
136   uint64_t value;
137   quic::QuicDataReader reader(data);
138   if (ObjectStreamInitialized() && !ObjectPayloadInProgress()) {
139     // This is a follow-on object in a stream.
140     return ProcessObject(reader,
141                          GetMessageTypeForForwardingPreference(
142                              object_metadata_->forwarding_preference),
143                          fin);
144   }
145   if (!reader.ReadVarInt62(&value)) {
146     return 0;
147   }
148   auto type = static_cast<MoqtMessageType>(value);
149   switch (type) {
150     case MoqtMessageType::kObjectDatagram:
151       ParseError("Received OBJECT_DATAGRAM on stream");
152       return 0;
153     case MoqtMessageType::kObjectStream:
154     case MoqtMessageType::kStreamHeaderTrack:
155     case MoqtMessageType::kStreamHeaderGroup:
156       return ProcessObject(reader, type, fin);
157     case MoqtMessageType::kClientSetup:
158       return ProcessClientSetup(reader);
159     case MoqtMessageType::kServerSetup:
160       return ProcessServerSetup(reader);
161     case MoqtMessageType::kSubscribe:
162       return ProcessSubscribe(reader);
163     case MoqtMessageType::kSubscribeOk:
164       return ProcessSubscribeOk(reader);
165     case MoqtMessageType::kSubscribeError:
166       return ProcessSubscribeError(reader);
167     case MoqtMessageType::kUnsubscribe:
168       return ProcessUnsubscribe(reader);
169     case MoqtMessageType::kSubscribeDone:
170       return ProcessSubscribeDone(reader);
171     case MoqtMessageType::kAnnounce:
172       return ProcessAnnounce(reader);
173     case MoqtMessageType::kAnnounceOk:
174       return ProcessAnnounceOk(reader);
175     case MoqtMessageType::kAnnounceError:
176       return ProcessAnnounceError(reader);
177     case MoqtMessageType::kUnannounce:
178       return ProcessUnannounce(reader);
179     case MoqtMessageType::kGoAway:
180       return ProcessGoAway(reader);
181     default:
182       ParseError("Unknown message type");
183       return 0;
184   }
185 }
186 
ProcessObject(quic::QuicDataReader & reader,MoqtMessageType type,bool fin)187 size_t MoqtParser::ProcessObject(quic::QuicDataReader& reader,
188                                  MoqtMessageType type, bool fin) {
189   size_t processed_data = 0;
190   QUICHE_DCHECK(!ObjectPayloadInProgress());
191   if (!ObjectStreamInitialized()) {
192     object_metadata_ = MoqtObject();
193     processed_data = ParseObjectHeader(reader, object_metadata_.value(), type);
194     if (processed_data == 0) {
195       object_metadata_.reset();
196       return 0;
197     }
198   }
199   // At this point, enough data has been processed to store in object_metadata_,
200   // even if there's nothing else in the buffer.
201   QUICHE_DCHECK(payload_length_remaining_ == 0);
202   switch (type) {
203     case MoqtMessageType::kStreamHeaderTrack:
204       if (!reader.ReadVarInt62(&object_metadata_->group_id)) {
205         return processed_data;
206       }
207       [[fallthrough]];
208     case MoqtMessageType::kStreamHeaderGroup: {
209       uint64_t length;
210       if (!reader.ReadVarInt62(&object_metadata_->object_id) ||
211           !reader.ReadVarInt62(&length)) {
212         return processed_data;
213       }
214       object_metadata_->payload_length = length;
215       break;
216     }
217     default:
218       break;
219   }
220   bool has_length = object_metadata_->payload_length.has_value();
221   bool received_complete_message = false;
222   size_t payload_to_draw = reader.BytesRemaining();
223   if (fin && has_length &&
224       *object_metadata_->payload_length > reader.BytesRemaining()) {
225     ParseError("Received FIN mid-payload");
226     return processed_data;
227   }
228   received_complete_message =
229       fin || (has_length &&
230               *object_metadata_->payload_length <= reader.BytesRemaining());
231   if (received_complete_message && has_length &&
232       *object_metadata_->payload_length < reader.BytesRemaining()) {
233     payload_to_draw = *object_metadata_->payload_length;
234   }
235   // The error case where there's a fin before the explicit length is complete
236   // is handled in ProcessData() in two separate places. Even though the
237   // message is "done" if fin regardless of has_length, it's bad to report to
238   // the application that the object is done if it hasn't reached the promised
239   // length.
240   visitor_.OnObjectMessage(
241       *object_metadata_,
242       reader.PeekRemainingPayload().substr(0, payload_to_draw),
243       received_complete_message);
244   reader.Seek(payload_to_draw);
245   payload_length_remaining_ =
246       has_length ? *object_metadata_->payload_length - payload_to_draw : 0;
247   return reader.PreviouslyReadPayload().length();
248 }
249 
ProcessClientSetup(quic::QuicDataReader & reader)250 size_t MoqtParser::ProcessClientSetup(quic::QuicDataReader& reader) {
251   MoqtClientSetup setup;
252   uint64_t number_of_supported_versions;
253   if (!reader.ReadVarInt62(&number_of_supported_versions)) {
254     return 0;
255   }
256   uint64_t version;
257   for (uint64_t i = 0; i < number_of_supported_versions; ++i) {
258     if (!reader.ReadVarInt62(&version)) {
259       return 0;
260     }
261     setup.supported_versions.push_back(static_cast<MoqtVersion>(version));
262   }
263   uint64_t num_params;
264   if (!reader.ReadVarInt62(&num_params)) {
265     return 0;
266   }
267   // Parse parameters
268   for (uint64_t i = 0; i < num_params; ++i) {
269     uint64_t type;
270     absl::string_view value;
271     if (!ReadParameter(reader, type, value)) {
272       return 0;
273     }
274     auto key = static_cast<MoqtSetupParameter>(type);
275     switch (key) {
276       case MoqtSetupParameter::kRole:
277         if (setup.role.has_value()) {
278           ParseError("ROLE parameter appears twice in SETUP");
279           return 0;
280         }
281         uint64_t index;
282         if (!StringViewToVarInt(value, index)) {
283           return 0;
284         }
285         if (index > static_cast<uint64_t>(MoqtRole::kRoleMax)) {
286           ParseError("Invalid ROLE parameter");
287           return 0;
288         }
289         setup.role = static_cast<MoqtRole>(index);
290         break;
291       case MoqtSetupParameter::kPath:
292         if (uses_web_transport_) {
293           ParseError(
294               "WebTransport connection is using PATH parameter in SETUP");
295           return 0;
296         }
297         if (setup.path.has_value()) {
298           ParseError("PATH parameter appears twice in CLIENT_SETUP");
299           return 0;
300         }
301         setup.path = value;
302         break;
303       default:
304         // Skip over the parameter.
305         break;
306     }
307   }
308   if (!setup.role.has_value()) {
309     ParseError("ROLE parameter missing from CLIENT_SETUP message");
310     return 0;
311   }
312   if (!uses_web_transport_ && !setup.path.has_value()) {
313     ParseError("PATH SETUP parameter missing from Client message over QUIC");
314     return 0;
315   }
316   visitor_.OnClientSetupMessage(setup);
317   return reader.PreviouslyReadPayload().length();
318 }
319 
ProcessServerSetup(quic::QuicDataReader & reader)320 size_t MoqtParser::ProcessServerSetup(quic::QuicDataReader& reader) {
321   MoqtServerSetup setup;
322   uint64_t version;
323   if (!reader.ReadVarInt62(&version)) {
324     return 0;
325   }
326   setup.selected_version = static_cast<MoqtVersion>(version);
327   uint64_t num_params;
328   if (!reader.ReadVarInt62(&num_params)) {
329     return 0;
330   }
331   // Parse parameters
332   for (uint64_t i = 0; i < num_params; ++i) {
333     uint64_t type;
334     absl::string_view value;
335     if (!ReadParameter(reader, type, value)) {
336       return 0;
337     }
338     auto key = static_cast<MoqtSetupParameter>(type);
339     switch (key) {
340       case MoqtSetupParameter::kRole:
341         if (setup.role.has_value()) {
342           ParseError("ROLE parameter appears twice in SETUP");
343           return 0;
344         }
345         uint64_t index;
346         if (!StringViewToVarInt(value, index)) {
347           return 0;
348         }
349         if (index > static_cast<uint64_t>(MoqtRole::kRoleMax)) {
350           ParseError("Invalid ROLE parameter");
351           return 0;
352         }
353         setup.role = static_cast<MoqtRole>(index);
354         break;
355       case MoqtSetupParameter::kPath:
356         ParseError("PATH parameter in SERVER_SETUP");
357         return 0;
358       default:
359         // Skip over the parameter.
360         break;
361     }
362   }
363   if (!setup.role.has_value()) {
364     ParseError("ROLE parameter missing from SERVER_SETUP message");
365     return 0;
366   }
367   visitor_.OnServerSetupMessage(setup);
368   return reader.PreviouslyReadPayload().length();
369 }
370 
ProcessSubscribe(quic::QuicDataReader & reader)371 size_t MoqtParser::ProcessSubscribe(quic::QuicDataReader& reader) {
372   MoqtSubscribe subscribe_request;
373   if (!reader.ReadVarInt62(&subscribe_request.subscribe_id) ||
374       !reader.ReadVarInt62(&subscribe_request.track_alias) ||
375       !reader.ReadStringVarInt62(subscribe_request.track_namespace) ||
376       !reader.ReadStringVarInt62(subscribe_request.track_name) ||
377       !ReadLocation(reader, subscribe_request.start_group)) {
378     return 0;
379   }
380   if (!subscribe_request.start_group.has_value()) {
381     ParseError("START_GROUP must not be None in SUBSCRIBE");
382     return 0;
383   }
384   if (!ReadLocation(reader, subscribe_request.start_object)) {
385     return 0;
386   }
387   if (!subscribe_request.start_object.has_value()) {
388     ParseError("START_OBJECT must not be None in SUBSCRIBE");
389     return 0;
390   }
391   if (!ReadLocation(reader, subscribe_request.end_group) ||
392       !ReadLocation(reader, subscribe_request.end_object)) {
393     return 0;
394   }
395   if (subscribe_request.end_group.has_value() !=
396       subscribe_request.end_object.has_value()) {
397     ParseError(
398         "SUBSCRIBE end_group and end_object must be both None "
399         "or both non_None");
400     return 0;
401   }
402   uint64_t num_params;
403   if (!reader.ReadVarInt62(&num_params)) {
404     return 0;
405   }
406   for (uint64_t i = 0; i < num_params; ++i) {
407     uint64_t type;
408     absl::string_view value;
409     if (!ReadParameter(reader, type, value)) {
410       return 0;
411     }
412     auto key = static_cast<MoqtTrackRequestParameter>(type);
413     switch (key) {
414       case MoqtTrackRequestParameter::kAuthorizationInfo:
415         if (subscribe_request.authorization_info.has_value()) {
416           ParseError(
417               "AUTHORIZATION_INFO parameter appears twice in "
418               "SUBSCRIBE_REQUEST");
419           return 0;
420         }
421         subscribe_request.authorization_info = value;
422         break;
423       default:
424         // Skip over the parameter.
425         break;
426     }
427   }
428   visitor_.OnSubscribeMessage(subscribe_request);
429   return reader.PreviouslyReadPayload().length();
430 }
431 
ProcessSubscribeOk(quic::QuicDataReader & reader)432 size_t MoqtParser::ProcessSubscribeOk(quic::QuicDataReader& reader) {
433   MoqtSubscribeOk subscribe_ok;
434   uint64_t milliseconds;
435   uint8_t content_exists;
436   if (!reader.ReadVarInt62(&subscribe_ok.subscribe_id) ||
437       !reader.ReadVarInt62(&milliseconds) ||
438       !reader.ReadUInt8(&content_exists)) {
439     return 0;
440   }
441   if (content_exists > 1) {
442     ParseError("SUBSCRIBE_OK ContentExists has invalid value");
443     return 0;
444   }
445   subscribe_ok.expires = quic::QuicTimeDelta::FromMilliseconds(milliseconds);
446   if (content_exists) {
447     subscribe_ok.largest_id = FullSequence();
448     if (!reader.ReadVarInt62(&subscribe_ok.largest_id->group) ||
449         !reader.ReadVarInt62(&subscribe_ok.largest_id->object)) {
450       return 0;
451     }
452   }
453   visitor_.OnSubscribeOkMessage(subscribe_ok);
454   return reader.PreviouslyReadPayload().length();
455 }
456 
ProcessSubscribeError(quic::QuicDataReader & reader)457 size_t MoqtParser::ProcessSubscribeError(quic::QuicDataReader& reader) {
458   MoqtSubscribeError subscribe_error;
459   uint64_t error_code;
460   if (!reader.ReadVarInt62(&subscribe_error.subscribe_id) ||
461       !reader.ReadVarInt62(&error_code) ||
462       !reader.ReadStringVarInt62(subscribe_error.reason_phrase) ||
463       !reader.ReadVarInt62(&subscribe_error.track_alias)) {
464     return 0;
465   }
466   subscribe_error.error_code = static_cast<SubscribeErrorCode>(error_code);
467   visitor_.OnSubscribeErrorMessage(subscribe_error);
468   return reader.PreviouslyReadPayload().length();
469 }
470 
ProcessUnsubscribe(quic::QuicDataReader & reader)471 size_t MoqtParser::ProcessUnsubscribe(quic::QuicDataReader& reader) {
472   MoqtUnsubscribe unsubscribe;
473   if (!reader.ReadVarInt62(&unsubscribe.subscribe_id)) {
474     return 0;
475   }
476   visitor_.OnUnsubscribeMessage(unsubscribe);
477   return reader.PreviouslyReadPayload().length();
478 }
479 
ProcessSubscribeDone(quic::QuicDataReader & reader)480 size_t MoqtParser::ProcessSubscribeDone(quic::QuicDataReader& reader) {
481   MoqtSubscribeDone subscribe_done;
482   uint8_t content_exists;
483   if (!reader.ReadVarInt62(&subscribe_done.subscribe_id) ||
484       !reader.ReadVarInt62(&subscribe_done.status_code) ||
485       !reader.ReadStringVarInt62(subscribe_done.reason_phrase) ||
486       !reader.ReadUInt8(&content_exists)) {
487     return 0;
488   }
489   if (content_exists > 1) {
490     ParseError("SUBSCRIBE_DONE ContentExists has invalid value");
491     return 0;
492   }
493   if (content_exists == 1) {
494     subscribe_done.final_id = FullSequence();
495     if (!reader.ReadVarInt62(&subscribe_done.final_id->group) ||
496         !reader.ReadVarInt62(&subscribe_done.final_id->object)) {
497       return 0;
498     }
499   }
500   visitor_.OnSubscribeDoneMessage(subscribe_done);
501   return reader.PreviouslyReadPayload().length();
502 }
503 
ProcessAnnounce(quic::QuicDataReader & reader)504 size_t MoqtParser::ProcessAnnounce(quic::QuicDataReader& reader) {
505   MoqtAnnounce announce;
506   if (!reader.ReadStringVarInt62(announce.track_namespace)) {
507     return 0;
508   }
509   uint64_t num_params;
510   if (!reader.ReadVarInt62(&num_params)) {
511     return 0;
512   }
513   for (uint64_t i = 0; i < num_params; ++i) {
514     uint64_t type;
515     absl::string_view value;
516     if (!ReadParameter(reader, type, value)) {
517       return 0;
518     }
519     auto key = static_cast<MoqtTrackRequestParameter>(type);
520     switch (key) {
521       case MoqtTrackRequestParameter::kAuthorizationInfo:
522         if (announce.authorization_info.has_value()) {
523           ParseError("AUTHORIZATION_INFO parameter appears twice in ANNOUNCE");
524           return 0;
525         }
526         announce.authorization_info = value;
527         break;
528       default:
529         // Skip over the parameter.
530         break;
531     }
532   }
533   visitor_.OnAnnounceMessage(announce);
534   return reader.PreviouslyReadPayload().length();
535 }
536 
ProcessAnnounceOk(quic::QuicDataReader & reader)537 size_t MoqtParser::ProcessAnnounceOk(quic::QuicDataReader& reader) {
538   MoqtAnnounceOk announce_ok;
539   if (!reader.ReadStringVarInt62(announce_ok.track_namespace)) {
540     return 0;
541   }
542   visitor_.OnAnnounceOkMessage(announce_ok);
543   return reader.PreviouslyReadPayload().length();
544 }
545 
ProcessAnnounceError(quic::QuicDataReader & reader)546 size_t MoqtParser::ProcessAnnounceError(quic::QuicDataReader& reader) {
547   MoqtAnnounceError announce_error;
548   if (!reader.ReadStringVarInt62(announce_error.track_namespace)) {
549     return 0;
550   }
551   uint64_t error_code;
552   if (!reader.ReadVarInt62(&error_code)) {
553     return 0;
554   }
555   announce_error.error_code = static_cast<MoqtAnnounceErrorCode>(error_code);
556   if (!reader.ReadStringVarInt62(announce_error.reason_phrase)) {
557     return 0;
558   }
559   visitor_.OnAnnounceErrorMessage(announce_error);
560   return reader.PreviouslyReadPayload().length();
561 }
562 
ProcessUnannounce(quic::QuicDataReader & reader)563 size_t MoqtParser::ProcessUnannounce(quic::QuicDataReader& reader) {
564   MoqtUnannounce unannounce;
565   if (!reader.ReadStringVarInt62(unannounce.track_namespace)) {
566     return 0;
567   }
568   visitor_.OnUnannounceMessage(unannounce);
569   return reader.PreviouslyReadPayload().length();
570 }
571 
ProcessGoAway(quic::QuicDataReader & reader)572 size_t MoqtParser::ProcessGoAway(quic::QuicDataReader& reader) {
573   MoqtGoAway goaway;
574   if (!reader.ReadStringVarInt62(goaway.new_session_uri)) {
575     return 0;
576   }
577   visitor_.OnGoAwayMessage(goaway);
578   return reader.PreviouslyReadPayload().length();
579 }
580 
581 // static
ParseObjectHeader(quic::QuicDataReader & reader,MoqtObject & object,MoqtMessageType type)582 size_t MoqtParser::ParseObjectHeader(quic::QuicDataReader& reader,
583                                      MoqtObject& object, MoqtMessageType type) {
584   if (!reader.ReadVarInt62(&object.subscribe_id) ||
585       !reader.ReadVarInt62(&object.track_alias)) {
586     return 0;
587   }
588   if (type != MoqtMessageType::kStreamHeaderTrack &&
589       !reader.ReadVarInt62(&object.group_id)) {
590     return 0;
591   }
592   if (type != MoqtMessageType::kStreamHeaderTrack &&
593       type != MoqtMessageType::kStreamHeaderGroup &&
594       !reader.ReadVarInt62(&object.object_id)) {
595     return 0;
596   }
597   if (!reader.ReadVarInt62(&object.object_send_order)) {
598     return 0;
599   }
600   object.forwarding_preference = GetForwardingPreference(type);
601   return reader.PreviouslyReadPayload().length();
602 }
603 
ParseError(absl::string_view reason)604 void MoqtParser::ParseError(absl::string_view reason) {
605   ParseError(MoqtError::kProtocolViolation, reason);
606 }
607 
ParseError(MoqtError error_code,absl::string_view reason)608 void MoqtParser::ParseError(MoqtError error_code, absl::string_view reason) {
609   if (parsing_error_) {
610     return;  // Don't send multiple parse errors.
611   }
612   no_more_data_ = true;
613   parsing_error_ = true;
614   visitor_.OnParsingError(error_code, reason);
615 }
616 
ReadVarIntPieceVarInt62(quic::QuicDataReader & reader,uint64_t & result)617 bool MoqtParser::ReadVarIntPieceVarInt62(quic::QuicDataReader& reader,
618                                          uint64_t& result) {
619   uint64_t length;
620   if (!reader.ReadVarInt62(&length)) {
621     return false;
622   }
623   uint64_t actual_length = static_cast<uint64_t>(reader.PeekVarInt62Length());
624   if (length != actual_length) {
625     ParseError("Parameter VarInt has length field mismatch");
626     return false;
627   }
628   if (!reader.ReadVarInt62(&result)) {
629     return false;
630   }
631   return true;
632 }
633 
ReadLocation(quic::QuicDataReader & reader,std::optional<MoqtSubscribeLocation> & loc)634 bool MoqtParser::ReadLocation(quic::QuicDataReader& reader,
635                               std::optional<MoqtSubscribeLocation>& loc) {
636   uint64_t ui64;
637   if (!reader.ReadVarInt62(&ui64)) {
638     return false;
639   }
640   auto mode = static_cast<MoqtSubscribeLocationMode>(ui64);
641   if (mode == MoqtSubscribeLocationMode::kNone) {
642     loc = std::nullopt;
643     return true;
644   }
645   if (!reader.ReadVarInt62(&ui64)) {
646     return false;
647   }
648   switch (mode) {
649     case MoqtSubscribeLocationMode::kAbsolute:
650       loc = MoqtSubscribeLocation(true, ui64);
651       break;
652     case MoqtSubscribeLocationMode::kRelativePrevious:
653       loc = MoqtSubscribeLocation(false, -1 * static_cast<int64_t>(ui64));
654       break;
655     case MoqtSubscribeLocationMode::kRelativeNext:
656       loc = MoqtSubscribeLocation(false, static_cast<int64_t>(ui64) + 1);
657       break;
658     default:
659       ParseError("Unknown location mode");
660       return false;
661   }
662   return true;
663 }
664 
ReadParameter(quic::QuicDataReader & reader,uint64_t & type,absl::string_view & value)665 bool MoqtParser::ReadParameter(quic::QuicDataReader& reader, uint64_t& type,
666                                absl::string_view& value) {
667   if (!reader.ReadVarInt62(&type)) {
668     return false;
669   }
670   return reader.ReadStringPieceVarInt62(&value);
671 }
672 
StringViewToVarInt(absl::string_view & sv,uint64_t & vi)673 bool MoqtParser::StringViewToVarInt(absl::string_view& sv, uint64_t& vi) {
674   quic::QuicDataReader reader(sv);
675   if (static_cast<size_t>(reader.PeekVarInt62Length()) != sv.length()) {
676     ParseError(MoqtError::kParameterLengthMismatch,
677                "Parameter length does not match varint encoding");
678     return false;
679   }
680   reader.ReadVarInt62(&vi);
681   return true;
682 }
683 
684 }  // namespace moqt
685