xref: /aosp_15_r20/external/openscreen/cast/streaming/receiver.cc (revision 3f982cf4871df8771c9d4abe6e9a6f8d829b2736)
1*3f982cf4SFabien Sanglard // Copyright 2019 The Chromium Authors. All rights reserved.
2*3f982cf4SFabien Sanglard // Use of this source code is governed by a BSD-style license that can be
3*3f982cf4SFabien Sanglard // found in the LICENSE file.
4*3f982cf4SFabien Sanglard 
5*3f982cf4SFabien Sanglard #include "cast/streaming/receiver.h"
6*3f982cf4SFabien Sanglard 
7*3f982cf4SFabien Sanglard #include <algorithm>
8*3f982cf4SFabien Sanglard #include <utility>
9*3f982cf4SFabien Sanglard 
10*3f982cf4SFabien Sanglard #include "absl/types/span.h"
11*3f982cf4SFabien Sanglard #include "cast/streaming/constants.h"
12*3f982cf4SFabien Sanglard #include "cast/streaming/receiver_packet_router.h"
13*3f982cf4SFabien Sanglard #include "cast/streaming/session_config.h"
14*3f982cf4SFabien Sanglard #include "util/chrono_helpers.h"
15*3f982cf4SFabien Sanglard #include "util/osp_logging.h"
16*3f982cf4SFabien Sanglard #include "util/std_util.h"
17*3f982cf4SFabien Sanglard #include "util/trace_logging.h"
18*3f982cf4SFabien Sanglard 
19*3f982cf4SFabien Sanglard namespace openscreen {
20*3f982cf4SFabien Sanglard namespace cast {
21*3f982cf4SFabien Sanglard 
22*3f982cf4SFabien Sanglard // Conveniences for ensuring logging output includes the SSRC of the Receiver,
23*3f982cf4SFabien Sanglard // to help distinguish one out of multiple instances in a Cast Streaming
24*3f982cf4SFabien Sanglard // session.
25*3f982cf4SFabien Sanglard //
26*3f982cf4SFabien Sanglard #define RECEIVER_LOG(level) OSP_LOG_##level << "[SSRC:" << ssrc() << "] "
27*3f982cf4SFabien Sanglard 
Receiver(Environment * environment,ReceiverPacketRouter * packet_router,SessionConfig config)28*3f982cf4SFabien Sanglard Receiver::Receiver(Environment* environment,
29*3f982cf4SFabien Sanglard                    ReceiverPacketRouter* packet_router,
30*3f982cf4SFabien Sanglard                    SessionConfig config)
31*3f982cf4SFabien Sanglard     : now_(environment->now_function()),
32*3f982cf4SFabien Sanglard       packet_router_(packet_router),
33*3f982cf4SFabien Sanglard       config_(config),
34*3f982cf4SFabien Sanglard       rtcp_session_(config.sender_ssrc, config.receiver_ssrc, now_()),
35*3f982cf4SFabien Sanglard       rtcp_parser_(&rtcp_session_),
36*3f982cf4SFabien Sanglard       rtcp_builder_(&rtcp_session_),
37*3f982cf4SFabien Sanglard       stats_tracker_(config.rtp_timebase),
38*3f982cf4SFabien Sanglard       rtp_parser_(config.sender_ssrc),
39*3f982cf4SFabien Sanglard       rtp_timebase_(config.rtp_timebase),
40*3f982cf4SFabien Sanglard       crypto_(config.aes_secret_key, config.aes_iv_mask),
41*3f982cf4SFabien Sanglard       is_pli_enabled_(config.is_pli_enabled),
42*3f982cf4SFabien Sanglard       rtcp_buffer_capacity_(environment->GetMaxPacketSize()),
43*3f982cf4SFabien Sanglard       rtcp_buffer_(new uint8_t[rtcp_buffer_capacity_]),
44*3f982cf4SFabien Sanglard       rtcp_alarm_(environment->now_function(), environment->task_runner()),
45*3f982cf4SFabien Sanglard       smoothed_clock_offset_(ClockDriftSmoother::kDefaultTimeConstant),
46*3f982cf4SFabien Sanglard       consumption_alarm_(environment->now_function(),
47*3f982cf4SFabien Sanglard                          environment->task_runner()) {
48*3f982cf4SFabien Sanglard   OSP_DCHECK(packet_router_);
49*3f982cf4SFabien Sanglard   OSP_DCHECK_EQ(checkpoint_frame(), FrameId::leader());
50*3f982cf4SFabien Sanglard   OSP_CHECK_GT(rtcp_buffer_capacity_, 0);
51*3f982cf4SFabien Sanglard   OSP_CHECK(rtcp_buffer_);
52*3f982cf4SFabien Sanglard 
53*3f982cf4SFabien Sanglard   rtcp_builder_.SetPlayoutDelay(config.target_playout_delay);
54*3f982cf4SFabien Sanglard   playout_delay_changes_.emplace_back(FrameId::leader(),
55*3f982cf4SFabien Sanglard                                       config.target_playout_delay);
56*3f982cf4SFabien Sanglard 
57*3f982cf4SFabien Sanglard   packet_router_->OnReceiverCreated(rtcp_session_.sender_ssrc(), this);
58*3f982cf4SFabien Sanglard }
59*3f982cf4SFabien Sanglard 
~Receiver()60*3f982cf4SFabien Sanglard Receiver::~Receiver() {
61*3f982cf4SFabien Sanglard   packet_router_->OnReceiverDestroyed(rtcp_session_.sender_ssrc());
62*3f982cf4SFabien Sanglard }
63*3f982cf4SFabien Sanglard 
config() const64*3f982cf4SFabien Sanglard const SessionConfig& Receiver::config() const {
65*3f982cf4SFabien Sanglard   return config_;
66*3f982cf4SFabien Sanglard }
rtp_timebase() const67*3f982cf4SFabien Sanglard int Receiver::rtp_timebase() const {
68*3f982cf4SFabien Sanglard   return rtp_timebase_;
69*3f982cf4SFabien Sanglard }
ssrc() const70*3f982cf4SFabien Sanglard Ssrc Receiver::ssrc() const {
71*3f982cf4SFabien Sanglard   return rtcp_session_.receiver_ssrc();
72*3f982cf4SFabien Sanglard }
73*3f982cf4SFabien Sanglard 
SetConsumer(Consumer * consumer)74*3f982cf4SFabien Sanglard void Receiver::SetConsumer(Consumer* consumer) {
75*3f982cf4SFabien Sanglard   consumer_ = consumer;
76*3f982cf4SFabien Sanglard   ScheduleFrameReadyCheck();
77*3f982cf4SFabien Sanglard }
78*3f982cf4SFabien Sanglard 
SetPlayerProcessingTime(Clock::duration needed_time)79*3f982cf4SFabien Sanglard void Receiver::SetPlayerProcessingTime(Clock::duration needed_time) {
80*3f982cf4SFabien Sanglard   player_processing_time_ = std::max(Clock::duration::zero(), needed_time);
81*3f982cf4SFabien Sanglard }
82*3f982cf4SFabien Sanglard 
RequestKeyFrame()83*3f982cf4SFabien Sanglard void Receiver::RequestKeyFrame() {
84*3f982cf4SFabien Sanglard   // If we don't have picture loss indication enabled, we should not request
85*3f982cf4SFabien Sanglard   // any key frames.
86*3f982cf4SFabien Sanglard   OSP_DCHECK(is_pli_enabled_) << "PLI is not enabled.";
87*3f982cf4SFabien Sanglard   if (is_pli_enabled_ && !last_key_frame_received_.is_null() &&
88*3f982cf4SFabien Sanglard       last_frame_consumed_ >= last_key_frame_received_ &&
89*3f982cf4SFabien Sanglard       !rtcp_builder_.is_picture_loss_indicator_set()) {
90*3f982cf4SFabien Sanglard     rtcp_builder_.SetPictureLossIndicator(true);
91*3f982cf4SFabien Sanglard     SendRtcp();
92*3f982cf4SFabien Sanglard   }
93*3f982cf4SFabien Sanglard }
94*3f982cf4SFabien Sanglard 
AdvanceToNextFrame()95*3f982cf4SFabien Sanglard int Receiver::AdvanceToNextFrame() {
96*3f982cf4SFabien Sanglard   TRACE_DEFAULT_SCOPED(TraceCategory::kReceiver);
97*3f982cf4SFabien Sanglard   const FrameId immediate_next_frame = last_frame_consumed_ + 1;
98*3f982cf4SFabien Sanglard 
99*3f982cf4SFabien Sanglard   // Scan the queue for the next frame that should be consumed. Typically, this
100*3f982cf4SFabien Sanglard   // is the very next frame; but if it is incomplete and already late for
101*3f982cf4SFabien Sanglard   // playout, consider skipping-ahead.
102*3f982cf4SFabien Sanglard   for (FrameId f = immediate_next_frame; f <= latest_frame_expected_; ++f) {
103*3f982cf4SFabien Sanglard     PendingFrame& entry = GetQueueEntry(f);
104*3f982cf4SFabien Sanglard     if (entry.collector.is_complete()) {
105*3f982cf4SFabien Sanglard       const EncryptedFrame& encrypted_frame =
106*3f982cf4SFabien Sanglard           entry.collector.PeekAtAssembledFrame();
107*3f982cf4SFabien Sanglard       if (f == immediate_next_frame) {  // Typical case.
108*3f982cf4SFabien Sanglard         return FrameCrypto::GetPlaintextSize(encrypted_frame);
109*3f982cf4SFabien Sanglard       }
110*3f982cf4SFabien Sanglard       if (encrypted_frame.dependency != EncodedFrame::DEPENDS_ON_ANOTHER) {
111*3f982cf4SFabien Sanglard         // Found a frame after skipping past some frames. Drop the ones being
112*3f982cf4SFabien Sanglard         // skipped, advancing |last_frame_consumed_| before returning.
113*3f982cf4SFabien Sanglard         DropAllFramesBefore(f);
114*3f982cf4SFabien Sanglard         return FrameCrypto::GetPlaintextSize(encrypted_frame);
115*3f982cf4SFabien Sanglard       }
116*3f982cf4SFabien Sanglard       // Conclusion: The frame in the current queue entry is complete, but
117*3f982cf4SFabien Sanglard       // depends on a prior incomplete frame. Continue scanning...
118*3f982cf4SFabien Sanglard     }
119*3f982cf4SFabien Sanglard 
120*3f982cf4SFabien Sanglard     // Do not consider skipping past this frame if its estimated capture time is
121*3f982cf4SFabien Sanglard     // unknown. The implication here is that, if |estimated_capture_time| is
122*3f982cf4SFabien Sanglard     // set, the Receiver also knows whether any target playout delay changes
123*3f982cf4SFabien Sanglard     // were communicated from the Sender in the frame's first RTP packet.
124*3f982cf4SFabien Sanglard     if (!entry.estimated_capture_time) {
125*3f982cf4SFabien Sanglard       break;
126*3f982cf4SFabien Sanglard     }
127*3f982cf4SFabien Sanglard 
128*3f982cf4SFabien Sanglard     // If this incomplete frame is not yet late for playout, simply wait for the
129*3f982cf4SFabien Sanglard     // rest of its packets to come in. However, do schedule a check to
130*3f982cf4SFabien Sanglard     // re-examine things at the time it would become a late frame, to possibly
131*3f982cf4SFabien Sanglard     // skip-over it.
132*3f982cf4SFabien Sanglard     const auto playout_time =
133*3f982cf4SFabien Sanglard         *entry.estimated_capture_time + ResolveTargetPlayoutDelay(f);
134*3f982cf4SFabien Sanglard     if (playout_time > (now_() + player_processing_time_)) {
135*3f982cf4SFabien Sanglard       ScheduleFrameReadyCheck(playout_time);
136*3f982cf4SFabien Sanglard       break;
137*3f982cf4SFabien Sanglard     }
138*3f982cf4SFabien Sanglard   }
139*3f982cf4SFabien Sanglard 
140*3f982cf4SFabien Sanglard   return kNoFramesReady;
141*3f982cf4SFabien Sanglard }
142*3f982cf4SFabien Sanglard 
ConsumeNextFrame(absl::Span<uint8_t> buffer)143*3f982cf4SFabien Sanglard EncodedFrame Receiver::ConsumeNextFrame(absl::Span<uint8_t> buffer) {
144*3f982cf4SFabien Sanglard   TRACE_DEFAULT_SCOPED(TraceCategory::kReceiver);
145*3f982cf4SFabien Sanglard   // Assumption: The required call to AdvanceToNextFrame() ensures that
146*3f982cf4SFabien Sanglard   // |last_frame_consumed_| is set to one before the frame to be consumed here.
147*3f982cf4SFabien Sanglard   const FrameId frame_id = last_frame_consumed_ + 1;
148*3f982cf4SFabien Sanglard   OSP_CHECK_LE(frame_id, checkpoint_frame());
149*3f982cf4SFabien Sanglard 
150*3f982cf4SFabien Sanglard   // Decrypt the frame, populating the given output |frame|.
151*3f982cf4SFabien Sanglard   PendingFrame& entry = GetQueueEntry(frame_id);
152*3f982cf4SFabien Sanglard   OSP_DCHECK(entry.collector.is_complete());
153*3f982cf4SFabien Sanglard   EncodedFrame frame;
154*3f982cf4SFabien Sanglard   frame.data = buffer;
155*3f982cf4SFabien Sanglard   crypto_.Decrypt(entry.collector.PeekAtAssembledFrame(), &frame);
156*3f982cf4SFabien Sanglard   OSP_DCHECK(entry.estimated_capture_time);
157*3f982cf4SFabien Sanglard   frame.reference_time =
158*3f982cf4SFabien Sanglard       *entry.estimated_capture_time + ResolveTargetPlayoutDelay(frame_id);
159*3f982cf4SFabien Sanglard 
160*3f982cf4SFabien Sanglard   OSP_VLOG << "ConsumeNextFrame → " << frame.frame_id << ": "
161*3f982cf4SFabien Sanglard            << frame.data.size() << " payload bytes, RTP Timestamp "
162*3f982cf4SFabien Sanglard            << frame.rtp_timestamp.ToTimeSinceOrigin<microseconds>(rtp_timebase_)
163*3f982cf4SFabien Sanglard                   .count()
164*3f982cf4SFabien Sanglard            << " µs, to play-out "
165*3f982cf4SFabien Sanglard            << to_microseconds(frame.reference_time - now_()).count()
166*3f982cf4SFabien Sanglard            << " µs from now.";
167*3f982cf4SFabien Sanglard 
168*3f982cf4SFabien Sanglard   entry.Reset();
169*3f982cf4SFabien Sanglard   last_frame_consumed_ = frame_id;
170*3f982cf4SFabien Sanglard 
171*3f982cf4SFabien Sanglard   // Ensure the Consumer is notified if there are already more frames ready for
172*3f982cf4SFabien Sanglard   // consumption, and it hasn't explicitly called AdvanceToNextFrame() to check
173*3f982cf4SFabien Sanglard   // for itself.
174*3f982cf4SFabien Sanglard   ScheduleFrameReadyCheck();
175*3f982cf4SFabien Sanglard 
176*3f982cf4SFabien Sanglard   return frame;
177*3f982cf4SFabien Sanglard }
178*3f982cf4SFabien Sanglard 
OnReceivedRtpPacket(Clock::time_point arrival_time,std::vector<uint8_t> packet)179*3f982cf4SFabien Sanglard void Receiver::OnReceivedRtpPacket(Clock::time_point arrival_time,
180*3f982cf4SFabien Sanglard                                    std::vector<uint8_t> packet) {
181*3f982cf4SFabien Sanglard   const absl::optional<RtpPacketParser::ParseResult> part =
182*3f982cf4SFabien Sanglard       rtp_parser_.Parse(packet);
183*3f982cf4SFabien Sanglard   if (!part) {
184*3f982cf4SFabien Sanglard     RECEIVER_LOG(WARN) << "Parsing of " << packet.size()
185*3f982cf4SFabien Sanglard                        << " bytes as an RTP packet failed.";
186*3f982cf4SFabien Sanglard     return;
187*3f982cf4SFabien Sanglard   }
188*3f982cf4SFabien Sanglard   stats_tracker_.OnReceivedValidRtpPacket(part->sequence_number,
189*3f982cf4SFabien Sanglard                                           part->rtp_timestamp, arrival_time);
190*3f982cf4SFabien Sanglard 
191*3f982cf4SFabien Sanglard   // Ignore packets for frames the Receiver is no longer interested in.
192*3f982cf4SFabien Sanglard   if (part->frame_id <= checkpoint_frame()) {
193*3f982cf4SFabien Sanglard     return;
194*3f982cf4SFabien Sanglard   }
195*3f982cf4SFabien Sanglard 
196*3f982cf4SFabien Sanglard   // Extend the range of frames known to this Receiver, within the capacity of
197*3f982cf4SFabien Sanglard   // this Receiver's queue. Prepare the FrameCollectors to receive any
198*3f982cf4SFabien Sanglard   // newly-discovered frames.
199*3f982cf4SFabien Sanglard   if (part->frame_id > latest_frame_expected_) {
200*3f982cf4SFabien Sanglard     const FrameId max_allowed_frame_id =
201*3f982cf4SFabien Sanglard         last_frame_consumed_ + kMaxUnackedFrames;
202*3f982cf4SFabien Sanglard     if (part->frame_id > max_allowed_frame_id) {
203*3f982cf4SFabien Sanglard       return;
204*3f982cf4SFabien Sanglard     }
205*3f982cf4SFabien Sanglard     do {
206*3f982cf4SFabien Sanglard       ++latest_frame_expected_;
207*3f982cf4SFabien Sanglard       GetQueueEntry(latest_frame_expected_)
208*3f982cf4SFabien Sanglard           .collector.set_frame_id(latest_frame_expected_);
209*3f982cf4SFabien Sanglard     } while (latest_frame_expected_ < part->frame_id);
210*3f982cf4SFabien Sanglard   }
211*3f982cf4SFabien Sanglard 
212*3f982cf4SFabien Sanglard   // Start-up edge case: Blatantly drop the first packet of all frames until the
213*3f982cf4SFabien Sanglard   // Receiver has processed at least one Sender Report containing the necessary
214*3f982cf4SFabien Sanglard   // clock-drift and lip-sync information (see OnReceivedRtcpPacket()). This is
215*3f982cf4SFabien Sanglard   // an inescapable data dependency. Note that this special case should almost
216*3f982cf4SFabien Sanglard   // never trigger, since a well-behaving Sender will send the first Sender
217*3f982cf4SFabien Sanglard   // Report RTCP packet before any of the RTP packets.
218*3f982cf4SFabien Sanglard   if (!last_sender_report_ && part->packet_id == FramePacketId{0}) {
219*3f982cf4SFabien Sanglard     RECEIVER_LOG(WARN) << "Dropping packet 0 of frame " << part->frame_id
220*3f982cf4SFabien Sanglard                        << " because it arrived before the first Sender Report.";
221*3f982cf4SFabien Sanglard     // Note: The Sender will have to re-transmit this dropped packet after the
222*3f982cf4SFabien Sanglard     // Sender Report to allow the Receiver to move forward.
223*3f982cf4SFabien Sanglard     return;
224*3f982cf4SFabien Sanglard   }
225*3f982cf4SFabien Sanglard 
226*3f982cf4SFabien Sanglard   PendingFrame& pending_frame = GetQueueEntry(part->frame_id);
227*3f982cf4SFabien Sanglard   FrameCollector& collector = pending_frame.collector;
228*3f982cf4SFabien Sanglard   if (collector.is_complete()) {
229*3f982cf4SFabien Sanglard     // An extra, redundant |packet| was received. Do nothing since the frame was
230*3f982cf4SFabien Sanglard     // already complete.
231*3f982cf4SFabien Sanglard     return;
232*3f982cf4SFabien Sanglard   }
233*3f982cf4SFabien Sanglard 
234*3f982cf4SFabien Sanglard   if (!collector.CollectRtpPacket(*part, &packet)) {
235*3f982cf4SFabien Sanglard     return;  // Bad data in the parsed packet. Ignore it.
236*3f982cf4SFabien Sanglard   }
237*3f982cf4SFabien Sanglard 
238*3f982cf4SFabien Sanglard   // The first packet in a frame contains timing information critical for
239*3f982cf4SFabien Sanglard   // computing this frame's (and all future frames') playout time. Process that,
240*3f982cf4SFabien Sanglard   // but only once.
241*3f982cf4SFabien Sanglard   if (part->packet_id == FramePacketId{0} &&
242*3f982cf4SFabien Sanglard       !pending_frame.estimated_capture_time) {
243*3f982cf4SFabien Sanglard     // Estimate the original capture time of this frame (at the Sender), in
244*3f982cf4SFabien Sanglard     // terms of the Receiver's clock: First, start with a reference time point
245*3f982cf4SFabien Sanglard     // from the Sender's clock (the one from the last Sender Report). Then,
246*3f982cf4SFabien Sanglard     // translate it into the equivalent reference time point in terms of the
247*3f982cf4SFabien Sanglard     // Receiver's clock by applying the measured offset between the two clocks.
248*3f982cf4SFabien Sanglard     // Finally, apply the RTP timestamp difference between the Sender Report and
249*3f982cf4SFabien Sanglard     // this frame to determine what the original capture time of this frame was.
250*3f982cf4SFabien Sanglard     pending_frame.estimated_capture_time =
251*3f982cf4SFabien Sanglard         last_sender_report_->reference_time + smoothed_clock_offset_.Current() +
252*3f982cf4SFabien Sanglard         (part->rtp_timestamp - last_sender_report_->rtp_timestamp)
253*3f982cf4SFabien Sanglard             .ToDuration<Clock::duration>(rtp_timebase_);
254*3f982cf4SFabien Sanglard 
255*3f982cf4SFabien Sanglard     // If a target playout delay change was included in this packet, record it.
256*3f982cf4SFabien Sanglard     if (part->new_playout_delay > milliseconds::zero()) {
257*3f982cf4SFabien Sanglard       RecordNewTargetPlayoutDelay(part->frame_id, part->new_playout_delay);
258*3f982cf4SFabien Sanglard     }
259*3f982cf4SFabien Sanglard 
260*3f982cf4SFabien Sanglard     // Now that the estimated capture time is known, other frames may have just
261*3f982cf4SFabien Sanglard     // become ready, per the frame-skipping logic in AdvanceToNextFrame().
262*3f982cf4SFabien Sanglard     ScheduleFrameReadyCheck();
263*3f982cf4SFabien Sanglard   }
264*3f982cf4SFabien Sanglard 
265*3f982cf4SFabien Sanglard   if (!collector.is_complete()) {
266*3f982cf4SFabien Sanglard     return;  // Wait for the rest of the packets to come in.
267*3f982cf4SFabien Sanglard   }
268*3f982cf4SFabien Sanglard   const EncryptedFrame& encrypted_frame = collector.PeekAtAssembledFrame();
269*3f982cf4SFabien Sanglard 
270*3f982cf4SFabien Sanglard   // Whenever a key frame has been received, the decoder has what it needs to
271*3f982cf4SFabien Sanglard   // recover. In this case, clear the PLI condition.
272*3f982cf4SFabien Sanglard   if (encrypted_frame.dependency == EncryptedFrame::KEY_FRAME) {
273*3f982cf4SFabien Sanglard     rtcp_builder_.SetPictureLossIndicator(false);
274*3f982cf4SFabien Sanglard     last_key_frame_received_ = part->frame_id;
275*3f982cf4SFabien Sanglard   }
276*3f982cf4SFabien Sanglard 
277*3f982cf4SFabien Sanglard   // If this just-completed frame is the one right after the checkpoint frame,
278*3f982cf4SFabien Sanglard   // advance the checkpoint forward.
279*3f982cf4SFabien Sanglard   if (part->frame_id == (checkpoint_frame() + 1)) {
280*3f982cf4SFabien Sanglard     AdvanceCheckpoint(part->frame_id);
281*3f982cf4SFabien Sanglard   }
282*3f982cf4SFabien Sanglard 
283*3f982cf4SFabien Sanglard   // Since a frame has become complete, schedule a check to see whether this or
284*3f982cf4SFabien Sanglard   // any other frames have become ready for consumption.
285*3f982cf4SFabien Sanglard   ScheduleFrameReadyCheck();
286*3f982cf4SFabien Sanglard }
287*3f982cf4SFabien Sanglard 
OnReceivedRtcpPacket(Clock::time_point arrival_time,std::vector<uint8_t> packet)288*3f982cf4SFabien Sanglard void Receiver::OnReceivedRtcpPacket(Clock::time_point arrival_time,
289*3f982cf4SFabien Sanglard                                     std::vector<uint8_t> packet) {
290*3f982cf4SFabien Sanglard   TRACE_DEFAULT_SCOPED(TraceCategory::kReceiver);
291*3f982cf4SFabien Sanglard   absl::optional<SenderReportParser::SenderReportWithId> parsed_report =
292*3f982cf4SFabien Sanglard       rtcp_parser_.Parse(packet);
293*3f982cf4SFabien Sanglard   if (!parsed_report) {
294*3f982cf4SFabien Sanglard     RECEIVER_LOG(WARN) << "Parsing of " << packet.size()
295*3f982cf4SFabien Sanglard                        << " bytes as an RTCP packet failed.";
296*3f982cf4SFabien Sanglard     return;
297*3f982cf4SFabien Sanglard   }
298*3f982cf4SFabien Sanglard   last_sender_report_ = std::move(parsed_report);
299*3f982cf4SFabien Sanglard   last_sender_report_arrival_time_ = arrival_time;
300*3f982cf4SFabien Sanglard 
301*3f982cf4SFabien Sanglard   // Measure the offset between the Sender's clock and the Receiver's Clock.
302*3f982cf4SFabien Sanglard   // This will be used to translate reference timestamps from the Sender into
303*3f982cf4SFabien Sanglard   // timestamps that represent the exact same moment in time at the Receiver.
304*3f982cf4SFabien Sanglard   //
305*3f982cf4SFabien Sanglard   // Note: Due to design limitations in the Cast Streaming spec, the Receiver
306*3f982cf4SFabien Sanglard   // has no way to compute how long it took the Sender Report to travel over the
307*3f982cf4SFabien Sanglard   // network. The calculation here just ignores that, and so the
308*3f982cf4SFabien Sanglard   // |measured_offset| below will be larger than the true value by that amount.
309*3f982cf4SFabien Sanglard   // This will have the effect of a later-than-configured playout delay.
310*3f982cf4SFabien Sanglard   const Clock::duration measured_offset =
311*3f982cf4SFabien Sanglard       arrival_time - last_sender_report_->reference_time;
312*3f982cf4SFabien Sanglard   smoothed_clock_offset_.Update(arrival_time, measured_offset);
313*3f982cf4SFabien Sanglard 
314*3f982cf4SFabien Sanglard   RtcpReportBlock report;
315*3f982cf4SFabien Sanglard   report.ssrc = rtcp_session_.sender_ssrc();
316*3f982cf4SFabien Sanglard   stats_tracker_.PopulateNextReport(&report);
317*3f982cf4SFabien Sanglard   report.last_status_report_id = last_sender_report_->report_id;
318*3f982cf4SFabien Sanglard   report.SetDelaySinceLastReport(now_() - last_sender_report_arrival_time_);
319*3f982cf4SFabien Sanglard   rtcp_builder_.IncludeReceiverReportInNextPacket(report);
320*3f982cf4SFabien Sanglard 
321*3f982cf4SFabien Sanglard   SendRtcp();
322*3f982cf4SFabien Sanglard }
323*3f982cf4SFabien Sanglard 
SendRtcp()324*3f982cf4SFabien Sanglard void Receiver::SendRtcp() {
325*3f982cf4SFabien Sanglard   // Collect ACK/NACK feedback for all active frames in the queue.
326*3f982cf4SFabien Sanglard   std::vector<PacketNack> packet_nacks;
327*3f982cf4SFabien Sanglard   std::vector<FrameId> frame_acks;
328*3f982cf4SFabien Sanglard   for (FrameId f = checkpoint_frame() + 1; f <= latest_frame_expected_; ++f) {
329*3f982cf4SFabien Sanglard     const FrameCollector& collector = GetQueueEntry(f).collector;
330*3f982cf4SFabien Sanglard     if (collector.is_complete()) {
331*3f982cf4SFabien Sanglard       frame_acks.push_back(f);
332*3f982cf4SFabien Sanglard     } else {
333*3f982cf4SFabien Sanglard       collector.GetMissingPackets(&packet_nacks);
334*3f982cf4SFabien Sanglard     }
335*3f982cf4SFabien Sanglard   }
336*3f982cf4SFabien Sanglard 
337*3f982cf4SFabien Sanglard   // Build and send a compound RTCP packet.
338*3f982cf4SFabien Sanglard   const bool no_nacks = packet_nacks.empty();
339*3f982cf4SFabien Sanglard   rtcp_builder_.IncludeFeedbackInNextPacket(std::move(packet_nacks),
340*3f982cf4SFabien Sanglard                                             std::move(frame_acks));
341*3f982cf4SFabien Sanglard   last_rtcp_send_time_ = now_();
342*3f982cf4SFabien Sanglard   packet_router_->SendRtcpPacket(rtcp_builder_.BuildPacket(
343*3f982cf4SFabien Sanglard       last_rtcp_send_time_,
344*3f982cf4SFabien Sanglard       absl::Span<uint8_t>(rtcp_buffer_.get(), rtcp_buffer_capacity_)));
345*3f982cf4SFabien Sanglard 
346*3f982cf4SFabien Sanglard   // Schedule the automatic sending of another RTCP packet, if this method is
347*3f982cf4SFabien Sanglard   // not called within some bounded amount of time. While incomplete frames
348*3f982cf4SFabien Sanglard   // exist in the queue, send RTCP packets (with ACK/NACK feedback) frequently.
349*3f982cf4SFabien Sanglard   // When there are no incomplete frames, use a longer "keepalive" interval.
350*3f982cf4SFabien Sanglard   const Clock::duration interval =
351*3f982cf4SFabien Sanglard       (no_nacks ? kRtcpReportInterval : kNackFeedbackInterval);
352*3f982cf4SFabien Sanglard   rtcp_alarm_.Schedule([this] { SendRtcp(); }, last_rtcp_send_time_ + interval);
353*3f982cf4SFabien Sanglard }
354*3f982cf4SFabien Sanglard 
GetQueueEntry(FrameId frame_id) const355*3f982cf4SFabien Sanglard const Receiver::PendingFrame& Receiver::GetQueueEntry(FrameId frame_id) const {
356*3f982cf4SFabien Sanglard   return const_cast<Receiver*>(this)->GetQueueEntry(frame_id);
357*3f982cf4SFabien Sanglard }
358*3f982cf4SFabien Sanglard 
GetQueueEntry(FrameId frame_id)359*3f982cf4SFabien Sanglard Receiver::PendingFrame& Receiver::GetQueueEntry(FrameId frame_id) {
360*3f982cf4SFabien Sanglard   return pending_frames_[(frame_id - FrameId::first()) %
361*3f982cf4SFabien Sanglard                          pending_frames_.size()];
362*3f982cf4SFabien Sanglard }
363*3f982cf4SFabien Sanglard 
RecordNewTargetPlayoutDelay(FrameId as_of_frame,milliseconds delay)364*3f982cf4SFabien Sanglard void Receiver::RecordNewTargetPlayoutDelay(FrameId as_of_frame,
365*3f982cf4SFabien Sanglard                                            milliseconds delay) {
366*3f982cf4SFabien Sanglard   OSP_DCHECK_GT(as_of_frame, checkpoint_frame());
367*3f982cf4SFabien Sanglard 
368*3f982cf4SFabien Sanglard   // Prune-out entries from |playout_delay_changes_| that are no longer needed.
369*3f982cf4SFabien Sanglard   // At least one entry must always be kept (i.e., there must always be a
370*3f982cf4SFabien Sanglard   // "current" setting).
371*3f982cf4SFabien Sanglard   const FrameId next_frame = last_frame_consumed_ + 1;
372*3f982cf4SFabien Sanglard   const auto keep_one_before_it = std::find_if(
373*3f982cf4SFabien Sanglard       std::next(playout_delay_changes_.begin()), playout_delay_changes_.end(),
374*3f982cf4SFabien Sanglard       [&](const auto& entry) { return entry.first > next_frame; });
375*3f982cf4SFabien Sanglard   playout_delay_changes_.erase(playout_delay_changes_.begin(),
376*3f982cf4SFabien Sanglard                                std::prev(keep_one_before_it));
377*3f982cf4SFabien Sanglard 
378*3f982cf4SFabien Sanglard   // Insert the delay change entry, maintaining the ascending ordering of the
379*3f982cf4SFabien Sanglard   // vector.
380*3f982cf4SFabien Sanglard   const auto insert_it = std::find_if(
381*3f982cf4SFabien Sanglard       playout_delay_changes_.begin(), playout_delay_changes_.end(),
382*3f982cf4SFabien Sanglard       [&](const auto& entry) { return entry.first > as_of_frame; });
383*3f982cf4SFabien Sanglard   playout_delay_changes_.emplace(insert_it, as_of_frame, delay);
384*3f982cf4SFabien Sanglard 
385*3f982cf4SFabien Sanglard   OSP_DCHECK(AreElementsSortedAndUnique(playout_delay_changes_));
386*3f982cf4SFabien Sanglard }
387*3f982cf4SFabien Sanglard 
ResolveTargetPlayoutDelay(FrameId frame_id) const388*3f982cf4SFabien Sanglard milliseconds Receiver::ResolveTargetPlayoutDelay(FrameId frame_id) const {
389*3f982cf4SFabien Sanglard   OSP_DCHECK_GT(frame_id, last_frame_consumed_);
390*3f982cf4SFabien Sanglard 
391*3f982cf4SFabien Sanglard #if OSP_DCHECK_IS_ON()
392*3f982cf4SFabien Sanglard   // Extra precaution: Ensure all possible playout delay changes are known. In
393*3f982cf4SFabien Sanglard   // other words, every unconsumed frame in the queue, up to (and including)
394*3f982cf4SFabien Sanglard   // |frame_id|, must have an assigned estimated_capture_time.
395*3f982cf4SFabien Sanglard   for (FrameId f = last_frame_consumed_ + 1; f <= frame_id; ++f) {
396*3f982cf4SFabien Sanglard     OSP_DCHECK(GetQueueEntry(f).estimated_capture_time)
397*3f982cf4SFabien Sanglard         << " don't know whether there was a playout delay change for frame "
398*3f982cf4SFabien Sanglard         << f;
399*3f982cf4SFabien Sanglard   }
400*3f982cf4SFabien Sanglard #endif
401*3f982cf4SFabien Sanglard 
402*3f982cf4SFabien Sanglard   const auto it = std::find_if(
403*3f982cf4SFabien Sanglard       playout_delay_changes_.crbegin(), playout_delay_changes_.crend(),
404*3f982cf4SFabien Sanglard       [&](const auto& entry) { return entry.first <= frame_id; });
405*3f982cf4SFabien Sanglard   OSP_DCHECK(it != playout_delay_changes_.crend());
406*3f982cf4SFabien Sanglard   return it->second;
407*3f982cf4SFabien Sanglard }
408*3f982cf4SFabien Sanglard 
AdvanceCheckpoint(FrameId new_checkpoint)409*3f982cf4SFabien Sanglard void Receiver::AdvanceCheckpoint(FrameId new_checkpoint) {
410*3f982cf4SFabien Sanglard   TRACE_DEFAULT_SCOPED(TraceCategory::kReceiver);
411*3f982cf4SFabien Sanglard   OSP_DCHECK_GT(new_checkpoint, checkpoint_frame());
412*3f982cf4SFabien Sanglard   OSP_DCHECK_LE(new_checkpoint, latest_frame_expected_);
413*3f982cf4SFabien Sanglard 
414*3f982cf4SFabien Sanglard   while (new_checkpoint < latest_frame_expected_) {
415*3f982cf4SFabien Sanglard     const FrameId next = new_checkpoint + 1;
416*3f982cf4SFabien Sanglard     if (!GetQueueEntry(next).collector.is_complete()) {
417*3f982cf4SFabien Sanglard       break;
418*3f982cf4SFabien Sanglard     }
419*3f982cf4SFabien Sanglard     new_checkpoint = next;
420*3f982cf4SFabien Sanglard   }
421*3f982cf4SFabien Sanglard 
422*3f982cf4SFabien Sanglard   set_checkpoint_frame(new_checkpoint);
423*3f982cf4SFabien Sanglard   rtcp_builder_.SetPlayoutDelay(ResolveTargetPlayoutDelay(new_checkpoint));
424*3f982cf4SFabien Sanglard   SendRtcp();
425*3f982cf4SFabien Sanglard }
426*3f982cf4SFabien Sanglard 
DropAllFramesBefore(FrameId first_kept_frame)427*3f982cf4SFabien Sanglard void Receiver::DropAllFramesBefore(FrameId first_kept_frame) {
428*3f982cf4SFabien Sanglard   // The following DCHECKs are verifying that this method is only being called
429*3f982cf4SFabien Sanglard   // because one or more incomplete frames are being skipped-over.
430*3f982cf4SFabien Sanglard   const FrameId first_to_drop = last_frame_consumed_ + 1;
431*3f982cf4SFabien Sanglard   OSP_DCHECK_GT(first_kept_frame, first_to_drop);
432*3f982cf4SFabien Sanglard   OSP_DCHECK_GT(first_kept_frame, checkpoint_frame());
433*3f982cf4SFabien Sanglard   OSP_DCHECK_LE(first_kept_frame, latest_frame_expected_);
434*3f982cf4SFabien Sanglard 
435*3f982cf4SFabien Sanglard   // Reset each of the frames being dropped, pretending that they were consumed.
436*3f982cf4SFabien Sanglard   for (FrameId f = first_to_drop; f < first_kept_frame; ++f) {
437*3f982cf4SFabien Sanglard     PendingFrame& entry = GetQueueEntry(f);
438*3f982cf4SFabien Sanglard     // Pedantic sanity-check: Ensure the "target playout delay change" data
439*3f982cf4SFabien Sanglard     // dependency was satisfied. See comments in AdvanceToNextFrame().
440*3f982cf4SFabien Sanglard     OSP_DCHECK(entry.estimated_capture_time);
441*3f982cf4SFabien Sanglard     entry.Reset();
442*3f982cf4SFabien Sanglard   }
443*3f982cf4SFabien Sanglard   last_frame_consumed_ = first_kept_frame - 1;
444*3f982cf4SFabien Sanglard 
445*3f982cf4SFabien Sanglard   RECEIVER_LOG(INFO) << "Artificially advancing checkpoint after skipping.";
446*3f982cf4SFabien Sanglard   AdvanceCheckpoint(first_kept_frame);
447*3f982cf4SFabien Sanglard }
448*3f982cf4SFabien Sanglard 
ScheduleFrameReadyCheck(Clock::time_point when)449*3f982cf4SFabien Sanglard void Receiver::ScheduleFrameReadyCheck(Clock::time_point when) {
450*3f982cf4SFabien Sanglard   consumption_alarm_.Schedule(
451*3f982cf4SFabien Sanglard       [this] {
452*3f982cf4SFabien Sanglard         if (consumer_) {
453*3f982cf4SFabien Sanglard           const int next_frame_buffer_size = AdvanceToNextFrame();
454*3f982cf4SFabien Sanglard           if (next_frame_buffer_size != kNoFramesReady) {
455*3f982cf4SFabien Sanglard             consumer_->OnFramesReady(next_frame_buffer_size);
456*3f982cf4SFabien Sanglard           }
457*3f982cf4SFabien Sanglard         }
458*3f982cf4SFabien Sanglard       },
459*3f982cf4SFabien Sanglard       when);
460*3f982cf4SFabien Sanglard }
461*3f982cf4SFabien Sanglard 
462*3f982cf4SFabien Sanglard Receiver::PendingFrame::PendingFrame() = default;
463*3f982cf4SFabien Sanglard Receiver::PendingFrame::~PendingFrame() = default;
464*3f982cf4SFabien Sanglard 
Reset()465*3f982cf4SFabien Sanglard void Receiver::PendingFrame::Reset() {
466*3f982cf4SFabien Sanglard   collector.Reset();
467*3f982cf4SFabien Sanglard   estimated_capture_time = absl::nullopt;
468*3f982cf4SFabien Sanglard }
469*3f982cf4SFabien Sanglard 
470*3f982cf4SFabien Sanglard // static
471*3f982cf4SFabien Sanglard constexpr milliseconds Receiver::kDefaultPlayerProcessingTime;
472*3f982cf4SFabien Sanglard constexpr int Receiver::kNoFramesReady;
473*3f982cf4SFabien Sanglard constexpr milliseconds Receiver::kNackFeedbackInterval;
474*3f982cf4SFabien Sanglard 
475*3f982cf4SFabien Sanglard }  // namespace cast
476*3f982cf4SFabien Sanglard }  // namespace openscreen
477