1 // Copyright (c) 2023 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "quiche/quic/moqt/moqt_parser.h"
6
7 #include <cstddef>
8 #include <cstdint>
9 #include <cstring>
10 #include <optional>
11 #include <string>
12
13 #include "absl/cleanup/cleanup.h"
14 #include "absl/strings/str_cat.h"
15 #include "absl/strings/string_view.h"
16 #include "quiche/quic/core/quic_data_reader.h"
17 #include "quiche/quic/core/quic_time.h"
18 #include "quiche/quic/moqt/moqt_messages.h"
19 #include "quiche/common/platform/api/quiche_logging.h"
20
21 namespace moqt {
22
23 // The buffering philosophy is complicated, to minimize copying. Here is an
24 // overview:
25 // If the entire message body is present (except for OBJECT payload), it is
26 // parsed and delivered. If not, the partial body is buffered. (requiring a
27 // copy).
28 // Any OBJECT payload is always delivered to the application without copying.
29 // If something has been buffered, when more data arrives copy just enough of it
30 // to finish parsing that thing, then resume normal processing.
ProcessData(absl::string_view data,bool fin)31 void MoqtParser::ProcessData(absl::string_view data, bool fin) {
32 if (no_more_data_) {
33 ParseError("Data after end of stream");
34 }
35 if (processing_) {
36 return;
37 }
38 processing_ = true;
39 auto on_return = absl::MakeCleanup([&] { processing_ = false; });
40 // Check for early fin
41 if (fin) {
42 no_more_data_ = true;
43 if (ObjectPayloadInProgress() &&
44 payload_length_remaining_ > data.length()) {
45 ParseError("End of stream before complete OBJECT PAYLOAD");
46 return;
47 }
48 if (!buffered_message_.empty() && data.empty()) {
49 ParseError("End of stream before complete message");
50 return;
51 }
52 }
53 std::optional<quic::QuicDataReader> reader = std::nullopt;
54 size_t original_buffer_size = buffered_message_.size();
55 // There are three cases: the parser has already delivered an OBJECT header
56 // and is now delivering payload; part of a message is in the buffer; or
57 // no message is in progress.
58 if (ObjectPayloadInProgress()) {
59 // This is additional payload for an OBJECT.
60 QUICHE_DCHECK(buffered_message_.empty());
61 if (!object_metadata_->payload_length.has_value()) {
62 // Deliver the data and exit.
63 visitor_.OnObjectMessage(*object_metadata_, data, fin);
64 if (fin) {
65 object_metadata_.reset();
66 }
67 return;
68 }
69 if (data.length() < payload_length_remaining_) {
70 // Does not finish the payload; deliver and exit.
71 visitor_.OnObjectMessage(*object_metadata_, data, false);
72 payload_length_remaining_ -= data.length();
73 return;
74 }
75 // Finishes the payload. Deliver and continue.
76 reader.emplace(data);
77 visitor_.OnObjectMessage(*object_metadata_,
78 data.substr(0, payload_length_remaining_), true);
79 reader->Seek(payload_length_remaining_);
80 payload_length_remaining_ = 0; // Expect a new object.
81 } else if (!buffered_message_.empty()) {
82 absl::StrAppend(&buffered_message_, data);
83 reader.emplace(buffered_message_);
84 } else {
85 // No message in progress.
86 reader.emplace(data);
87 }
88 size_t total_processed = 0;
89 while (!reader->IsDoneReading()) {
90 size_t message_len = ProcessMessage(reader->PeekRemainingPayload(), fin);
91 if (message_len == 0) {
92 if (reader->BytesRemaining() > kMaxMessageHeaderSize) {
93 ParseError(MoqtError::kInternalError,
94 "Cannot parse non-OBJECT messages > 2KB");
95 return;
96 }
97 if (fin) {
98 ParseError("FIN after incomplete message");
99 return;
100 }
101 if (buffered_message_.empty()) {
102 // If the buffer is not empty, |data| has already been copied there.
103 absl::StrAppend(&buffered_message_, reader->PeekRemainingPayload());
104 }
105 break;
106 }
107 // A message was successfully processed.
108 total_processed += message_len;
109 reader->Seek(message_len);
110 }
111 if (original_buffer_size > 0) {
112 buffered_message_.erase(0, total_processed);
113 }
114 }
115
116 // static
ProcessDatagram(absl::string_view data,MoqtObject & object_metadata)117 absl::string_view MoqtParser::ProcessDatagram(absl::string_view data,
118 MoqtObject& object_metadata) {
119 uint64_t value;
120 quic::QuicDataReader reader(data);
121 if (!reader.ReadVarInt62(&value)) {
122 return absl::string_view();
123 }
124 if (static_cast<MoqtMessageType>(value) != MoqtMessageType::kObjectDatagram) {
125 return absl::string_view();
126 }
127 size_t processed_data = ParseObjectHeader(reader, object_metadata,
128 MoqtMessageType::kObjectDatagram);
129 if (processed_data == 0) { // Incomplete header
130 return absl::string_view();
131 }
132 return reader.PeekRemainingPayload();
133 }
134
ProcessMessage(absl::string_view data,bool fin)135 size_t MoqtParser::ProcessMessage(absl::string_view data, bool fin) {
136 uint64_t value;
137 quic::QuicDataReader reader(data);
138 if (ObjectStreamInitialized() && !ObjectPayloadInProgress()) {
139 // This is a follow-on object in a stream.
140 return ProcessObject(reader,
141 GetMessageTypeForForwardingPreference(
142 object_metadata_->forwarding_preference),
143 fin);
144 }
145 if (!reader.ReadVarInt62(&value)) {
146 return 0;
147 }
148 auto type = static_cast<MoqtMessageType>(value);
149 switch (type) {
150 case MoqtMessageType::kObjectDatagram:
151 ParseError("Received OBJECT_DATAGRAM on stream");
152 return 0;
153 case MoqtMessageType::kObjectStream:
154 case MoqtMessageType::kStreamHeaderTrack:
155 case MoqtMessageType::kStreamHeaderGroup:
156 return ProcessObject(reader, type, fin);
157 case MoqtMessageType::kClientSetup:
158 return ProcessClientSetup(reader);
159 case MoqtMessageType::kServerSetup:
160 return ProcessServerSetup(reader);
161 case MoqtMessageType::kSubscribe:
162 return ProcessSubscribe(reader);
163 case MoqtMessageType::kSubscribeOk:
164 return ProcessSubscribeOk(reader);
165 case MoqtMessageType::kSubscribeError:
166 return ProcessSubscribeError(reader);
167 case MoqtMessageType::kUnsubscribe:
168 return ProcessUnsubscribe(reader);
169 case MoqtMessageType::kSubscribeDone:
170 return ProcessSubscribeDone(reader);
171 case MoqtMessageType::kAnnounce:
172 return ProcessAnnounce(reader);
173 case MoqtMessageType::kAnnounceOk:
174 return ProcessAnnounceOk(reader);
175 case MoqtMessageType::kAnnounceError:
176 return ProcessAnnounceError(reader);
177 case MoqtMessageType::kUnannounce:
178 return ProcessUnannounce(reader);
179 case MoqtMessageType::kGoAway:
180 return ProcessGoAway(reader);
181 default:
182 ParseError("Unknown message type");
183 return 0;
184 }
185 }
186
ProcessObject(quic::QuicDataReader & reader,MoqtMessageType type,bool fin)187 size_t MoqtParser::ProcessObject(quic::QuicDataReader& reader,
188 MoqtMessageType type, bool fin) {
189 size_t processed_data = 0;
190 QUICHE_DCHECK(!ObjectPayloadInProgress());
191 if (!ObjectStreamInitialized()) {
192 object_metadata_ = MoqtObject();
193 processed_data = ParseObjectHeader(reader, object_metadata_.value(), type);
194 if (processed_data == 0) {
195 object_metadata_.reset();
196 return 0;
197 }
198 }
199 // At this point, enough data has been processed to store in object_metadata_,
200 // even if there's nothing else in the buffer.
201 QUICHE_DCHECK(payload_length_remaining_ == 0);
202 switch (type) {
203 case MoqtMessageType::kStreamHeaderTrack:
204 if (!reader.ReadVarInt62(&object_metadata_->group_id)) {
205 return processed_data;
206 }
207 [[fallthrough]];
208 case MoqtMessageType::kStreamHeaderGroup: {
209 uint64_t length;
210 if (!reader.ReadVarInt62(&object_metadata_->object_id) ||
211 !reader.ReadVarInt62(&length)) {
212 return processed_data;
213 }
214 object_metadata_->payload_length = length;
215 break;
216 }
217 default:
218 break;
219 }
220 bool has_length = object_metadata_->payload_length.has_value();
221 bool received_complete_message = false;
222 size_t payload_to_draw = reader.BytesRemaining();
223 if (fin && has_length &&
224 *object_metadata_->payload_length > reader.BytesRemaining()) {
225 ParseError("Received FIN mid-payload");
226 return processed_data;
227 }
228 received_complete_message =
229 fin || (has_length &&
230 *object_metadata_->payload_length <= reader.BytesRemaining());
231 if (received_complete_message && has_length &&
232 *object_metadata_->payload_length < reader.BytesRemaining()) {
233 payload_to_draw = *object_metadata_->payload_length;
234 }
235 // The error case where there's a fin before the explicit length is complete
236 // is handled in ProcessData() in two separate places. Even though the
237 // message is "done" if fin regardless of has_length, it's bad to report to
238 // the application that the object is done if it hasn't reached the promised
239 // length.
240 visitor_.OnObjectMessage(
241 *object_metadata_,
242 reader.PeekRemainingPayload().substr(0, payload_to_draw),
243 received_complete_message);
244 reader.Seek(payload_to_draw);
245 payload_length_remaining_ =
246 has_length ? *object_metadata_->payload_length - payload_to_draw : 0;
247 return reader.PreviouslyReadPayload().length();
248 }
249
ProcessClientSetup(quic::QuicDataReader & reader)250 size_t MoqtParser::ProcessClientSetup(quic::QuicDataReader& reader) {
251 MoqtClientSetup setup;
252 uint64_t number_of_supported_versions;
253 if (!reader.ReadVarInt62(&number_of_supported_versions)) {
254 return 0;
255 }
256 uint64_t version;
257 for (uint64_t i = 0; i < number_of_supported_versions; ++i) {
258 if (!reader.ReadVarInt62(&version)) {
259 return 0;
260 }
261 setup.supported_versions.push_back(static_cast<MoqtVersion>(version));
262 }
263 uint64_t num_params;
264 if (!reader.ReadVarInt62(&num_params)) {
265 return 0;
266 }
267 // Parse parameters
268 for (uint64_t i = 0; i < num_params; ++i) {
269 uint64_t type;
270 absl::string_view value;
271 if (!ReadParameter(reader, type, value)) {
272 return 0;
273 }
274 auto key = static_cast<MoqtSetupParameter>(type);
275 switch (key) {
276 case MoqtSetupParameter::kRole:
277 if (setup.role.has_value()) {
278 ParseError("ROLE parameter appears twice in SETUP");
279 return 0;
280 }
281 uint64_t index;
282 if (!StringViewToVarInt(value, index)) {
283 return 0;
284 }
285 if (index > static_cast<uint64_t>(MoqtRole::kRoleMax)) {
286 ParseError("Invalid ROLE parameter");
287 return 0;
288 }
289 setup.role = static_cast<MoqtRole>(index);
290 break;
291 case MoqtSetupParameter::kPath:
292 if (uses_web_transport_) {
293 ParseError(
294 "WebTransport connection is using PATH parameter in SETUP");
295 return 0;
296 }
297 if (setup.path.has_value()) {
298 ParseError("PATH parameter appears twice in CLIENT_SETUP");
299 return 0;
300 }
301 setup.path = value;
302 break;
303 default:
304 // Skip over the parameter.
305 break;
306 }
307 }
308 if (!setup.role.has_value()) {
309 ParseError("ROLE parameter missing from CLIENT_SETUP message");
310 return 0;
311 }
312 if (!uses_web_transport_ && !setup.path.has_value()) {
313 ParseError("PATH SETUP parameter missing from Client message over QUIC");
314 return 0;
315 }
316 visitor_.OnClientSetupMessage(setup);
317 return reader.PreviouslyReadPayload().length();
318 }
319
ProcessServerSetup(quic::QuicDataReader & reader)320 size_t MoqtParser::ProcessServerSetup(quic::QuicDataReader& reader) {
321 MoqtServerSetup setup;
322 uint64_t version;
323 if (!reader.ReadVarInt62(&version)) {
324 return 0;
325 }
326 setup.selected_version = static_cast<MoqtVersion>(version);
327 uint64_t num_params;
328 if (!reader.ReadVarInt62(&num_params)) {
329 return 0;
330 }
331 // Parse parameters
332 for (uint64_t i = 0; i < num_params; ++i) {
333 uint64_t type;
334 absl::string_view value;
335 if (!ReadParameter(reader, type, value)) {
336 return 0;
337 }
338 auto key = static_cast<MoqtSetupParameter>(type);
339 switch (key) {
340 case MoqtSetupParameter::kRole:
341 if (setup.role.has_value()) {
342 ParseError("ROLE parameter appears twice in SETUP");
343 return 0;
344 }
345 uint64_t index;
346 if (!StringViewToVarInt(value, index)) {
347 return 0;
348 }
349 if (index > static_cast<uint64_t>(MoqtRole::kRoleMax)) {
350 ParseError("Invalid ROLE parameter");
351 return 0;
352 }
353 setup.role = static_cast<MoqtRole>(index);
354 break;
355 case MoqtSetupParameter::kPath:
356 ParseError("PATH parameter in SERVER_SETUP");
357 return 0;
358 default:
359 // Skip over the parameter.
360 break;
361 }
362 }
363 if (!setup.role.has_value()) {
364 ParseError("ROLE parameter missing from SERVER_SETUP message");
365 return 0;
366 }
367 visitor_.OnServerSetupMessage(setup);
368 return reader.PreviouslyReadPayload().length();
369 }
370
ProcessSubscribe(quic::QuicDataReader & reader)371 size_t MoqtParser::ProcessSubscribe(quic::QuicDataReader& reader) {
372 MoqtSubscribe subscribe_request;
373 if (!reader.ReadVarInt62(&subscribe_request.subscribe_id) ||
374 !reader.ReadVarInt62(&subscribe_request.track_alias) ||
375 !reader.ReadStringVarInt62(subscribe_request.track_namespace) ||
376 !reader.ReadStringVarInt62(subscribe_request.track_name) ||
377 !ReadLocation(reader, subscribe_request.start_group)) {
378 return 0;
379 }
380 if (!subscribe_request.start_group.has_value()) {
381 ParseError("START_GROUP must not be None in SUBSCRIBE");
382 return 0;
383 }
384 if (!ReadLocation(reader, subscribe_request.start_object)) {
385 return 0;
386 }
387 if (!subscribe_request.start_object.has_value()) {
388 ParseError("START_OBJECT must not be None in SUBSCRIBE");
389 return 0;
390 }
391 if (!ReadLocation(reader, subscribe_request.end_group) ||
392 !ReadLocation(reader, subscribe_request.end_object)) {
393 return 0;
394 }
395 if (subscribe_request.end_group.has_value() !=
396 subscribe_request.end_object.has_value()) {
397 ParseError(
398 "SUBSCRIBE end_group and end_object must be both None "
399 "or both non_None");
400 return 0;
401 }
402 uint64_t num_params;
403 if (!reader.ReadVarInt62(&num_params)) {
404 return 0;
405 }
406 for (uint64_t i = 0; i < num_params; ++i) {
407 uint64_t type;
408 absl::string_view value;
409 if (!ReadParameter(reader, type, value)) {
410 return 0;
411 }
412 auto key = static_cast<MoqtTrackRequestParameter>(type);
413 switch (key) {
414 case MoqtTrackRequestParameter::kAuthorizationInfo:
415 if (subscribe_request.authorization_info.has_value()) {
416 ParseError(
417 "AUTHORIZATION_INFO parameter appears twice in "
418 "SUBSCRIBE_REQUEST");
419 return 0;
420 }
421 subscribe_request.authorization_info = value;
422 break;
423 default:
424 // Skip over the parameter.
425 break;
426 }
427 }
428 visitor_.OnSubscribeMessage(subscribe_request);
429 return reader.PreviouslyReadPayload().length();
430 }
431
ProcessSubscribeOk(quic::QuicDataReader & reader)432 size_t MoqtParser::ProcessSubscribeOk(quic::QuicDataReader& reader) {
433 MoqtSubscribeOk subscribe_ok;
434 uint64_t milliseconds;
435 uint8_t content_exists;
436 if (!reader.ReadVarInt62(&subscribe_ok.subscribe_id) ||
437 !reader.ReadVarInt62(&milliseconds) ||
438 !reader.ReadUInt8(&content_exists)) {
439 return 0;
440 }
441 if (content_exists > 1) {
442 ParseError("SUBSCRIBE_OK ContentExists has invalid value");
443 return 0;
444 }
445 subscribe_ok.expires = quic::QuicTimeDelta::FromMilliseconds(milliseconds);
446 if (content_exists) {
447 subscribe_ok.largest_id = FullSequence();
448 if (!reader.ReadVarInt62(&subscribe_ok.largest_id->group) ||
449 !reader.ReadVarInt62(&subscribe_ok.largest_id->object)) {
450 return 0;
451 }
452 }
453 visitor_.OnSubscribeOkMessage(subscribe_ok);
454 return reader.PreviouslyReadPayload().length();
455 }
456
ProcessSubscribeError(quic::QuicDataReader & reader)457 size_t MoqtParser::ProcessSubscribeError(quic::QuicDataReader& reader) {
458 MoqtSubscribeError subscribe_error;
459 uint64_t error_code;
460 if (!reader.ReadVarInt62(&subscribe_error.subscribe_id) ||
461 !reader.ReadVarInt62(&error_code) ||
462 !reader.ReadStringVarInt62(subscribe_error.reason_phrase) ||
463 !reader.ReadVarInt62(&subscribe_error.track_alias)) {
464 return 0;
465 }
466 subscribe_error.error_code = static_cast<SubscribeErrorCode>(error_code);
467 visitor_.OnSubscribeErrorMessage(subscribe_error);
468 return reader.PreviouslyReadPayload().length();
469 }
470
ProcessUnsubscribe(quic::QuicDataReader & reader)471 size_t MoqtParser::ProcessUnsubscribe(quic::QuicDataReader& reader) {
472 MoqtUnsubscribe unsubscribe;
473 if (!reader.ReadVarInt62(&unsubscribe.subscribe_id)) {
474 return 0;
475 }
476 visitor_.OnUnsubscribeMessage(unsubscribe);
477 return reader.PreviouslyReadPayload().length();
478 }
479
ProcessSubscribeDone(quic::QuicDataReader & reader)480 size_t MoqtParser::ProcessSubscribeDone(quic::QuicDataReader& reader) {
481 MoqtSubscribeDone subscribe_done;
482 uint8_t content_exists;
483 if (!reader.ReadVarInt62(&subscribe_done.subscribe_id) ||
484 !reader.ReadVarInt62(&subscribe_done.status_code) ||
485 !reader.ReadStringVarInt62(subscribe_done.reason_phrase) ||
486 !reader.ReadUInt8(&content_exists)) {
487 return 0;
488 }
489 if (content_exists > 1) {
490 ParseError("SUBSCRIBE_DONE ContentExists has invalid value");
491 return 0;
492 }
493 if (content_exists == 1) {
494 subscribe_done.final_id = FullSequence();
495 if (!reader.ReadVarInt62(&subscribe_done.final_id->group) ||
496 !reader.ReadVarInt62(&subscribe_done.final_id->object)) {
497 return 0;
498 }
499 }
500 visitor_.OnSubscribeDoneMessage(subscribe_done);
501 return reader.PreviouslyReadPayload().length();
502 }
503
ProcessAnnounce(quic::QuicDataReader & reader)504 size_t MoqtParser::ProcessAnnounce(quic::QuicDataReader& reader) {
505 MoqtAnnounce announce;
506 if (!reader.ReadStringVarInt62(announce.track_namespace)) {
507 return 0;
508 }
509 uint64_t num_params;
510 if (!reader.ReadVarInt62(&num_params)) {
511 return 0;
512 }
513 for (uint64_t i = 0; i < num_params; ++i) {
514 uint64_t type;
515 absl::string_view value;
516 if (!ReadParameter(reader, type, value)) {
517 return 0;
518 }
519 auto key = static_cast<MoqtTrackRequestParameter>(type);
520 switch (key) {
521 case MoqtTrackRequestParameter::kAuthorizationInfo:
522 if (announce.authorization_info.has_value()) {
523 ParseError("AUTHORIZATION_INFO parameter appears twice in ANNOUNCE");
524 return 0;
525 }
526 announce.authorization_info = value;
527 break;
528 default:
529 // Skip over the parameter.
530 break;
531 }
532 }
533 visitor_.OnAnnounceMessage(announce);
534 return reader.PreviouslyReadPayload().length();
535 }
536
ProcessAnnounceOk(quic::QuicDataReader & reader)537 size_t MoqtParser::ProcessAnnounceOk(quic::QuicDataReader& reader) {
538 MoqtAnnounceOk announce_ok;
539 if (!reader.ReadStringVarInt62(announce_ok.track_namespace)) {
540 return 0;
541 }
542 visitor_.OnAnnounceOkMessage(announce_ok);
543 return reader.PreviouslyReadPayload().length();
544 }
545
ProcessAnnounceError(quic::QuicDataReader & reader)546 size_t MoqtParser::ProcessAnnounceError(quic::QuicDataReader& reader) {
547 MoqtAnnounceError announce_error;
548 if (!reader.ReadStringVarInt62(announce_error.track_namespace)) {
549 return 0;
550 }
551 uint64_t error_code;
552 if (!reader.ReadVarInt62(&error_code)) {
553 return 0;
554 }
555 announce_error.error_code = static_cast<MoqtAnnounceErrorCode>(error_code);
556 if (!reader.ReadStringVarInt62(announce_error.reason_phrase)) {
557 return 0;
558 }
559 visitor_.OnAnnounceErrorMessage(announce_error);
560 return reader.PreviouslyReadPayload().length();
561 }
562
ProcessUnannounce(quic::QuicDataReader & reader)563 size_t MoqtParser::ProcessUnannounce(quic::QuicDataReader& reader) {
564 MoqtUnannounce unannounce;
565 if (!reader.ReadStringVarInt62(unannounce.track_namespace)) {
566 return 0;
567 }
568 visitor_.OnUnannounceMessage(unannounce);
569 return reader.PreviouslyReadPayload().length();
570 }
571
ProcessGoAway(quic::QuicDataReader & reader)572 size_t MoqtParser::ProcessGoAway(quic::QuicDataReader& reader) {
573 MoqtGoAway goaway;
574 if (!reader.ReadStringVarInt62(goaway.new_session_uri)) {
575 return 0;
576 }
577 visitor_.OnGoAwayMessage(goaway);
578 return reader.PreviouslyReadPayload().length();
579 }
580
581 // static
ParseObjectHeader(quic::QuicDataReader & reader,MoqtObject & object,MoqtMessageType type)582 size_t MoqtParser::ParseObjectHeader(quic::QuicDataReader& reader,
583 MoqtObject& object, MoqtMessageType type) {
584 if (!reader.ReadVarInt62(&object.subscribe_id) ||
585 !reader.ReadVarInt62(&object.track_alias)) {
586 return 0;
587 }
588 if (type != MoqtMessageType::kStreamHeaderTrack &&
589 !reader.ReadVarInt62(&object.group_id)) {
590 return 0;
591 }
592 if (type != MoqtMessageType::kStreamHeaderTrack &&
593 type != MoqtMessageType::kStreamHeaderGroup &&
594 !reader.ReadVarInt62(&object.object_id)) {
595 return 0;
596 }
597 if (!reader.ReadVarInt62(&object.object_send_order)) {
598 return 0;
599 }
600 object.forwarding_preference = GetForwardingPreference(type);
601 return reader.PreviouslyReadPayload().length();
602 }
603
ParseError(absl::string_view reason)604 void MoqtParser::ParseError(absl::string_view reason) {
605 ParseError(MoqtError::kProtocolViolation, reason);
606 }
607
ParseError(MoqtError error_code,absl::string_view reason)608 void MoqtParser::ParseError(MoqtError error_code, absl::string_view reason) {
609 if (parsing_error_) {
610 return; // Don't send multiple parse errors.
611 }
612 no_more_data_ = true;
613 parsing_error_ = true;
614 visitor_.OnParsingError(error_code, reason);
615 }
616
ReadVarIntPieceVarInt62(quic::QuicDataReader & reader,uint64_t & result)617 bool MoqtParser::ReadVarIntPieceVarInt62(quic::QuicDataReader& reader,
618 uint64_t& result) {
619 uint64_t length;
620 if (!reader.ReadVarInt62(&length)) {
621 return false;
622 }
623 uint64_t actual_length = static_cast<uint64_t>(reader.PeekVarInt62Length());
624 if (length != actual_length) {
625 ParseError("Parameter VarInt has length field mismatch");
626 return false;
627 }
628 if (!reader.ReadVarInt62(&result)) {
629 return false;
630 }
631 return true;
632 }
633
ReadLocation(quic::QuicDataReader & reader,std::optional<MoqtSubscribeLocation> & loc)634 bool MoqtParser::ReadLocation(quic::QuicDataReader& reader,
635 std::optional<MoqtSubscribeLocation>& loc) {
636 uint64_t ui64;
637 if (!reader.ReadVarInt62(&ui64)) {
638 return false;
639 }
640 auto mode = static_cast<MoqtSubscribeLocationMode>(ui64);
641 if (mode == MoqtSubscribeLocationMode::kNone) {
642 loc = std::nullopt;
643 return true;
644 }
645 if (!reader.ReadVarInt62(&ui64)) {
646 return false;
647 }
648 switch (mode) {
649 case MoqtSubscribeLocationMode::kAbsolute:
650 loc = MoqtSubscribeLocation(true, ui64);
651 break;
652 case MoqtSubscribeLocationMode::kRelativePrevious:
653 loc = MoqtSubscribeLocation(false, -1 * static_cast<int64_t>(ui64));
654 break;
655 case MoqtSubscribeLocationMode::kRelativeNext:
656 loc = MoqtSubscribeLocation(false, static_cast<int64_t>(ui64) + 1);
657 break;
658 default:
659 ParseError("Unknown location mode");
660 return false;
661 }
662 return true;
663 }
664
ReadParameter(quic::QuicDataReader & reader,uint64_t & type,absl::string_view & value)665 bool MoqtParser::ReadParameter(quic::QuicDataReader& reader, uint64_t& type,
666 absl::string_view& value) {
667 if (!reader.ReadVarInt62(&type)) {
668 return false;
669 }
670 return reader.ReadStringPieceVarInt62(&value);
671 }
672
StringViewToVarInt(absl::string_view & sv,uint64_t & vi)673 bool MoqtParser::StringViewToVarInt(absl::string_view& sv, uint64_t& vi) {
674 quic::QuicDataReader reader(sv);
675 if (static_cast<size_t>(reader.PeekVarInt62Length()) != sv.length()) {
676 ParseError(MoqtError::kParameterLengthMismatch,
677 "Parameter length does not match varint encoding");
678 return false;
679 }
680 reader.ReadVarInt62(&vi);
681 return true;
682 }
683
684 } // namespace moqt
685