xref: /aosp_15_r20/external/openscreen/cast/streaming/receiver.cc (revision 3f982cf4871df8771c9d4abe6e9a6f8d829b2736)
1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "cast/streaming/receiver.h"
6 
7 #include <algorithm>
8 #include <utility>
9 
10 #include "absl/types/span.h"
11 #include "cast/streaming/constants.h"
12 #include "cast/streaming/receiver_packet_router.h"
13 #include "cast/streaming/session_config.h"
14 #include "util/chrono_helpers.h"
15 #include "util/osp_logging.h"
16 #include "util/std_util.h"
17 #include "util/trace_logging.h"
18 
19 namespace openscreen {
20 namespace cast {
21 
// Logging helper: prefixes every message with "[SSRC:<n>] ", where <n> is the
// value returned by ssrc(), so that output from one Receiver can be told apart
// from that of the other Receivers in the same Cast Streaming session. Since
// the expansion calls ssrc() unqualified, the macro is only usable from within
// Receiver member functions.
#define RECEIVER_LOG(level) OSP_LOG_##level << "[SSRC:" << ssrc() << "] "
27 
Receiver(Environment * environment,ReceiverPacketRouter * packet_router,SessionConfig config)28 Receiver::Receiver(Environment* environment,
29                    ReceiverPacketRouter* packet_router,
30                    SessionConfig config)
31     : now_(environment->now_function()),
32       packet_router_(packet_router),
33       config_(config),
34       rtcp_session_(config.sender_ssrc, config.receiver_ssrc, now_()),
35       rtcp_parser_(&rtcp_session_),
36       rtcp_builder_(&rtcp_session_),
37       stats_tracker_(config.rtp_timebase),
38       rtp_parser_(config.sender_ssrc),
39       rtp_timebase_(config.rtp_timebase),
40       crypto_(config.aes_secret_key, config.aes_iv_mask),
41       is_pli_enabled_(config.is_pli_enabled),
42       rtcp_buffer_capacity_(environment->GetMaxPacketSize()),
43       rtcp_buffer_(new uint8_t[rtcp_buffer_capacity_]),
44       rtcp_alarm_(environment->now_function(), environment->task_runner()),
45       smoothed_clock_offset_(ClockDriftSmoother::kDefaultTimeConstant),
46       consumption_alarm_(environment->now_function(),
47                          environment->task_runner()) {
48   OSP_DCHECK(packet_router_);
49   OSP_DCHECK_EQ(checkpoint_frame(), FrameId::leader());
50   OSP_CHECK_GT(rtcp_buffer_capacity_, 0);
51   OSP_CHECK(rtcp_buffer_);
52 
53   rtcp_builder_.SetPlayoutDelay(config.target_playout_delay);
54   playout_delay_changes_.emplace_back(FrameId::leader(),
55                                       config.target_playout_delay);
56 
57   packet_router_->OnReceiverCreated(rtcp_session_.sender_ssrc(), this);
58 }
59 
~Receiver()60 Receiver::~Receiver() {
61   packet_router_->OnReceiverDestroyed(rtcp_session_.sender_ssrc());
62 }
63 
config() const64 const SessionConfig& Receiver::config() const {
65   return config_;
66 }
rtp_timebase() const67 int Receiver::rtp_timebase() const {
68   return rtp_timebase_;
69 }
ssrc() const70 Ssrc Receiver::ssrc() const {
71   return rtcp_session_.receiver_ssrc();
72 }
73 
SetConsumer(Consumer * consumer)74 void Receiver::SetConsumer(Consumer* consumer) {
75   consumer_ = consumer;
76   ScheduleFrameReadyCheck();
77 }
78 
SetPlayerProcessingTime(Clock::duration needed_time)79 void Receiver::SetPlayerProcessingTime(Clock::duration needed_time) {
80   player_processing_time_ = std::max(Clock::duration::zero(), needed_time);
81 }
82 
RequestKeyFrame()83 void Receiver::RequestKeyFrame() {
84   // If we don't have picture loss indication enabled, we should not request
85   // any key frames.
86   OSP_DCHECK(is_pli_enabled_) << "PLI is not enabled.";
87   if (is_pli_enabled_ && !last_key_frame_received_.is_null() &&
88       last_frame_consumed_ >= last_key_frame_received_ &&
89       !rtcp_builder_.is_picture_loss_indicator_set()) {
90     rtcp_builder_.SetPictureLossIndicator(true);
91     SendRtcp();
92   }
93 }
94 
AdvanceToNextFrame()95 int Receiver::AdvanceToNextFrame() {
96   TRACE_DEFAULT_SCOPED(TraceCategory::kReceiver);
97   const FrameId immediate_next_frame = last_frame_consumed_ + 1;
98 
99   // Scan the queue for the next frame that should be consumed. Typically, this
100   // is the very next frame; but if it is incomplete and already late for
101   // playout, consider skipping-ahead.
102   for (FrameId f = immediate_next_frame; f <= latest_frame_expected_; ++f) {
103     PendingFrame& entry = GetQueueEntry(f);
104     if (entry.collector.is_complete()) {
105       const EncryptedFrame& encrypted_frame =
106           entry.collector.PeekAtAssembledFrame();
107       if (f == immediate_next_frame) {  // Typical case.
108         return FrameCrypto::GetPlaintextSize(encrypted_frame);
109       }
110       if (encrypted_frame.dependency != EncodedFrame::DEPENDS_ON_ANOTHER) {
111         // Found a frame after skipping past some frames. Drop the ones being
112         // skipped, advancing |last_frame_consumed_| before returning.
113         DropAllFramesBefore(f);
114         return FrameCrypto::GetPlaintextSize(encrypted_frame);
115       }
116       // Conclusion: The frame in the current queue entry is complete, but
117       // depends on a prior incomplete frame. Continue scanning...
118     }
119 
120     // Do not consider skipping past this frame if its estimated capture time is
121     // unknown. The implication here is that, if |estimated_capture_time| is
122     // set, the Receiver also knows whether any target playout delay changes
123     // were communicated from the Sender in the frame's first RTP packet.
124     if (!entry.estimated_capture_time) {
125       break;
126     }
127 
128     // If this incomplete frame is not yet late for playout, simply wait for the
129     // rest of its packets to come in. However, do schedule a check to
130     // re-examine things at the time it would become a late frame, to possibly
131     // skip-over it.
132     const auto playout_time =
133         *entry.estimated_capture_time + ResolveTargetPlayoutDelay(f);
134     if (playout_time > (now_() + player_processing_time_)) {
135       ScheduleFrameReadyCheck(playout_time);
136       break;
137     }
138   }
139 
140   return kNoFramesReady;
141 }
142 
ConsumeNextFrame(absl::Span<uint8_t> buffer)143 EncodedFrame Receiver::ConsumeNextFrame(absl::Span<uint8_t> buffer) {
144   TRACE_DEFAULT_SCOPED(TraceCategory::kReceiver);
145   // Assumption: The required call to AdvanceToNextFrame() ensures that
146   // |last_frame_consumed_| is set to one before the frame to be consumed here.
147   const FrameId frame_id = last_frame_consumed_ + 1;
148   OSP_CHECK_LE(frame_id, checkpoint_frame());
149 
150   // Decrypt the frame, populating the given output |frame|.
151   PendingFrame& entry = GetQueueEntry(frame_id);
152   OSP_DCHECK(entry.collector.is_complete());
153   EncodedFrame frame;
154   frame.data = buffer;
155   crypto_.Decrypt(entry.collector.PeekAtAssembledFrame(), &frame);
156   OSP_DCHECK(entry.estimated_capture_time);
157   frame.reference_time =
158       *entry.estimated_capture_time + ResolveTargetPlayoutDelay(frame_id);
159 
160   OSP_VLOG << "ConsumeNextFrame → " << frame.frame_id << ": "
161            << frame.data.size() << " payload bytes, RTP Timestamp "
162            << frame.rtp_timestamp.ToTimeSinceOrigin<microseconds>(rtp_timebase_)
163                   .count()
164            << " µs, to play-out "
165            << to_microseconds(frame.reference_time - now_()).count()
166            << " µs from now.";
167 
168   entry.Reset();
169   last_frame_consumed_ = frame_id;
170 
171   // Ensure the Consumer is notified if there are already more frames ready for
172   // consumption, and it hasn't explicitly called AdvanceToNextFrame() to check
173   // for itself.
174   ScheduleFrameReadyCheck();
175 
176   return frame;
177 }
178 
OnReceivedRtpPacket(Clock::time_point arrival_time,std::vector<uint8_t> packet)179 void Receiver::OnReceivedRtpPacket(Clock::time_point arrival_time,
180                                    std::vector<uint8_t> packet) {
181   const absl::optional<RtpPacketParser::ParseResult> part =
182       rtp_parser_.Parse(packet);
183   if (!part) {
184     RECEIVER_LOG(WARN) << "Parsing of " << packet.size()
185                        << " bytes as an RTP packet failed.";
186     return;
187   }
188   stats_tracker_.OnReceivedValidRtpPacket(part->sequence_number,
189                                           part->rtp_timestamp, arrival_time);
190 
191   // Ignore packets for frames the Receiver is no longer interested in.
192   if (part->frame_id <= checkpoint_frame()) {
193     return;
194   }
195 
196   // Extend the range of frames known to this Receiver, within the capacity of
197   // this Receiver's queue. Prepare the FrameCollectors to receive any
198   // newly-discovered frames.
199   if (part->frame_id > latest_frame_expected_) {
200     const FrameId max_allowed_frame_id =
201         last_frame_consumed_ + kMaxUnackedFrames;
202     if (part->frame_id > max_allowed_frame_id) {
203       return;
204     }
205     do {
206       ++latest_frame_expected_;
207       GetQueueEntry(latest_frame_expected_)
208           .collector.set_frame_id(latest_frame_expected_);
209     } while (latest_frame_expected_ < part->frame_id);
210   }
211 
212   // Start-up edge case: Blatantly drop the first packet of all frames until the
213   // Receiver has processed at least one Sender Report containing the necessary
214   // clock-drift and lip-sync information (see OnReceivedRtcpPacket()). This is
215   // an inescapable data dependency. Note that this special case should almost
216   // never trigger, since a well-behaving Sender will send the first Sender
217   // Report RTCP packet before any of the RTP packets.
218   if (!last_sender_report_ && part->packet_id == FramePacketId{0}) {
219     RECEIVER_LOG(WARN) << "Dropping packet 0 of frame " << part->frame_id
220                        << " because it arrived before the first Sender Report.";
221     // Note: The Sender will have to re-transmit this dropped packet after the
222     // Sender Report to allow the Receiver to move forward.
223     return;
224   }
225 
226   PendingFrame& pending_frame = GetQueueEntry(part->frame_id);
227   FrameCollector& collector = pending_frame.collector;
228   if (collector.is_complete()) {
229     // An extra, redundant |packet| was received. Do nothing since the frame was
230     // already complete.
231     return;
232   }
233 
234   if (!collector.CollectRtpPacket(*part, &packet)) {
235     return;  // Bad data in the parsed packet. Ignore it.
236   }
237 
238   // The first packet in a frame contains timing information critical for
239   // computing this frame's (and all future frames') playout time. Process that,
240   // but only once.
241   if (part->packet_id == FramePacketId{0} &&
242       !pending_frame.estimated_capture_time) {
243     // Estimate the original capture time of this frame (at the Sender), in
244     // terms of the Receiver's clock: First, start with a reference time point
245     // from the Sender's clock (the one from the last Sender Report). Then,
246     // translate it into the equivalent reference time point in terms of the
247     // Receiver's clock by applying the measured offset between the two clocks.
248     // Finally, apply the RTP timestamp difference between the Sender Report and
249     // this frame to determine what the original capture time of this frame was.
250     pending_frame.estimated_capture_time =
251         last_sender_report_->reference_time + smoothed_clock_offset_.Current() +
252         (part->rtp_timestamp - last_sender_report_->rtp_timestamp)
253             .ToDuration<Clock::duration>(rtp_timebase_);
254 
255     // If a target playout delay change was included in this packet, record it.
256     if (part->new_playout_delay > milliseconds::zero()) {
257       RecordNewTargetPlayoutDelay(part->frame_id, part->new_playout_delay);
258     }
259 
260     // Now that the estimated capture time is known, other frames may have just
261     // become ready, per the frame-skipping logic in AdvanceToNextFrame().
262     ScheduleFrameReadyCheck();
263   }
264 
265   if (!collector.is_complete()) {
266     return;  // Wait for the rest of the packets to come in.
267   }
268   const EncryptedFrame& encrypted_frame = collector.PeekAtAssembledFrame();
269 
270   // Whenever a key frame has been received, the decoder has what it needs to
271   // recover. In this case, clear the PLI condition.
272   if (encrypted_frame.dependency == EncryptedFrame::KEY_FRAME) {
273     rtcp_builder_.SetPictureLossIndicator(false);
274     last_key_frame_received_ = part->frame_id;
275   }
276 
277   // If this just-completed frame is the one right after the checkpoint frame,
278   // advance the checkpoint forward.
279   if (part->frame_id == (checkpoint_frame() + 1)) {
280     AdvanceCheckpoint(part->frame_id);
281   }
282 
283   // Since a frame has become complete, schedule a check to see whether this or
284   // any other frames have become ready for consumption.
285   ScheduleFrameReadyCheck();
286 }
287 
OnReceivedRtcpPacket(Clock::time_point arrival_time,std::vector<uint8_t> packet)288 void Receiver::OnReceivedRtcpPacket(Clock::time_point arrival_time,
289                                     std::vector<uint8_t> packet) {
290   TRACE_DEFAULT_SCOPED(TraceCategory::kReceiver);
291   absl::optional<SenderReportParser::SenderReportWithId> parsed_report =
292       rtcp_parser_.Parse(packet);
293   if (!parsed_report) {
294     RECEIVER_LOG(WARN) << "Parsing of " << packet.size()
295                        << " bytes as an RTCP packet failed.";
296     return;
297   }
298   last_sender_report_ = std::move(parsed_report);
299   last_sender_report_arrival_time_ = arrival_time;
300 
301   // Measure the offset between the Sender's clock and the Receiver's Clock.
302   // This will be used to translate reference timestamps from the Sender into
303   // timestamps that represent the exact same moment in time at the Receiver.
304   //
305   // Note: Due to design limitations in the Cast Streaming spec, the Receiver
306   // has no way to compute how long it took the Sender Report to travel over the
307   // network. The calculation here just ignores that, and so the
308   // |measured_offset| below will be larger than the true value by that amount.
309   // This will have the effect of a later-than-configured playout delay.
310   const Clock::duration measured_offset =
311       arrival_time - last_sender_report_->reference_time;
312   smoothed_clock_offset_.Update(arrival_time, measured_offset);
313 
314   RtcpReportBlock report;
315   report.ssrc = rtcp_session_.sender_ssrc();
316   stats_tracker_.PopulateNextReport(&report);
317   report.last_status_report_id = last_sender_report_->report_id;
318   report.SetDelaySinceLastReport(now_() - last_sender_report_arrival_time_);
319   rtcp_builder_.IncludeReceiverReportInNextPacket(report);
320 
321   SendRtcp();
322 }
323 
SendRtcp()324 void Receiver::SendRtcp() {
325   // Collect ACK/NACK feedback for all active frames in the queue.
326   std::vector<PacketNack> packet_nacks;
327   std::vector<FrameId> frame_acks;
328   for (FrameId f = checkpoint_frame() + 1; f <= latest_frame_expected_; ++f) {
329     const FrameCollector& collector = GetQueueEntry(f).collector;
330     if (collector.is_complete()) {
331       frame_acks.push_back(f);
332     } else {
333       collector.GetMissingPackets(&packet_nacks);
334     }
335   }
336 
337   // Build and send a compound RTCP packet.
338   const bool no_nacks = packet_nacks.empty();
339   rtcp_builder_.IncludeFeedbackInNextPacket(std::move(packet_nacks),
340                                             std::move(frame_acks));
341   last_rtcp_send_time_ = now_();
342   packet_router_->SendRtcpPacket(rtcp_builder_.BuildPacket(
343       last_rtcp_send_time_,
344       absl::Span<uint8_t>(rtcp_buffer_.get(), rtcp_buffer_capacity_)));
345 
346   // Schedule the automatic sending of another RTCP packet, if this method is
347   // not called within some bounded amount of time. While incomplete frames
348   // exist in the queue, send RTCP packets (with ACK/NACK feedback) frequently.
349   // When there are no incomplete frames, use a longer "keepalive" interval.
350   const Clock::duration interval =
351       (no_nacks ? kRtcpReportInterval : kNackFeedbackInterval);
352   rtcp_alarm_.Schedule([this] { SendRtcp(); }, last_rtcp_send_time_ + interval);
353 }
354 
GetQueueEntry(FrameId frame_id) const355 const Receiver::PendingFrame& Receiver::GetQueueEntry(FrameId frame_id) const {
356   return const_cast<Receiver*>(this)->GetQueueEntry(frame_id);
357 }
358 
GetQueueEntry(FrameId frame_id)359 Receiver::PendingFrame& Receiver::GetQueueEntry(FrameId frame_id) {
360   return pending_frames_[(frame_id - FrameId::first()) %
361                          pending_frames_.size()];
362 }
363 
RecordNewTargetPlayoutDelay(FrameId as_of_frame,milliseconds delay)364 void Receiver::RecordNewTargetPlayoutDelay(FrameId as_of_frame,
365                                            milliseconds delay) {
366   OSP_DCHECK_GT(as_of_frame, checkpoint_frame());
367 
368   // Prune-out entries from |playout_delay_changes_| that are no longer needed.
369   // At least one entry must always be kept (i.e., there must always be a
370   // "current" setting).
371   const FrameId next_frame = last_frame_consumed_ + 1;
372   const auto keep_one_before_it = std::find_if(
373       std::next(playout_delay_changes_.begin()), playout_delay_changes_.end(),
374       [&](const auto& entry) { return entry.first > next_frame; });
375   playout_delay_changes_.erase(playout_delay_changes_.begin(),
376                                std::prev(keep_one_before_it));
377 
378   // Insert the delay change entry, maintaining the ascending ordering of the
379   // vector.
380   const auto insert_it = std::find_if(
381       playout_delay_changes_.begin(), playout_delay_changes_.end(),
382       [&](const auto& entry) { return entry.first > as_of_frame; });
383   playout_delay_changes_.emplace(insert_it, as_of_frame, delay);
384 
385   OSP_DCHECK(AreElementsSortedAndUnique(playout_delay_changes_));
386 }
387 
ResolveTargetPlayoutDelay(FrameId frame_id) const388 milliseconds Receiver::ResolveTargetPlayoutDelay(FrameId frame_id) const {
389   OSP_DCHECK_GT(frame_id, last_frame_consumed_);
390 
391 #if OSP_DCHECK_IS_ON()
392   // Extra precaution: Ensure all possible playout delay changes are known. In
393   // other words, every unconsumed frame in the queue, up to (and including)
394   // |frame_id|, must have an assigned estimated_capture_time.
395   for (FrameId f = last_frame_consumed_ + 1; f <= frame_id; ++f) {
396     OSP_DCHECK(GetQueueEntry(f).estimated_capture_time)
397         << " don't know whether there was a playout delay change for frame "
398         << f;
399   }
400 #endif
401 
402   const auto it = std::find_if(
403       playout_delay_changes_.crbegin(), playout_delay_changes_.crend(),
404       [&](const auto& entry) { return entry.first <= frame_id; });
405   OSP_DCHECK(it != playout_delay_changes_.crend());
406   return it->second;
407 }
408 
AdvanceCheckpoint(FrameId new_checkpoint)409 void Receiver::AdvanceCheckpoint(FrameId new_checkpoint) {
410   TRACE_DEFAULT_SCOPED(TraceCategory::kReceiver);
411   OSP_DCHECK_GT(new_checkpoint, checkpoint_frame());
412   OSP_DCHECK_LE(new_checkpoint, latest_frame_expected_);
413 
414   while (new_checkpoint < latest_frame_expected_) {
415     const FrameId next = new_checkpoint + 1;
416     if (!GetQueueEntry(next).collector.is_complete()) {
417       break;
418     }
419     new_checkpoint = next;
420   }
421 
422   set_checkpoint_frame(new_checkpoint);
423   rtcp_builder_.SetPlayoutDelay(ResolveTargetPlayoutDelay(new_checkpoint));
424   SendRtcp();
425 }
426 
DropAllFramesBefore(FrameId first_kept_frame)427 void Receiver::DropAllFramesBefore(FrameId first_kept_frame) {
428   // The following DCHECKs are verifying that this method is only being called
429   // because one or more incomplete frames are being skipped-over.
430   const FrameId first_to_drop = last_frame_consumed_ + 1;
431   OSP_DCHECK_GT(first_kept_frame, first_to_drop);
432   OSP_DCHECK_GT(first_kept_frame, checkpoint_frame());
433   OSP_DCHECK_LE(first_kept_frame, latest_frame_expected_);
434 
435   // Reset each of the frames being dropped, pretending that they were consumed.
436   for (FrameId f = first_to_drop; f < first_kept_frame; ++f) {
437     PendingFrame& entry = GetQueueEntry(f);
438     // Pedantic sanity-check: Ensure the "target playout delay change" data
439     // dependency was satisfied. See comments in AdvanceToNextFrame().
440     OSP_DCHECK(entry.estimated_capture_time);
441     entry.Reset();
442   }
443   last_frame_consumed_ = first_kept_frame - 1;
444 
445   RECEIVER_LOG(INFO) << "Artificially advancing checkpoint after skipping.";
446   AdvanceCheckpoint(first_kept_frame);
447 }
448 
ScheduleFrameReadyCheck(Clock::time_point when)449 void Receiver::ScheduleFrameReadyCheck(Clock::time_point when) {
450   consumption_alarm_.Schedule(
451       [this] {
452         if (consumer_) {
453           const int next_frame_buffer_size = AdvanceToNextFrame();
454           if (next_frame_buffer_size != kNoFramesReady) {
455             consumer_->OnFramesReady(next_frame_buffer_size);
456           }
457         }
458       },
459       when);
460 }
461 
462 Receiver::PendingFrame::PendingFrame() = default;
463 Receiver::PendingFrame::~PendingFrame() = default;
464 
Reset()465 void Receiver::PendingFrame::Reset() {
466   collector.Reset();
467   estimated_capture_time = absl::nullopt;
468 }
469 
470 // static
471 constexpr milliseconds Receiver::kDefaultPlayerProcessingTime;
472 constexpr int Receiver::kNoFramesReady;
473 constexpr milliseconds Receiver::kNackFeedbackInterval;
474 
475 }  // namespace cast
476 }  // namespace openscreen
477