1 /*
2 * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "video/decode_synchronizer.h"
12
13 #include <iterator>
14 #include <memory>
15 #include <utility>
16 #include <vector>
17
18 #include "api/sequence_checker.h"
19 #include "api/units/time_delta.h"
20 #include "api/units/timestamp.h"
21 #include "rtc_base/checks.h"
22 #include "rtc_base/logging.h"
23 #include "video/frame_decode_scheduler.h"
24 #include "video/frame_decode_timing.h"
25
26 namespace webrtc {
27
ScheduledFrame(uint32_t rtp_timestamp,FrameDecodeTiming::FrameSchedule schedule,FrameDecodeScheduler::FrameReleaseCallback callback)28 DecodeSynchronizer::ScheduledFrame::ScheduledFrame(
29 uint32_t rtp_timestamp,
30 FrameDecodeTiming::FrameSchedule schedule,
31 FrameDecodeScheduler::FrameReleaseCallback callback)
32 : rtp_timestamp_(rtp_timestamp),
33 schedule_(std::move(schedule)),
34 callback_(std::move(callback)) {}
35
RunFrameReleaseCallback()36 void DecodeSynchronizer::ScheduledFrame::RunFrameReleaseCallback() && {
37 // Inspiration from Chromium base::OnceCallback. Move `*this` to a local
38 // before execution to ensure internal state is cleared after callback
39 // execution.
40 auto sf = std::move(*this);
41 std::move(sf.callback_)(sf.rtp_timestamp_, sf.schedule_.render_time);
42 }
43
LatestDecodeTime() const44 Timestamp DecodeSynchronizer::ScheduledFrame::LatestDecodeTime() const {
45 return schedule_.latest_decode_time;
46 }
47
48 DecodeSynchronizer::SynchronizedFrameDecodeScheduler::
SynchronizedFrameDecodeScheduler(DecodeSynchronizer * sync)49 SynchronizedFrameDecodeScheduler(DecodeSynchronizer* sync)
50 : sync_(sync) {
51 RTC_DCHECK(sync_);
52 }
53
54 DecodeSynchronizer::SynchronizedFrameDecodeScheduler::
~SynchronizedFrameDecodeScheduler()55 ~SynchronizedFrameDecodeScheduler() {
56 RTC_DCHECK(!next_frame_);
57 RTC_DCHECK(stopped_);
58 }
59
60 absl::optional<uint32_t>
ScheduledRtpTimestamp()61 DecodeSynchronizer::SynchronizedFrameDecodeScheduler::ScheduledRtpTimestamp() {
62 return next_frame_.has_value()
63 ? absl::make_optional(next_frame_->rtp_timestamp())
64 : absl::nullopt;
65 }
66
67 DecodeSynchronizer::ScheduledFrame
ReleaseNextFrame()68 DecodeSynchronizer::SynchronizedFrameDecodeScheduler::ReleaseNextFrame() {
69 RTC_DCHECK(next_frame_);
70 auto res = std::move(*next_frame_);
71 next_frame_.reset();
72 return res;
73 }
74
75 Timestamp
LatestDecodeTime()76 DecodeSynchronizer::SynchronizedFrameDecodeScheduler::LatestDecodeTime() {
77 RTC_DCHECK(next_frame_);
78 return next_frame_->LatestDecodeTime();
79 }
80
ScheduleFrame(uint32_t rtp,FrameDecodeTiming::FrameSchedule schedule,FrameReleaseCallback cb)81 void DecodeSynchronizer::SynchronizedFrameDecodeScheduler::ScheduleFrame(
82 uint32_t rtp,
83 FrameDecodeTiming::FrameSchedule schedule,
84 FrameReleaseCallback cb) {
85 RTC_DCHECK(!next_frame_) << "Can not schedule two frames at once.";
86 next_frame_ = ScheduledFrame(rtp, std::move(schedule), std::move(cb));
87 sync_->OnFrameScheduled(this);
88 }
89
CancelOutstanding()90 void DecodeSynchronizer::SynchronizedFrameDecodeScheduler::CancelOutstanding() {
91 next_frame_.reset();
92 }
93
Stop()94 void DecodeSynchronizer::SynchronizedFrameDecodeScheduler::Stop() {
95 CancelOutstanding();
96 stopped_ = true;
97 sync_->RemoveFrameScheduler(this);
98 }
99
DecodeSynchronizer(Clock * clock,Metronome * metronome,TaskQueueBase * worker_queue)100 DecodeSynchronizer::DecodeSynchronizer(Clock* clock,
101 Metronome* metronome,
102 TaskQueueBase* worker_queue)
103 : clock_(clock), worker_queue_(worker_queue), metronome_(metronome) {
104 RTC_DCHECK(metronome_);
105 RTC_DCHECK(worker_queue_);
106 }
107
~DecodeSynchronizer()108 DecodeSynchronizer::~DecodeSynchronizer() {
109 RTC_DCHECK_RUN_ON(worker_queue_);
110 RTC_DCHECK(schedulers_.empty());
111 }
112
113 std::unique_ptr<FrameDecodeScheduler>
CreateSynchronizedFrameScheduler()114 DecodeSynchronizer::CreateSynchronizedFrameScheduler() {
115 RTC_DCHECK_RUN_ON(worker_queue_);
116 auto scheduler = std::make_unique<SynchronizedFrameDecodeScheduler>(this);
117 auto [it, inserted] = schedulers_.emplace(scheduler.get());
118 // If this is the first `scheduler` added, start listening to the metronome.
119 if (inserted && schedulers_.size() == 1) {
120 RTC_DLOG(LS_VERBOSE) << "Listening to metronome";
121 ScheduleNextTick();
122 }
123
124 return std::move(scheduler);
125 }
126
OnFrameScheduled(SynchronizedFrameDecodeScheduler * scheduler)127 void DecodeSynchronizer::OnFrameScheduled(
128 SynchronizedFrameDecodeScheduler* scheduler) {
129 RTC_DCHECK_RUN_ON(worker_queue_);
130 RTC_DCHECK(scheduler->ScheduledRtpTimestamp());
131
132 Timestamp now = clock_->CurrentTime();
133 Timestamp next_tick = expected_next_tick_;
134 // If no tick has registered yet assume it will occur in the tick period.
135 if (next_tick.IsInfinite()) {
136 next_tick = now + metronome_->TickPeriod();
137 }
138
139 // Release the frame right away if the decode time is too soon. Otherwise
140 // the stream may fall behind too much.
141 bool decode_before_next_tick =
142 scheduler->LatestDecodeTime() <
143 (next_tick - FrameDecodeTiming::kMaxAllowedFrameDelay);
144 // Decode immediately if the decode time is in the past.
145 bool decode_time_in_past = scheduler->LatestDecodeTime() < now;
146
147 if (decode_before_next_tick || decode_time_in_past) {
148 ScheduledFrame scheduled_frame = scheduler->ReleaseNextFrame();
149 std::move(scheduled_frame).RunFrameReleaseCallback();
150 }
151 }
152
RemoveFrameScheduler(SynchronizedFrameDecodeScheduler * scheduler)153 void DecodeSynchronizer::RemoveFrameScheduler(
154 SynchronizedFrameDecodeScheduler* scheduler) {
155 RTC_DCHECK_RUN_ON(worker_queue_);
156 RTC_DCHECK(scheduler);
157 auto it = schedulers_.find(scheduler);
158 if (it == schedulers_.end()) {
159 return;
160 }
161 schedulers_.erase(it);
162 // If there are no more schedulers active, stop listening for metronome ticks.
163 if (schedulers_.empty()) {
164 expected_next_tick_ = Timestamp::PlusInfinity();
165 }
166 }
167
ScheduleNextTick()168 void DecodeSynchronizer::ScheduleNextTick() {
169 RTC_DCHECK_RUN_ON(worker_queue_);
170 metronome_->RequestCallOnNextTick(
171 SafeTask(safety_.flag(), [this] { OnTick(); }));
172 }
173
OnTick()174 void DecodeSynchronizer::OnTick() {
175 RTC_DCHECK_RUN_ON(worker_queue_);
176 expected_next_tick_ = clock_->CurrentTime() + metronome_->TickPeriod();
177
178 for (auto* scheduler : schedulers_) {
179 if (scheduler->ScheduledRtpTimestamp() &&
180 scheduler->LatestDecodeTime() < expected_next_tick_) {
181 auto scheduled_frame = scheduler->ReleaseNextFrame();
182 std::move(scheduled_frame).RunFrameReleaseCallback();
183 }
184 }
185
186 if (!schedulers_.empty())
187 ScheduleNextTick();
188 }
189
190 } // namespace webrtc
191