1*3f982cf4SFabien Sanglard // Copyright 2019 The Chromium Authors. All rights reserved.
2*3f982cf4SFabien Sanglard // Use of this source code is governed by a BSD-style license that can be
3*3f982cf4SFabien Sanglard // found in the LICENSE file.
4*3f982cf4SFabien Sanglard
5*3f982cf4SFabien Sanglard #include "cast/streaming/receiver.h"
6*3f982cf4SFabien Sanglard
7*3f982cf4SFabien Sanglard #include <algorithm>
8*3f982cf4SFabien Sanglard #include <utility>
9*3f982cf4SFabien Sanglard
10*3f982cf4SFabien Sanglard #include "absl/types/span.h"
11*3f982cf4SFabien Sanglard #include "cast/streaming/constants.h"
12*3f982cf4SFabien Sanglard #include "cast/streaming/receiver_packet_router.h"
13*3f982cf4SFabien Sanglard #include "cast/streaming/session_config.h"
14*3f982cf4SFabien Sanglard #include "util/chrono_helpers.h"
15*3f982cf4SFabien Sanglard #include "util/osp_logging.h"
16*3f982cf4SFabien Sanglard #include "util/std_util.h"
17*3f982cf4SFabien Sanglard #include "util/trace_logging.h"
18*3f982cf4SFabien Sanglard
19*3f982cf4SFabien Sanglard namespace openscreen {
20*3f982cf4SFabien Sanglard namespace cast {
21*3f982cf4SFabien Sanglard
22*3f982cf4SFabien Sanglard // Conveniences for ensuring logging output includes the SSRC of the Receiver,
23*3f982cf4SFabien Sanglard // to help distinguish one out of multiple instances in a Cast Streaming
24*3f982cf4SFabien Sanglard // session.
25*3f982cf4SFabien Sanglard //
26*3f982cf4SFabien Sanglard #define RECEIVER_LOG(level) OSP_LOG_##level << "[SSRC:" << ssrc() << "] "
27*3f982cf4SFabien Sanglard
Receiver(Environment * environment,ReceiverPacketRouter * packet_router,SessionConfig config)28*3f982cf4SFabien Sanglard Receiver::Receiver(Environment* environment,
29*3f982cf4SFabien Sanglard ReceiverPacketRouter* packet_router,
30*3f982cf4SFabien Sanglard SessionConfig config)
31*3f982cf4SFabien Sanglard : now_(environment->now_function()),
32*3f982cf4SFabien Sanglard packet_router_(packet_router),
33*3f982cf4SFabien Sanglard config_(config),
34*3f982cf4SFabien Sanglard rtcp_session_(config.sender_ssrc, config.receiver_ssrc, now_()),
35*3f982cf4SFabien Sanglard rtcp_parser_(&rtcp_session_),
36*3f982cf4SFabien Sanglard rtcp_builder_(&rtcp_session_),
37*3f982cf4SFabien Sanglard stats_tracker_(config.rtp_timebase),
38*3f982cf4SFabien Sanglard rtp_parser_(config.sender_ssrc),
39*3f982cf4SFabien Sanglard rtp_timebase_(config.rtp_timebase),
40*3f982cf4SFabien Sanglard crypto_(config.aes_secret_key, config.aes_iv_mask),
41*3f982cf4SFabien Sanglard is_pli_enabled_(config.is_pli_enabled),
42*3f982cf4SFabien Sanglard rtcp_buffer_capacity_(environment->GetMaxPacketSize()),
43*3f982cf4SFabien Sanglard rtcp_buffer_(new uint8_t[rtcp_buffer_capacity_]),
44*3f982cf4SFabien Sanglard rtcp_alarm_(environment->now_function(), environment->task_runner()),
45*3f982cf4SFabien Sanglard smoothed_clock_offset_(ClockDriftSmoother::kDefaultTimeConstant),
46*3f982cf4SFabien Sanglard consumption_alarm_(environment->now_function(),
47*3f982cf4SFabien Sanglard environment->task_runner()) {
48*3f982cf4SFabien Sanglard OSP_DCHECK(packet_router_);
49*3f982cf4SFabien Sanglard OSP_DCHECK_EQ(checkpoint_frame(), FrameId::leader());
50*3f982cf4SFabien Sanglard OSP_CHECK_GT(rtcp_buffer_capacity_, 0);
51*3f982cf4SFabien Sanglard OSP_CHECK(rtcp_buffer_);
52*3f982cf4SFabien Sanglard
53*3f982cf4SFabien Sanglard rtcp_builder_.SetPlayoutDelay(config.target_playout_delay);
54*3f982cf4SFabien Sanglard playout_delay_changes_.emplace_back(FrameId::leader(),
55*3f982cf4SFabien Sanglard config.target_playout_delay);
56*3f982cf4SFabien Sanglard
57*3f982cf4SFabien Sanglard packet_router_->OnReceiverCreated(rtcp_session_.sender_ssrc(), this);
58*3f982cf4SFabien Sanglard }
59*3f982cf4SFabien Sanglard
~Receiver()60*3f982cf4SFabien Sanglard Receiver::~Receiver() {
61*3f982cf4SFabien Sanglard packet_router_->OnReceiverDestroyed(rtcp_session_.sender_ssrc());
62*3f982cf4SFabien Sanglard }
63*3f982cf4SFabien Sanglard
config() const64*3f982cf4SFabien Sanglard const SessionConfig& Receiver::config() const {
65*3f982cf4SFabien Sanglard return config_;
66*3f982cf4SFabien Sanglard }
rtp_timebase() const67*3f982cf4SFabien Sanglard int Receiver::rtp_timebase() const {
68*3f982cf4SFabien Sanglard return rtp_timebase_;
69*3f982cf4SFabien Sanglard }
ssrc() const70*3f982cf4SFabien Sanglard Ssrc Receiver::ssrc() const {
71*3f982cf4SFabien Sanglard return rtcp_session_.receiver_ssrc();
72*3f982cf4SFabien Sanglard }
73*3f982cf4SFabien Sanglard
SetConsumer(Consumer * consumer)74*3f982cf4SFabien Sanglard void Receiver::SetConsumer(Consumer* consumer) {
75*3f982cf4SFabien Sanglard consumer_ = consumer;
76*3f982cf4SFabien Sanglard ScheduleFrameReadyCheck();
77*3f982cf4SFabien Sanglard }
78*3f982cf4SFabien Sanglard
SetPlayerProcessingTime(Clock::duration needed_time)79*3f982cf4SFabien Sanglard void Receiver::SetPlayerProcessingTime(Clock::duration needed_time) {
80*3f982cf4SFabien Sanglard player_processing_time_ = std::max(Clock::duration::zero(), needed_time);
81*3f982cf4SFabien Sanglard }
82*3f982cf4SFabien Sanglard
RequestKeyFrame()83*3f982cf4SFabien Sanglard void Receiver::RequestKeyFrame() {
84*3f982cf4SFabien Sanglard // If we don't have picture loss indication enabled, we should not request
85*3f982cf4SFabien Sanglard // any key frames.
86*3f982cf4SFabien Sanglard OSP_DCHECK(is_pli_enabled_) << "PLI is not enabled.";
87*3f982cf4SFabien Sanglard if (is_pli_enabled_ && !last_key_frame_received_.is_null() &&
88*3f982cf4SFabien Sanglard last_frame_consumed_ >= last_key_frame_received_ &&
89*3f982cf4SFabien Sanglard !rtcp_builder_.is_picture_loss_indicator_set()) {
90*3f982cf4SFabien Sanglard rtcp_builder_.SetPictureLossIndicator(true);
91*3f982cf4SFabien Sanglard SendRtcp();
92*3f982cf4SFabien Sanglard }
93*3f982cf4SFabien Sanglard }
94*3f982cf4SFabien Sanglard
AdvanceToNextFrame()95*3f982cf4SFabien Sanglard int Receiver::AdvanceToNextFrame() {
96*3f982cf4SFabien Sanglard TRACE_DEFAULT_SCOPED(TraceCategory::kReceiver);
97*3f982cf4SFabien Sanglard const FrameId immediate_next_frame = last_frame_consumed_ + 1;
98*3f982cf4SFabien Sanglard
99*3f982cf4SFabien Sanglard // Scan the queue for the next frame that should be consumed. Typically, this
100*3f982cf4SFabien Sanglard // is the very next frame; but if it is incomplete and already late for
101*3f982cf4SFabien Sanglard // playout, consider skipping-ahead.
102*3f982cf4SFabien Sanglard for (FrameId f = immediate_next_frame; f <= latest_frame_expected_; ++f) {
103*3f982cf4SFabien Sanglard PendingFrame& entry = GetQueueEntry(f);
104*3f982cf4SFabien Sanglard if (entry.collector.is_complete()) {
105*3f982cf4SFabien Sanglard const EncryptedFrame& encrypted_frame =
106*3f982cf4SFabien Sanglard entry.collector.PeekAtAssembledFrame();
107*3f982cf4SFabien Sanglard if (f == immediate_next_frame) { // Typical case.
108*3f982cf4SFabien Sanglard return FrameCrypto::GetPlaintextSize(encrypted_frame);
109*3f982cf4SFabien Sanglard }
110*3f982cf4SFabien Sanglard if (encrypted_frame.dependency != EncodedFrame::DEPENDS_ON_ANOTHER) {
111*3f982cf4SFabien Sanglard // Found a frame after skipping past some frames. Drop the ones being
112*3f982cf4SFabien Sanglard // skipped, advancing |last_frame_consumed_| before returning.
113*3f982cf4SFabien Sanglard DropAllFramesBefore(f);
114*3f982cf4SFabien Sanglard return FrameCrypto::GetPlaintextSize(encrypted_frame);
115*3f982cf4SFabien Sanglard }
116*3f982cf4SFabien Sanglard // Conclusion: The frame in the current queue entry is complete, but
117*3f982cf4SFabien Sanglard // depends on a prior incomplete frame. Continue scanning...
118*3f982cf4SFabien Sanglard }
119*3f982cf4SFabien Sanglard
120*3f982cf4SFabien Sanglard // Do not consider skipping past this frame if its estimated capture time is
121*3f982cf4SFabien Sanglard // unknown. The implication here is that, if |estimated_capture_time| is
122*3f982cf4SFabien Sanglard // set, the Receiver also knows whether any target playout delay changes
123*3f982cf4SFabien Sanglard // were communicated from the Sender in the frame's first RTP packet.
124*3f982cf4SFabien Sanglard if (!entry.estimated_capture_time) {
125*3f982cf4SFabien Sanglard break;
126*3f982cf4SFabien Sanglard }
127*3f982cf4SFabien Sanglard
128*3f982cf4SFabien Sanglard // If this incomplete frame is not yet late for playout, simply wait for the
129*3f982cf4SFabien Sanglard // rest of its packets to come in. However, do schedule a check to
130*3f982cf4SFabien Sanglard // re-examine things at the time it would become a late frame, to possibly
131*3f982cf4SFabien Sanglard // skip-over it.
132*3f982cf4SFabien Sanglard const auto playout_time =
133*3f982cf4SFabien Sanglard *entry.estimated_capture_time + ResolveTargetPlayoutDelay(f);
134*3f982cf4SFabien Sanglard if (playout_time > (now_() + player_processing_time_)) {
135*3f982cf4SFabien Sanglard ScheduleFrameReadyCheck(playout_time);
136*3f982cf4SFabien Sanglard break;
137*3f982cf4SFabien Sanglard }
138*3f982cf4SFabien Sanglard }
139*3f982cf4SFabien Sanglard
140*3f982cf4SFabien Sanglard return kNoFramesReady;
141*3f982cf4SFabien Sanglard }
142*3f982cf4SFabien Sanglard
ConsumeNextFrame(absl::Span<uint8_t> buffer)143*3f982cf4SFabien Sanglard EncodedFrame Receiver::ConsumeNextFrame(absl::Span<uint8_t> buffer) {
144*3f982cf4SFabien Sanglard TRACE_DEFAULT_SCOPED(TraceCategory::kReceiver);
145*3f982cf4SFabien Sanglard // Assumption: The required call to AdvanceToNextFrame() ensures that
146*3f982cf4SFabien Sanglard // |last_frame_consumed_| is set to one before the frame to be consumed here.
147*3f982cf4SFabien Sanglard const FrameId frame_id = last_frame_consumed_ + 1;
148*3f982cf4SFabien Sanglard OSP_CHECK_LE(frame_id, checkpoint_frame());
149*3f982cf4SFabien Sanglard
150*3f982cf4SFabien Sanglard // Decrypt the frame, populating the given output |frame|.
151*3f982cf4SFabien Sanglard PendingFrame& entry = GetQueueEntry(frame_id);
152*3f982cf4SFabien Sanglard OSP_DCHECK(entry.collector.is_complete());
153*3f982cf4SFabien Sanglard EncodedFrame frame;
154*3f982cf4SFabien Sanglard frame.data = buffer;
155*3f982cf4SFabien Sanglard crypto_.Decrypt(entry.collector.PeekAtAssembledFrame(), &frame);
156*3f982cf4SFabien Sanglard OSP_DCHECK(entry.estimated_capture_time);
157*3f982cf4SFabien Sanglard frame.reference_time =
158*3f982cf4SFabien Sanglard *entry.estimated_capture_time + ResolveTargetPlayoutDelay(frame_id);
159*3f982cf4SFabien Sanglard
160*3f982cf4SFabien Sanglard OSP_VLOG << "ConsumeNextFrame → " << frame.frame_id << ": "
161*3f982cf4SFabien Sanglard << frame.data.size() << " payload bytes, RTP Timestamp "
162*3f982cf4SFabien Sanglard << frame.rtp_timestamp.ToTimeSinceOrigin<microseconds>(rtp_timebase_)
163*3f982cf4SFabien Sanglard .count()
164*3f982cf4SFabien Sanglard << " µs, to play-out "
165*3f982cf4SFabien Sanglard << to_microseconds(frame.reference_time - now_()).count()
166*3f982cf4SFabien Sanglard << " µs from now.";
167*3f982cf4SFabien Sanglard
168*3f982cf4SFabien Sanglard entry.Reset();
169*3f982cf4SFabien Sanglard last_frame_consumed_ = frame_id;
170*3f982cf4SFabien Sanglard
171*3f982cf4SFabien Sanglard // Ensure the Consumer is notified if there are already more frames ready for
172*3f982cf4SFabien Sanglard // consumption, and it hasn't explicitly called AdvanceToNextFrame() to check
173*3f982cf4SFabien Sanglard // for itself.
174*3f982cf4SFabien Sanglard ScheduleFrameReadyCheck();
175*3f982cf4SFabien Sanglard
176*3f982cf4SFabien Sanglard return frame;
177*3f982cf4SFabien Sanglard }
178*3f982cf4SFabien Sanglard
OnReceivedRtpPacket(Clock::time_point arrival_time,std::vector<uint8_t> packet)179*3f982cf4SFabien Sanglard void Receiver::OnReceivedRtpPacket(Clock::time_point arrival_time,
180*3f982cf4SFabien Sanglard std::vector<uint8_t> packet) {
181*3f982cf4SFabien Sanglard const absl::optional<RtpPacketParser::ParseResult> part =
182*3f982cf4SFabien Sanglard rtp_parser_.Parse(packet);
183*3f982cf4SFabien Sanglard if (!part) {
184*3f982cf4SFabien Sanglard RECEIVER_LOG(WARN) << "Parsing of " << packet.size()
185*3f982cf4SFabien Sanglard << " bytes as an RTP packet failed.";
186*3f982cf4SFabien Sanglard return;
187*3f982cf4SFabien Sanglard }
188*3f982cf4SFabien Sanglard stats_tracker_.OnReceivedValidRtpPacket(part->sequence_number,
189*3f982cf4SFabien Sanglard part->rtp_timestamp, arrival_time);
190*3f982cf4SFabien Sanglard
191*3f982cf4SFabien Sanglard // Ignore packets for frames the Receiver is no longer interested in.
192*3f982cf4SFabien Sanglard if (part->frame_id <= checkpoint_frame()) {
193*3f982cf4SFabien Sanglard return;
194*3f982cf4SFabien Sanglard }
195*3f982cf4SFabien Sanglard
196*3f982cf4SFabien Sanglard // Extend the range of frames known to this Receiver, within the capacity of
197*3f982cf4SFabien Sanglard // this Receiver's queue. Prepare the FrameCollectors to receive any
198*3f982cf4SFabien Sanglard // newly-discovered frames.
199*3f982cf4SFabien Sanglard if (part->frame_id > latest_frame_expected_) {
200*3f982cf4SFabien Sanglard const FrameId max_allowed_frame_id =
201*3f982cf4SFabien Sanglard last_frame_consumed_ + kMaxUnackedFrames;
202*3f982cf4SFabien Sanglard if (part->frame_id > max_allowed_frame_id) {
203*3f982cf4SFabien Sanglard return;
204*3f982cf4SFabien Sanglard }
205*3f982cf4SFabien Sanglard do {
206*3f982cf4SFabien Sanglard ++latest_frame_expected_;
207*3f982cf4SFabien Sanglard GetQueueEntry(latest_frame_expected_)
208*3f982cf4SFabien Sanglard .collector.set_frame_id(latest_frame_expected_);
209*3f982cf4SFabien Sanglard } while (latest_frame_expected_ < part->frame_id);
210*3f982cf4SFabien Sanglard }
211*3f982cf4SFabien Sanglard
212*3f982cf4SFabien Sanglard // Start-up edge case: Blatantly drop the first packet of all frames until the
213*3f982cf4SFabien Sanglard // Receiver has processed at least one Sender Report containing the necessary
214*3f982cf4SFabien Sanglard // clock-drift and lip-sync information (see OnReceivedRtcpPacket()). This is
215*3f982cf4SFabien Sanglard // an inescapable data dependency. Note that this special case should almost
216*3f982cf4SFabien Sanglard // never trigger, since a well-behaving Sender will send the first Sender
217*3f982cf4SFabien Sanglard // Report RTCP packet before any of the RTP packets.
218*3f982cf4SFabien Sanglard if (!last_sender_report_ && part->packet_id == FramePacketId{0}) {
219*3f982cf4SFabien Sanglard RECEIVER_LOG(WARN) << "Dropping packet 0 of frame " << part->frame_id
220*3f982cf4SFabien Sanglard << " because it arrived before the first Sender Report.";
221*3f982cf4SFabien Sanglard // Note: The Sender will have to re-transmit this dropped packet after the
222*3f982cf4SFabien Sanglard // Sender Report to allow the Receiver to move forward.
223*3f982cf4SFabien Sanglard return;
224*3f982cf4SFabien Sanglard }
225*3f982cf4SFabien Sanglard
226*3f982cf4SFabien Sanglard PendingFrame& pending_frame = GetQueueEntry(part->frame_id);
227*3f982cf4SFabien Sanglard FrameCollector& collector = pending_frame.collector;
228*3f982cf4SFabien Sanglard if (collector.is_complete()) {
229*3f982cf4SFabien Sanglard // An extra, redundant |packet| was received. Do nothing since the frame was
230*3f982cf4SFabien Sanglard // already complete.
231*3f982cf4SFabien Sanglard return;
232*3f982cf4SFabien Sanglard }
233*3f982cf4SFabien Sanglard
234*3f982cf4SFabien Sanglard if (!collector.CollectRtpPacket(*part, &packet)) {
235*3f982cf4SFabien Sanglard return; // Bad data in the parsed packet. Ignore it.
236*3f982cf4SFabien Sanglard }
237*3f982cf4SFabien Sanglard
238*3f982cf4SFabien Sanglard // The first packet in a frame contains timing information critical for
239*3f982cf4SFabien Sanglard // computing this frame's (and all future frames') playout time. Process that,
240*3f982cf4SFabien Sanglard // but only once.
241*3f982cf4SFabien Sanglard if (part->packet_id == FramePacketId{0} &&
242*3f982cf4SFabien Sanglard !pending_frame.estimated_capture_time) {
243*3f982cf4SFabien Sanglard // Estimate the original capture time of this frame (at the Sender), in
244*3f982cf4SFabien Sanglard // terms of the Receiver's clock: First, start with a reference time point
245*3f982cf4SFabien Sanglard // from the Sender's clock (the one from the last Sender Report). Then,
246*3f982cf4SFabien Sanglard // translate it into the equivalent reference time point in terms of the
247*3f982cf4SFabien Sanglard // Receiver's clock by applying the measured offset between the two clocks.
248*3f982cf4SFabien Sanglard // Finally, apply the RTP timestamp difference between the Sender Report and
249*3f982cf4SFabien Sanglard // this frame to determine what the original capture time of this frame was.
250*3f982cf4SFabien Sanglard pending_frame.estimated_capture_time =
251*3f982cf4SFabien Sanglard last_sender_report_->reference_time + smoothed_clock_offset_.Current() +
252*3f982cf4SFabien Sanglard (part->rtp_timestamp - last_sender_report_->rtp_timestamp)
253*3f982cf4SFabien Sanglard .ToDuration<Clock::duration>(rtp_timebase_);
254*3f982cf4SFabien Sanglard
255*3f982cf4SFabien Sanglard // If a target playout delay change was included in this packet, record it.
256*3f982cf4SFabien Sanglard if (part->new_playout_delay > milliseconds::zero()) {
257*3f982cf4SFabien Sanglard RecordNewTargetPlayoutDelay(part->frame_id, part->new_playout_delay);
258*3f982cf4SFabien Sanglard }
259*3f982cf4SFabien Sanglard
260*3f982cf4SFabien Sanglard // Now that the estimated capture time is known, other frames may have just
261*3f982cf4SFabien Sanglard // become ready, per the frame-skipping logic in AdvanceToNextFrame().
262*3f982cf4SFabien Sanglard ScheduleFrameReadyCheck();
263*3f982cf4SFabien Sanglard }
264*3f982cf4SFabien Sanglard
265*3f982cf4SFabien Sanglard if (!collector.is_complete()) {
266*3f982cf4SFabien Sanglard return; // Wait for the rest of the packets to come in.
267*3f982cf4SFabien Sanglard }
268*3f982cf4SFabien Sanglard const EncryptedFrame& encrypted_frame = collector.PeekAtAssembledFrame();
269*3f982cf4SFabien Sanglard
270*3f982cf4SFabien Sanglard // Whenever a key frame has been received, the decoder has what it needs to
271*3f982cf4SFabien Sanglard // recover. In this case, clear the PLI condition.
272*3f982cf4SFabien Sanglard if (encrypted_frame.dependency == EncryptedFrame::KEY_FRAME) {
273*3f982cf4SFabien Sanglard rtcp_builder_.SetPictureLossIndicator(false);
274*3f982cf4SFabien Sanglard last_key_frame_received_ = part->frame_id;
275*3f982cf4SFabien Sanglard }
276*3f982cf4SFabien Sanglard
277*3f982cf4SFabien Sanglard // If this just-completed frame is the one right after the checkpoint frame,
278*3f982cf4SFabien Sanglard // advance the checkpoint forward.
279*3f982cf4SFabien Sanglard if (part->frame_id == (checkpoint_frame() + 1)) {
280*3f982cf4SFabien Sanglard AdvanceCheckpoint(part->frame_id);
281*3f982cf4SFabien Sanglard }
282*3f982cf4SFabien Sanglard
283*3f982cf4SFabien Sanglard // Since a frame has become complete, schedule a check to see whether this or
284*3f982cf4SFabien Sanglard // any other frames have become ready for consumption.
285*3f982cf4SFabien Sanglard ScheduleFrameReadyCheck();
286*3f982cf4SFabien Sanglard }
287*3f982cf4SFabien Sanglard
OnReceivedRtcpPacket(Clock::time_point arrival_time,std::vector<uint8_t> packet)288*3f982cf4SFabien Sanglard void Receiver::OnReceivedRtcpPacket(Clock::time_point arrival_time,
289*3f982cf4SFabien Sanglard std::vector<uint8_t> packet) {
290*3f982cf4SFabien Sanglard TRACE_DEFAULT_SCOPED(TraceCategory::kReceiver);
291*3f982cf4SFabien Sanglard absl::optional<SenderReportParser::SenderReportWithId> parsed_report =
292*3f982cf4SFabien Sanglard rtcp_parser_.Parse(packet);
293*3f982cf4SFabien Sanglard if (!parsed_report) {
294*3f982cf4SFabien Sanglard RECEIVER_LOG(WARN) << "Parsing of " << packet.size()
295*3f982cf4SFabien Sanglard << " bytes as an RTCP packet failed.";
296*3f982cf4SFabien Sanglard return;
297*3f982cf4SFabien Sanglard }
298*3f982cf4SFabien Sanglard last_sender_report_ = std::move(parsed_report);
299*3f982cf4SFabien Sanglard last_sender_report_arrival_time_ = arrival_time;
300*3f982cf4SFabien Sanglard
301*3f982cf4SFabien Sanglard // Measure the offset between the Sender's clock and the Receiver's Clock.
302*3f982cf4SFabien Sanglard // This will be used to translate reference timestamps from the Sender into
303*3f982cf4SFabien Sanglard // timestamps that represent the exact same moment in time at the Receiver.
304*3f982cf4SFabien Sanglard //
305*3f982cf4SFabien Sanglard // Note: Due to design limitations in the Cast Streaming spec, the Receiver
306*3f982cf4SFabien Sanglard // has no way to compute how long it took the Sender Report to travel over the
307*3f982cf4SFabien Sanglard // network. The calculation here just ignores that, and so the
308*3f982cf4SFabien Sanglard // |measured_offset| below will be larger than the true value by that amount.
309*3f982cf4SFabien Sanglard // This will have the effect of a later-than-configured playout delay.
310*3f982cf4SFabien Sanglard const Clock::duration measured_offset =
311*3f982cf4SFabien Sanglard arrival_time - last_sender_report_->reference_time;
312*3f982cf4SFabien Sanglard smoothed_clock_offset_.Update(arrival_time, measured_offset);
313*3f982cf4SFabien Sanglard
314*3f982cf4SFabien Sanglard RtcpReportBlock report;
315*3f982cf4SFabien Sanglard report.ssrc = rtcp_session_.sender_ssrc();
316*3f982cf4SFabien Sanglard stats_tracker_.PopulateNextReport(&report);
317*3f982cf4SFabien Sanglard report.last_status_report_id = last_sender_report_->report_id;
318*3f982cf4SFabien Sanglard report.SetDelaySinceLastReport(now_() - last_sender_report_arrival_time_);
319*3f982cf4SFabien Sanglard rtcp_builder_.IncludeReceiverReportInNextPacket(report);
320*3f982cf4SFabien Sanglard
321*3f982cf4SFabien Sanglard SendRtcp();
322*3f982cf4SFabien Sanglard }
323*3f982cf4SFabien Sanglard
SendRtcp()324*3f982cf4SFabien Sanglard void Receiver::SendRtcp() {
325*3f982cf4SFabien Sanglard // Collect ACK/NACK feedback for all active frames in the queue.
326*3f982cf4SFabien Sanglard std::vector<PacketNack> packet_nacks;
327*3f982cf4SFabien Sanglard std::vector<FrameId> frame_acks;
328*3f982cf4SFabien Sanglard for (FrameId f = checkpoint_frame() + 1; f <= latest_frame_expected_; ++f) {
329*3f982cf4SFabien Sanglard const FrameCollector& collector = GetQueueEntry(f).collector;
330*3f982cf4SFabien Sanglard if (collector.is_complete()) {
331*3f982cf4SFabien Sanglard frame_acks.push_back(f);
332*3f982cf4SFabien Sanglard } else {
333*3f982cf4SFabien Sanglard collector.GetMissingPackets(&packet_nacks);
334*3f982cf4SFabien Sanglard }
335*3f982cf4SFabien Sanglard }
336*3f982cf4SFabien Sanglard
337*3f982cf4SFabien Sanglard // Build and send a compound RTCP packet.
338*3f982cf4SFabien Sanglard const bool no_nacks = packet_nacks.empty();
339*3f982cf4SFabien Sanglard rtcp_builder_.IncludeFeedbackInNextPacket(std::move(packet_nacks),
340*3f982cf4SFabien Sanglard std::move(frame_acks));
341*3f982cf4SFabien Sanglard last_rtcp_send_time_ = now_();
342*3f982cf4SFabien Sanglard packet_router_->SendRtcpPacket(rtcp_builder_.BuildPacket(
343*3f982cf4SFabien Sanglard last_rtcp_send_time_,
344*3f982cf4SFabien Sanglard absl::Span<uint8_t>(rtcp_buffer_.get(), rtcp_buffer_capacity_)));
345*3f982cf4SFabien Sanglard
346*3f982cf4SFabien Sanglard // Schedule the automatic sending of another RTCP packet, if this method is
347*3f982cf4SFabien Sanglard // not called within some bounded amount of time. While incomplete frames
348*3f982cf4SFabien Sanglard // exist in the queue, send RTCP packets (with ACK/NACK feedback) frequently.
349*3f982cf4SFabien Sanglard // When there are no incomplete frames, use a longer "keepalive" interval.
350*3f982cf4SFabien Sanglard const Clock::duration interval =
351*3f982cf4SFabien Sanglard (no_nacks ? kRtcpReportInterval : kNackFeedbackInterval);
352*3f982cf4SFabien Sanglard rtcp_alarm_.Schedule([this] { SendRtcp(); }, last_rtcp_send_time_ + interval);
353*3f982cf4SFabien Sanglard }
354*3f982cf4SFabien Sanglard
GetQueueEntry(FrameId frame_id) const355*3f982cf4SFabien Sanglard const Receiver::PendingFrame& Receiver::GetQueueEntry(FrameId frame_id) const {
356*3f982cf4SFabien Sanglard return const_cast<Receiver*>(this)->GetQueueEntry(frame_id);
357*3f982cf4SFabien Sanglard }
358*3f982cf4SFabien Sanglard
GetQueueEntry(FrameId frame_id)359*3f982cf4SFabien Sanglard Receiver::PendingFrame& Receiver::GetQueueEntry(FrameId frame_id) {
360*3f982cf4SFabien Sanglard return pending_frames_[(frame_id - FrameId::first()) %
361*3f982cf4SFabien Sanglard pending_frames_.size()];
362*3f982cf4SFabien Sanglard }
363*3f982cf4SFabien Sanglard
RecordNewTargetPlayoutDelay(FrameId as_of_frame,milliseconds delay)364*3f982cf4SFabien Sanglard void Receiver::RecordNewTargetPlayoutDelay(FrameId as_of_frame,
365*3f982cf4SFabien Sanglard milliseconds delay) {
366*3f982cf4SFabien Sanglard OSP_DCHECK_GT(as_of_frame, checkpoint_frame());
367*3f982cf4SFabien Sanglard
368*3f982cf4SFabien Sanglard // Prune-out entries from |playout_delay_changes_| that are no longer needed.
369*3f982cf4SFabien Sanglard // At least one entry must always be kept (i.e., there must always be a
370*3f982cf4SFabien Sanglard // "current" setting).
371*3f982cf4SFabien Sanglard const FrameId next_frame = last_frame_consumed_ + 1;
372*3f982cf4SFabien Sanglard const auto keep_one_before_it = std::find_if(
373*3f982cf4SFabien Sanglard std::next(playout_delay_changes_.begin()), playout_delay_changes_.end(),
374*3f982cf4SFabien Sanglard [&](const auto& entry) { return entry.first > next_frame; });
375*3f982cf4SFabien Sanglard playout_delay_changes_.erase(playout_delay_changes_.begin(),
376*3f982cf4SFabien Sanglard std::prev(keep_one_before_it));
377*3f982cf4SFabien Sanglard
378*3f982cf4SFabien Sanglard // Insert the delay change entry, maintaining the ascending ordering of the
379*3f982cf4SFabien Sanglard // vector.
380*3f982cf4SFabien Sanglard const auto insert_it = std::find_if(
381*3f982cf4SFabien Sanglard playout_delay_changes_.begin(), playout_delay_changes_.end(),
382*3f982cf4SFabien Sanglard [&](const auto& entry) { return entry.first > as_of_frame; });
383*3f982cf4SFabien Sanglard playout_delay_changes_.emplace(insert_it, as_of_frame, delay);
384*3f982cf4SFabien Sanglard
385*3f982cf4SFabien Sanglard OSP_DCHECK(AreElementsSortedAndUnique(playout_delay_changes_));
386*3f982cf4SFabien Sanglard }
387*3f982cf4SFabien Sanglard
ResolveTargetPlayoutDelay(FrameId frame_id) const388*3f982cf4SFabien Sanglard milliseconds Receiver::ResolveTargetPlayoutDelay(FrameId frame_id) const {
389*3f982cf4SFabien Sanglard OSP_DCHECK_GT(frame_id, last_frame_consumed_);
390*3f982cf4SFabien Sanglard
391*3f982cf4SFabien Sanglard #if OSP_DCHECK_IS_ON()
392*3f982cf4SFabien Sanglard // Extra precaution: Ensure all possible playout delay changes are known. In
393*3f982cf4SFabien Sanglard // other words, every unconsumed frame in the queue, up to (and including)
394*3f982cf4SFabien Sanglard // |frame_id|, must have an assigned estimated_capture_time.
395*3f982cf4SFabien Sanglard for (FrameId f = last_frame_consumed_ + 1; f <= frame_id; ++f) {
396*3f982cf4SFabien Sanglard OSP_DCHECK(GetQueueEntry(f).estimated_capture_time)
397*3f982cf4SFabien Sanglard << " don't know whether there was a playout delay change for frame "
398*3f982cf4SFabien Sanglard << f;
399*3f982cf4SFabien Sanglard }
400*3f982cf4SFabien Sanglard #endif
401*3f982cf4SFabien Sanglard
402*3f982cf4SFabien Sanglard const auto it = std::find_if(
403*3f982cf4SFabien Sanglard playout_delay_changes_.crbegin(), playout_delay_changes_.crend(),
404*3f982cf4SFabien Sanglard [&](const auto& entry) { return entry.first <= frame_id; });
405*3f982cf4SFabien Sanglard OSP_DCHECK(it != playout_delay_changes_.crend());
406*3f982cf4SFabien Sanglard return it->second;
407*3f982cf4SFabien Sanglard }
408*3f982cf4SFabien Sanglard
AdvanceCheckpoint(FrameId new_checkpoint)409*3f982cf4SFabien Sanglard void Receiver::AdvanceCheckpoint(FrameId new_checkpoint) {
410*3f982cf4SFabien Sanglard TRACE_DEFAULT_SCOPED(TraceCategory::kReceiver);
411*3f982cf4SFabien Sanglard OSP_DCHECK_GT(new_checkpoint, checkpoint_frame());
412*3f982cf4SFabien Sanglard OSP_DCHECK_LE(new_checkpoint, latest_frame_expected_);
413*3f982cf4SFabien Sanglard
414*3f982cf4SFabien Sanglard while (new_checkpoint < latest_frame_expected_) {
415*3f982cf4SFabien Sanglard const FrameId next = new_checkpoint + 1;
416*3f982cf4SFabien Sanglard if (!GetQueueEntry(next).collector.is_complete()) {
417*3f982cf4SFabien Sanglard break;
418*3f982cf4SFabien Sanglard }
419*3f982cf4SFabien Sanglard new_checkpoint = next;
420*3f982cf4SFabien Sanglard }
421*3f982cf4SFabien Sanglard
422*3f982cf4SFabien Sanglard set_checkpoint_frame(new_checkpoint);
423*3f982cf4SFabien Sanglard rtcp_builder_.SetPlayoutDelay(ResolveTargetPlayoutDelay(new_checkpoint));
424*3f982cf4SFabien Sanglard SendRtcp();
425*3f982cf4SFabien Sanglard }
426*3f982cf4SFabien Sanglard
DropAllFramesBefore(FrameId first_kept_frame)427*3f982cf4SFabien Sanglard void Receiver::DropAllFramesBefore(FrameId first_kept_frame) {
428*3f982cf4SFabien Sanglard // The following DCHECKs are verifying that this method is only being called
429*3f982cf4SFabien Sanglard // because one or more incomplete frames are being skipped-over.
430*3f982cf4SFabien Sanglard const FrameId first_to_drop = last_frame_consumed_ + 1;
431*3f982cf4SFabien Sanglard OSP_DCHECK_GT(first_kept_frame, first_to_drop);
432*3f982cf4SFabien Sanglard OSP_DCHECK_GT(first_kept_frame, checkpoint_frame());
433*3f982cf4SFabien Sanglard OSP_DCHECK_LE(first_kept_frame, latest_frame_expected_);
434*3f982cf4SFabien Sanglard
435*3f982cf4SFabien Sanglard // Reset each of the frames being dropped, pretending that they were consumed.
436*3f982cf4SFabien Sanglard for (FrameId f = first_to_drop; f < first_kept_frame; ++f) {
437*3f982cf4SFabien Sanglard PendingFrame& entry = GetQueueEntry(f);
438*3f982cf4SFabien Sanglard // Pedantic sanity-check: Ensure the "target playout delay change" data
439*3f982cf4SFabien Sanglard // dependency was satisfied. See comments in AdvanceToNextFrame().
440*3f982cf4SFabien Sanglard OSP_DCHECK(entry.estimated_capture_time);
441*3f982cf4SFabien Sanglard entry.Reset();
442*3f982cf4SFabien Sanglard }
443*3f982cf4SFabien Sanglard last_frame_consumed_ = first_kept_frame - 1;
444*3f982cf4SFabien Sanglard
445*3f982cf4SFabien Sanglard RECEIVER_LOG(INFO) << "Artificially advancing checkpoint after skipping.";
446*3f982cf4SFabien Sanglard AdvanceCheckpoint(first_kept_frame);
447*3f982cf4SFabien Sanglard }
448*3f982cf4SFabien Sanglard
ScheduleFrameReadyCheck(Clock::time_point when)449*3f982cf4SFabien Sanglard void Receiver::ScheduleFrameReadyCheck(Clock::time_point when) {
450*3f982cf4SFabien Sanglard consumption_alarm_.Schedule(
451*3f982cf4SFabien Sanglard [this] {
452*3f982cf4SFabien Sanglard if (consumer_) {
453*3f982cf4SFabien Sanglard const int next_frame_buffer_size = AdvanceToNextFrame();
454*3f982cf4SFabien Sanglard if (next_frame_buffer_size != kNoFramesReady) {
455*3f982cf4SFabien Sanglard consumer_->OnFramesReady(next_frame_buffer_size);
456*3f982cf4SFabien Sanglard }
457*3f982cf4SFabien Sanglard }
458*3f982cf4SFabien Sanglard },
459*3f982cf4SFabien Sanglard when);
460*3f982cf4SFabien Sanglard }
461*3f982cf4SFabien Sanglard
462*3f982cf4SFabien Sanglard Receiver::PendingFrame::PendingFrame() = default;
463*3f982cf4SFabien Sanglard Receiver::PendingFrame::~PendingFrame() = default;
464*3f982cf4SFabien Sanglard
Reset()465*3f982cf4SFabien Sanglard void Receiver::PendingFrame::Reset() {
466*3f982cf4SFabien Sanglard collector.Reset();
467*3f982cf4SFabien Sanglard estimated_capture_time = absl::nullopt;
468*3f982cf4SFabien Sanglard }
469*3f982cf4SFabien Sanglard
470*3f982cf4SFabien Sanglard // static
471*3f982cf4SFabien Sanglard constexpr milliseconds Receiver::kDefaultPlayerProcessingTime;
472*3f982cf4SFabien Sanglard constexpr int Receiver::kNoFramesReady;
473*3f982cf4SFabien Sanglard constexpr milliseconds Receiver::kNackFeedbackInterval;
474*3f982cf4SFabien Sanglard
475*3f982cf4SFabien Sanglard } // namespace cast
476*3f982cf4SFabien Sanglard } // namespace openscreen
477