1 /*
2 * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "modules/video_coding/nack_requester.h"
12
13 #include <algorithm>
14 #include <limits>
15
16 #include "api/sequence_checker.h"
17 #include "api/task_queue/task_queue_base.h"
18 #include "api/units/timestamp.h"
19 #include "rtc_base/checks.h"
20 #include "rtc_base/experiments/field_trial_parser.h"
21 #include "rtc_base/logging.h"
22
23 namespace webrtc {
24
25 namespace {
26 constexpr int kMaxPacketAge = 10'000;
27 constexpr int kMaxNackPackets = 1000;
28 constexpr TimeDelta kDefaultRtt = TimeDelta::Millis(100);
29 constexpr int kMaxNackRetries = 10;
30 constexpr int kMaxReorderedPackets = 128;
31 constexpr int kNumReorderingBuckets = 10;
32 constexpr TimeDelta kDefaultSendNackDelay = TimeDelta::Zero();
33
GetSendNackDelay(const FieldTrialsView & field_trials)34 TimeDelta GetSendNackDelay(const FieldTrialsView& field_trials) {
35 int64_t delay_ms = strtol(
36 field_trials.Lookup("WebRTC-SendNackDelayMs").c_str(), nullptr, 10);
37 if (delay_ms > 0 && delay_ms <= 20) {
38 RTC_LOG(LS_INFO) << "SendNackDelay is set to " << delay_ms;
39 return TimeDelta::Millis(delay_ms);
40 }
41 return kDefaultSendNackDelay;
42 }
43 } // namespace
44
45 constexpr TimeDelta NackPeriodicProcessor::kUpdateInterval;
46
NackPeriodicProcessor(TimeDelta update_interval)47 NackPeriodicProcessor::NackPeriodicProcessor(TimeDelta update_interval)
48 : update_interval_(update_interval) {}
49
~NackPeriodicProcessor()50 NackPeriodicProcessor::~NackPeriodicProcessor() {}
51
RegisterNackModule(NackRequesterBase * module)52 void NackPeriodicProcessor::RegisterNackModule(NackRequesterBase* module) {
53 RTC_DCHECK_RUN_ON(&sequence_);
54 modules_.push_back(module);
55 if (modules_.size() != 1)
56 return;
57 repeating_task_ = RepeatingTaskHandle::DelayedStart(
58 TaskQueueBase::Current(), update_interval_, [this] {
59 RTC_DCHECK_RUN_ON(&sequence_);
60 ProcessNackModules();
61 return update_interval_;
62 });
63 }
64
UnregisterNackModule(NackRequesterBase * module)65 void NackPeriodicProcessor::UnregisterNackModule(NackRequesterBase* module) {
66 RTC_DCHECK_RUN_ON(&sequence_);
67 auto it = std::find(modules_.begin(), modules_.end(), module);
68 RTC_DCHECK(it != modules_.end());
69 modules_.erase(it);
70 if (modules_.empty())
71 repeating_task_.Stop();
72 }
73
ProcessNackModules()74 void NackPeriodicProcessor::ProcessNackModules() {
75 RTC_DCHECK_RUN_ON(&sequence_);
76 for (NackRequesterBase* module : modules_)
77 module->ProcessNacks();
78 }
79
80 ScopedNackPeriodicProcessorRegistration::
ScopedNackPeriodicProcessorRegistration(NackRequesterBase * module,NackPeriodicProcessor * processor)81 ScopedNackPeriodicProcessorRegistration(NackRequesterBase* module,
82 NackPeriodicProcessor* processor)
83 : module_(module), processor_(processor) {
84 processor_->RegisterNackModule(module_);
85 }
86
87 ScopedNackPeriodicProcessorRegistration::
~ScopedNackPeriodicProcessorRegistration()88 ~ScopedNackPeriodicProcessorRegistration() {
89 processor_->UnregisterNackModule(module_);
90 }
91
NackInfo()92 NackRequester::NackInfo::NackInfo()
93 : seq_num(0),
94 send_at_seq_num(0),
95 created_at_time(Timestamp::MinusInfinity()),
96 sent_at_time(Timestamp::MinusInfinity()),
97 retries(0) {}
98
NackInfo(uint16_t seq_num,uint16_t send_at_seq_num,Timestamp created_at_time)99 NackRequester::NackInfo::NackInfo(uint16_t seq_num,
100 uint16_t send_at_seq_num,
101 Timestamp created_at_time)
102 : seq_num(seq_num),
103 send_at_seq_num(send_at_seq_num),
104 created_at_time(created_at_time),
105 sent_at_time(Timestamp::MinusInfinity()),
106 retries(0) {}
107
NackRequester(TaskQueueBase * current_queue,NackPeriodicProcessor * periodic_processor,Clock * clock,NackSender * nack_sender,KeyFrameRequestSender * keyframe_request_sender,const FieldTrialsView & field_trials)108 NackRequester::NackRequester(TaskQueueBase* current_queue,
109 NackPeriodicProcessor* periodic_processor,
110 Clock* clock,
111 NackSender* nack_sender,
112 KeyFrameRequestSender* keyframe_request_sender,
113 const FieldTrialsView& field_trials)
114 : worker_thread_(current_queue),
115 clock_(clock),
116 nack_sender_(nack_sender),
117 keyframe_request_sender_(keyframe_request_sender),
118 reordering_histogram_(kNumReorderingBuckets, kMaxReorderedPackets),
119 initialized_(false),
120 rtt_(kDefaultRtt),
121 newest_seq_num_(0),
122 send_nack_delay_(GetSendNackDelay(field_trials)),
123 processor_registration_(this, periodic_processor) {
124 RTC_DCHECK(clock_);
125 RTC_DCHECK(nack_sender_);
126 RTC_DCHECK(keyframe_request_sender_);
127 RTC_DCHECK(worker_thread_);
128 RTC_DCHECK(worker_thread_->IsCurrent());
129 }
130
~NackRequester()131 NackRequester::~NackRequester() {
132 RTC_DCHECK_RUN_ON(worker_thread_);
133 }
134
ProcessNacks()135 void NackRequester::ProcessNacks() {
136 RTC_DCHECK_RUN_ON(worker_thread_);
137 std::vector<uint16_t> nack_batch = GetNackBatch(kTimeOnly);
138 if (!nack_batch.empty()) {
139 // This batch of NACKs is triggered externally; there is no external
140 // initiator who can batch them with other feedback messages.
141 nack_sender_->SendNack(nack_batch, /*buffering_allowed=*/false);
142 }
143 }
144
OnReceivedPacket(uint16_t seq_num,bool is_keyframe)145 int NackRequester::OnReceivedPacket(uint16_t seq_num, bool is_keyframe) {
146 RTC_DCHECK_RUN_ON(worker_thread_);
147 return OnReceivedPacket(seq_num, is_keyframe, false);
148 }
149
OnReceivedPacket(uint16_t seq_num,bool is_keyframe,bool is_recovered)150 int NackRequester::OnReceivedPacket(uint16_t seq_num,
151 bool is_keyframe,
152 bool is_recovered) {
153 RTC_DCHECK_RUN_ON(worker_thread_);
154 // TODO(philipel): When the packet includes information whether it is
155 // retransmitted or not, use that value instead. For
156 // now set it to true, which will cause the reordering
157 // statistics to never be updated.
158 bool is_retransmitted = true;
159
160 if (!initialized_) {
161 newest_seq_num_ = seq_num;
162 if (is_keyframe)
163 keyframe_list_.insert(seq_num);
164 initialized_ = true;
165 return 0;
166 }
167
168 // Since the `newest_seq_num_` is a packet we have actually received we know
169 // that packet has never been Nacked.
170 if (seq_num == newest_seq_num_)
171 return 0;
172
173 if (AheadOf(newest_seq_num_, seq_num)) {
174 // An out of order packet has been received.
175 auto nack_list_it = nack_list_.find(seq_num);
176 int nacks_sent_for_packet = 0;
177 if (nack_list_it != nack_list_.end()) {
178 nacks_sent_for_packet = nack_list_it->second.retries;
179 nack_list_.erase(nack_list_it);
180 }
181 if (!is_retransmitted)
182 UpdateReorderingStatistics(seq_num);
183 return nacks_sent_for_packet;
184 }
185
186 // Keep track of new keyframes.
187 if (is_keyframe)
188 keyframe_list_.insert(seq_num);
189
190 // And remove old ones so we don't accumulate keyframes.
191 auto it = keyframe_list_.lower_bound(seq_num - kMaxPacketAge);
192 if (it != keyframe_list_.begin())
193 keyframe_list_.erase(keyframe_list_.begin(), it);
194
195 if (is_recovered) {
196 recovered_list_.insert(seq_num);
197
198 // Remove old ones so we don't accumulate recovered packets.
199 auto it = recovered_list_.lower_bound(seq_num - kMaxPacketAge);
200 if (it != recovered_list_.begin())
201 recovered_list_.erase(recovered_list_.begin(), it);
202
203 // Do not send nack for packets recovered by FEC or RTX.
204 return 0;
205 }
206
207 AddPacketsToNack(newest_seq_num_ + 1, seq_num);
208 newest_seq_num_ = seq_num;
209
210 // Are there any nacks that are waiting for this seq_num.
211 std::vector<uint16_t> nack_batch = GetNackBatch(kSeqNumOnly);
212 if (!nack_batch.empty()) {
213 // This batch of NACKs is triggered externally; the initiator can
214 // batch them with other feedback messages.
215 nack_sender_->SendNack(nack_batch, /*buffering_allowed=*/true);
216 }
217
218 return 0;
219 }
220
ClearUpTo(uint16_t seq_num)221 void NackRequester::ClearUpTo(uint16_t seq_num) {
222 // Called via RtpVideoStreamReceiver2::FrameContinuous on the network thread.
223 worker_thread_->PostTask(SafeTask(task_safety_.flag(), [seq_num, this]() {
224 RTC_DCHECK_RUN_ON(worker_thread_);
225 nack_list_.erase(nack_list_.begin(), nack_list_.lower_bound(seq_num));
226 keyframe_list_.erase(keyframe_list_.begin(),
227 keyframe_list_.lower_bound(seq_num));
228 recovered_list_.erase(recovered_list_.begin(),
229 recovered_list_.lower_bound(seq_num));
230 }));
231 }
232
UpdateRtt(int64_t rtt_ms)233 void NackRequester::UpdateRtt(int64_t rtt_ms) {
234 RTC_DCHECK_RUN_ON(worker_thread_);
235 rtt_ = TimeDelta::Millis(rtt_ms);
236 }
237
RemovePacketsUntilKeyFrame()238 bool NackRequester::RemovePacketsUntilKeyFrame() {
239 // Called on worker_thread_.
240 while (!keyframe_list_.empty()) {
241 auto it = nack_list_.lower_bound(*keyframe_list_.begin());
242
243 if (it != nack_list_.begin()) {
244 // We have found a keyframe that actually is newer than at least one
245 // packet in the nack list.
246 nack_list_.erase(nack_list_.begin(), it);
247 return true;
248 }
249
250 // If this keyframe is so old it does not remove any packets from the list,
251 // remove it from the list of keyframes and try the next keyframe.
252 keyframe_list_.erase(keyframe_list_.begin());
253 }
254 return false;
255 }
256
AddPacketsToNack(uint16_t seq_num_start,uint16_t seq_num_end)257 void NackRequester::AddPacketsToNack(uint16_t seq_num_start,
258 uint16_t seq_num_end) {
259 // Called on worker_thread_.
260 // Remove old packets.
261 auto it = nack_list_.lower_bound(seq_num_end - kMaxPacketAge);
262 nack_list_.erase(nack_list_.begin(), it);
263
264 // If the nack list is too large, remove packets from the nack list until
265 // the latest first packet of a keyframe. If the list is still too large,
266 // clear it and request a keyframe.
267 uint16_t num_new_nacks = ForwardDiff(seq_num_start, seq_num_end);
268 if (nack_list_.size() + num_new_nacks > kMaxNackPackets) {
269 while (RemovePacketsUntilKeyFrame() &&
270 nack_list_.size() + num_new_nacks > kMaxNackPackets) {
271 }
272
273 if (nack_list_.size() + num_new_nacks > kMaxNackPackets) {
274 nack_list_.clear();
275 RTC_LOG(LS_WARNING) << "NACK list full, clearing NACK"
276 " list and requesting keyframe.";
277 keyframe_request_sender_->RequestKeyFrame();
278 return;
279 }
280 }
281
282 for (uint16_t seq_num = seq_num_start; seq_num != seq_num_end; ++seq_num) {
283 // Do not send nack for packets that are already recovered by FEC or RTX
284 if (recovered_list_.find(seq_num) != recovered_list_.end())
285 continue;
286 NackInfo nack_info(seq_num, seq_num + WaitNumberOfPackets(0.5),
287 clock_->CurrentTime());
288 RTC_DCHECK(nack_list_.find(seq_num) == nack_list_.end());
289 nack_list_[seq_num] = nack_info;
290 }
291 }
292
GetNackBatch(NackFilterOptions options)293 std::vector<uint16_t> NackRequester::GetNackBatch(NackFilterOptions options) {
294 // Called on worker_thread_.
295
296 bool consider_seq_num = options != kTimeOnly;
297 bool consider_timestamp = options != kSeqNumOnly;
298 Timestamp now = clock_->CurrentTime();
299 std::vector<uint16_t> nack_batch;
300 auto it = nack_list_.begin();
301 while (it != nack_list_.end()) {
302 bool delay_timed_out = now - it->second.created_at_time >= send_nack_delay_;
303 bool nack_on_rtt_passed = now - it->second.sent_at_time >= rtt_;
304 bool nack_on_seq_num_passed =
305 it->second.sent_at_time.IsInfinite() &&
306 AheadOrAt(newest_seq_num_, it->second.send_at_seq_num);
307 if (delay_timed_out && ((consider_seq_num && nack_on_seq_num_passed) ||
308 (consider_timestamp && nack_on_rtt_passed))) {
309 nack_batch.emplace_back(it->second.seq_num);
310 ++it->second.retries;
311 it->second.sent_at_time = now;
312 if (it->second.retries >= kMaxNackRetries) {
313 RTC_LOG(LS_WARNING) << "Sequence number " << it->second.seq_num
314 << " removed from NACK list due to max retries.";
315 it = nack_list_.erase(it);
316 } else {
317 ++it;
318 }
319 continue;
320 }
321 ++it;
322 }
323 return nack_batch;
324 }
325
UpdateReorderingStatistics(uint16_t seq_num)326 void NackRequester::UpdateReorderingStatistics(uint16_t seq_num) {
327 // Running on worker_thread_.
328 RTC_DCHECK(AheadOf(newest_seq_num_, seq_num));
329 uint16_t diff = ReverseDiff(newest_seq_num_, seq_num);
330 reordering_histogram_.Add(diff);
331 }
332
WaitNumberOfPackets(float probability) const333 int NackRequester::WaitNumberOfPackets(float probability) const {
334 // Called on worker_thread_;
335 if (reordering_histogram_.NumValues() == 0)
336 return 0;
337 return reordering_histogram_.InverseCdf(probability);
338 }
339
340 } // namespace webrtc
341