1 /*
2  *  Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "modules/remote_bitrate_estimator/remote_bitrate_estimator_abs_send_time.h"
12 
13 #include <math.h>
14 
15 #include <algorithm>
16 #include <memory>
17 #include <utility>
18 
19 #include "api/transport/field_trial_based_config.h"
20 #include "api/units/data_rate.h"
21 #include "api/units/data_size.h"
22 #include "api/units/time_delta.h"
23 #include "api/units/timestamp.h"
24 #include "modules/remote_bitrate_estimator/include/bwe_defines.h"
25 #include "modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h"
26 #include "rtc_base/checks.h"
27 #include "rtc_base/logging.h"
28 #include "rtc_base/thread_annotations.h"
29 #include "system_wrappers/include/metrics.h"
30 
31 namespace webrtc {
32 namespace {
33 
34 constexpr TimeDelta kMinClusterDelta = TimeDelta::Millis(1);
35 constexpr TimeDelta kInitialProbingInterval = TimeDelta::Seconds(2);
36 constexpr int kTimestampGroupLengthMs = 5;
37 constexpr int kAbsSendTimeInterArrivalUpshift = 8;
38 constexpr int kInterArrivalShift =
39     RTPHeaderExtension::kAbsSendTimeFraction + kAbsSendTimeInterArrivalUpshift;
40 constexpr int kMinClusterSize = 4;
41 constexpr int kMaxProbePackets = 15;
42 constexpr int kExpectedNumberOfProbes = 3;
43 constexpr double kTimestampToMs =
44     1000.0 / static_cast<double>(1 << kInterArrivalShift);
45 
OptionalRateFromOptionalBps(absl::optional<int> bitrate_bps)46 absl::optional<DataRate> OptionalRateFromOptionalBps(
47     absl::optional<int> bitrate_bps) {
48   if (bitrate_bps) {
49     return DataRate::BitsPerSec(*bitrate_bps);
50   } else {
51     return absl::nullopt;
52   }
53 }
54 
// Returns a vector holding every key of `map`, in the map's own iteration
// order. Works for any std::map instantiation, including maps declared with a
// custom comparator and/or allocator (those extra template arguments are
// absorbed by `Rest`).
template <typename K, typename V, typename... Rest>
std::vector<K> Keys(const std::map<K, V, Rest...>& map) {
  std::vector<K> keys;
  // One allocation up front; the final size is known.
  keys.reserve(map.size());
  for (const auto& kv_pair : map) {
    keys.push_back(kv_pair.first);
  }
  return keys;
}
64 
65 }  // namespace
66 
67 RemoteBitrateEstimatorAbsSendTime::~RemoteBitrateEstimatorAbsSendTime() =
68     default;
69 
IsWithinClusterBounds(TimeDelta send_delta,const Cluster & cluster_aggregate)70 bool RemoteBitrateEstimatorAbsSendTime::IsWithinClusterBounds(
71     TimeDelta send_delta,
72     const Cluster& cluster_aggregate) {
73   if (cluster_aggregate.count == 0)
74     return true;
75   TimeDelta cluster_mean =
76       cluster_aggregate.send_mean / cluster_aggregate.count;
77   return (send_delta - cluster_mean).Abs() < TimeDelta::Micros(2'500);
78 }
79 
MaybeAddCluster(const Cluster & cluster_aggregate,std::list<Cluster> & clusters)80 void RemoteBitrateEstimatorAbsSendTime::MaybeAddCluster(
81     const Cluster& cluster_aggregate,
82     std::list<Cluster>& clusters) {
83   if (cluster_aggregate.count < kMinClusterSize ||
84       cluster_aggregate.send_mean <= TimeDelta::Zero() ||
85       cluster_aggregate.recv_mean <= TimeDelta::Zero()) {
86     return;
87   }
88 
89   Cluster cluster;
90   cluster.send_mean = cluster_aggregate.send_mean / cluster_aggregate.count;
91   cluster.recv_mean = cluster_aggregate.recv_mean / cluster_aggregate.count;
92   cluster.mean_size = cluster_aggregate.mean_size / cluster_aggregate.count;
93   cluster.count = cluster_aggregate.count;
94   cluster.num_above_min_delta = cluster_aggregate.num_above_min_delta;
95   clusters.push_back(cluster);
96 }
97 
RemoteBitrateEstimatorAbsSendTime(RemoteBitrateObserver * observer,Clock * clock)98 RemoteBitrateEstimatorAbsSendTime::RemoteBitrateEstimatorAbsSendTime(
99     RemoteBitrateObserver* observer,
100     Clock* clock)
101     : clock_(clock),
102       observer_(observer),
103       detector_(&field_trials_),
104       remote_rate_(&field_trials_) {
105   RTC_DCHECK(clock_);
106   RTC_DCHECK(observer_);
107   RTC_LOG(LS_INFO) << "RemoteBitrateEstimatorAbsSendTime: Instantiating.";
108 }
109 
110 std::list<RemoteBitrateEstimatorAbsSendTime::Cluster>
ComputeClusters() const111 RemoteBitrateEstimatorAbsSendTime::ComputeClusters() const {
112   std::list<Cluster> clusters;
113   Cluster cluster_aggregate;
114   Timestamp prev_send_time = Timestamp::MinusInfinity();
115   Timestamp prev_recv_time = Timestamp::MinusInfinity();
116   for (const Probe& probe : probes_) {
117     if (prev_send_time.IsFinite()) {
118       TimeDelta send_delta = probe.send_time - prev_send_time;
119       TimeDelta recv_delta = probe.recv_time - prev_recv_time;
120       if (send_delta >= kMinClusterDelta && recv_delta >= kMinClusterDelta) {
121         ++cluster_aggregate.num_above_min_delta;
122       }
123       if (!IsWithinClusterBounds(send_delta, cluster_aggregate)) {
124         MaybeAddCluster(cluster_aggregate, clusters);
125         cluster_aggregate = Cluster();
126       }
127       cluster_aggregate.send_mean += send_delta;
128       cluster_aggregate.recv_mean += recv_delta;
129       cluster_aggregate.mean_size += probe.payload_size;
130       ++cluster_aggregate.count;
131     }
132     prev_send_time = probe.send_time;
133     prev_recv_time = probe.recv_time;
134   }
135   MaybeAddCluster(cluster_aggregate, clusters);
136   return clusters;
137 }
138 
139 const RemoteBitrateEstimatorAbsSendTime::Cluster*
FindBestProbe(const std::list<Cluster> & clusters) const140 RemoteBitrateEstimatorAbsSendTime::FindBestProbe(
141     const std::list<Cluster>& clusters) const {
142   DataRate highest_probe_bitrate = DataRate::Zero();
143   const Cluster* best = nullptr;
144   for (const auto& cluster : clusters) {
145     if (cluster.send_mean == TimeDelta::Zero() ||
146         cluster.recv_mean == TimeDelta::Zero()) {
147       continue;
148     }
149     if (cluster.num_above_min_delta > cluster.count / 2 &&
150         (cluster.recv_mean - cluster.send_mean <= TimeDelta::Millis(2) &&
151          cluster.send_mean - cluster.recv_mean <= TimeDelta::Millis(5))) {
152       DataRate probe_bitrate =
153           std::min(cluster.SendBitrate(), cluster.RecvBitrate());
154       if (probe_bitrate > highest_probe_bitrate) {
155         highest_probe_bitrate = probe_bitrate;
156         best = &cluster;
157       }
158     } else {
159       RTC_LOG(LS_INFO) << "Probe failed, sent at "
160                        << cluster.SendBitrate().bps() << " bps, received at "
161                        << cluster.RecvBitrate().bps()
162                        << " bps. Mean send delta: " << cluster.send_mean.ms()
163                        << " ms, mean recv delta: " << cluster.recv_mean.ms()
164                        << " ms, num probes: " << cluster.count;
165       break;
166     }
167   }
168   return best;
169 }
170 
171 RemoteBitrateEstimatorAbsSendTime::ProbeResult
ProcessClusters(Timestamp now)172 RemoteBitrateEstimatorAbsSendTime::ProcessClusters(Timestamp now) {
173   std::list<Cluster> clusters = ComputeClusters();
174   if (clusters.empty()) {
175     // If we reach the max number of probe packets and still have no clusters,
176     // we will remove the oldest one.
177     if (probes_.size() >= kMaxProbePackets)
178       probes_.pop_front();
179     return ProbeResult::kNoUpdate;
180   }
181 
182   if (const Cluster* best = FindBestProbe(clusters)) {
183     DataRate probe_bitrate = std::min(best->SendBitrate(), best->RecvBitrate());
184     // Make sure that a probe sent on a lower bitrate than our estimate can't
185     // reduce the estimate.
186     if (IsBitrateImproving(probe_bitrate)) {
187       RTC_LOG(LS_INFO) << "Probe successful, sent at "
188                        << best->SendBitrate().bps() << " bps, received at "
189                        << best->RecvBitrate().bps()
190                        << " bps. Mean send delta: " << best->send_mean.ms()
191                        << " ms, mean recv delta: " << best->recv_mean.ms()
192                        << " ms, num probes: " << best->count;
193       remote_rate_.SetEstimate(probe_bitrate, now);
194       return ProbeResult::kBitrateUpdated;
195     }
196   }
197 
198   // Not probing and received non-probe packet, or finished with current set
199   // of probes.
200   if (clusters.size() >= kExpectedNumberOfProbes)
201     probes_.clear();
202   return ProbeResult::kNoUpdate;
203 }
204 
IsBitrateImproving(DataRate probe_bitrate) const205 bool RemoteBitrateEstimatorAbsSendTime::IsBitrateImproving(
206     DataRate probe_bitrate) const {
207   bool initial_probe =
208       !remote_rate_.ValidEstimate() && probe_bitrate > DataRate::Zero();
209   bool bitrate_above_estimate = remote_rate_.ValidEstimate() &&
210                                 probe_bitrate > remote_rate_.LatestEstimate();
211   return initial_probe || bitrate_above_estimate;
212 }
213 
IncomingPacket(int64_t arrival_time_ms,size_t payload_size,const RTPHeader & header)214 void RemoteBitrateEstimatorAbsSendTime::IncomingPacket(
215     int64_t arrival_time_ms,
216     size_t payload_size,
217     const RTPHeader& header) {
218   RTC_DCHECK_RUNS_SERIALIZED(&network_race_);
219   if (!header.extension.hasAbsoluteSendTime) {
220     RTC_LOG(LS_WARNING)
221         << "RemoteBitrateEstimatorAbsSendTimeImpl: Incoming packet "
222            "is missing absolute send time extension!";
223     return;
224   }
225   IncomingPacketInfo(Timestamp::Millis(arrival_time_ms),
226                      header.extension.absoluteSendTime,
227                      DataSize::Bytes(payload_size), header.ssrc);
228 }
229 
IncomingPacketInfo(Timestamp arrival_time,uint32_t send_time_24bits,DataSize payload_size,uint32_t ssrc)230 void RemoteBitrateEstimatorAbsSendTime::IncomingPacketInfo(
231     Timestamp arrival_time,
232     uint32_t send_time_24bits,
233     DataSize payload_size,
234     uint32_t ssrc) {
235   RTC_CHECK(send_time_24bits < (1ul << 24));
236   if (!uma_recorded_) {
237     RTC_HISTOGRAM_ENUMERATION(kBweTypeHistogram, BweNames::kReceiverAbsSendTime,
238                               BweNames::kBweNamesMax);
239     uma_recorded_ = true;
240   }
241   // Shift up send time to use the full 32 bits that inter_arrival works with,
242   // so wrapping works properly.
243   uint32_t timestamp = send_time_24bits << kAbsSendTimeInterArrivalUpshift;
244   Timestamp send_time =
245       Timestamp::Millis(static_cast<int64_t>(timestamp) * kTimestampToMs);
246 
247   Timestamp now = clock_->CurrentTime();
248   // TODO(holmer): SSRCs are only needed for REMB, should be broken out from
249   // here.
250 
251   // Check if incoming bitrate estimate is valid, and if it needs to be reset.
252   absl::optional<uint32_t> incoming_bitrate =
253       incoming_bitrate_.Rate(arrival_time.ms());
254   if (incoming_bitrate) {
255     incoming_bitrate_initialized_ = true;
256   } else if (incoming_bitrate_initialized_) {
257     // Incoming bitrate had a previous valid value, but now not enough data
258     // point are left within the current window. Reset incoming bitrate
259     // estimator so that the window size will only contain new data points.
260     incoming_bitrate_.Reset();
261     incoming_bitrate_initialized_ = false;
262   }
263   incoming_bitrate_.Update(payload_size.bytes(), arrival_time.ms());
264 
265   if (first_packet_time_.IsInfinite()) {
266     first_packet_time_ = now;
267   }
268 
269   uint32_t ts_delta = 0;
270   int64_t t_delta = 0;
271   int size_delta = 0;
272   bool update_estimate = false;
273   DataRate target_bitrate = DataRate::Zero();
274   std::vector<uint32_t> ssrcs;
275   {
276     MutexLock lock(&mutex_);
277 
278     TimeoutStreams(now);
279     RTC_DCHECK(inter_arrival_);
280     RTC_DCHECK(estimator_);
281     ssrcs_.insert_or_assign(ssrc, now);
282 
283     // For now only try to detect probes while we don't have a valid estimate.
284     // We currently assume that only packets larger than 200 bytes are paced by
285     // the sender.
286     static constexpr DataSize kMinProbePacketSize = DataSize::Bytes(200);
287     if (payload_size > kMinProbePacketSize &&
288         (!remote_rate_.ValidEstimate() ||
289          now - first_packet_time_ < kInitialProbingInterval)) {
290       // TODO(holmer): Use a map instead to get correct order?
291       if (total_probes_received_ < kMaxProbePackets) {
292         TimeDelta send_delta = TimeDelta::Millis(-1);
293         TimeDelta recv_delta = TimeDelta::Millis(-1);
294         if (!probes_.empty()) {
295           send_delta = send_time - probes_.back().send_time;
296           recv_delta = arrival_time - probes_.back().recv_time;
297         }
298         RTC_LOG(LS_INFO) << "Probe packet received: send time="
299                          << send_time.ms()
300                          << " ms, recv time=" << arrival_time.ms()
301                          << " ms, send delta=" << send_delta.ms()
302                          << " ms, recv delta=" << recv_delta.ms() << " ms.";
303       }
304       probes_.emplace_back(send_time, arrival_time, payload_size);
305       ++total_probes_received_;
306       // Make sure that a probe which updated the bitrate immediately has an
307       // effect by calling the OnReceiveBitrateChanged callback.
308       if (ProcessClusters(now) == ProbeResult::kBitrateUpdated)
309         update_estimate = true;
310     }
311     if (inter_arrival_->ComputeDeltas(timestamp, arrival_time.ms(), now.ms(),
312                                       payload_size.bytes(), &ts_delta, &t_delta,
313                                       &size_delta)) {
314       double ts_delta_ms = (1000.0 * ts_delta) / (1 << kInterArrivalShift);
315       estimator_->Update(t_delta, ts_delta_ms, size_delta, detector_.State(),
316                          arrival_time.ms());
317       detector_.Detect(estimator_->offset(), ts_delta_ms,
318                        estimator_->num_of_deltas(), arrival_time.ms());
319     }
320 
321     if (!update_estimate) {
322       // Check if it's time for a periodic update or if we should update because
323       // of an over-use.
324       if (last_update_.IsInfinite() ||
325           now.ms() - last_update_.ms() >
326               remote_rate_.GetFeedbackInterval().ms()) {
327         update_estimate = true;
328       } else if (detector_.State() == BandwidthUsage::kBwOverusing) {
329         absl::optional<uint32_t> incoming_rate =
330             incoming_bitrate_.Rate(arrival_time.ms());
331         if (incoming_rate && remote_rate_.TimeToReduceFurther(
332                                  now, DataRate::BitsPerSec(*incoming_rate))) {
333           update_estimate = true;
334         }
335       }
336     }
337 
338     if (update_estimate) {
339       // The first overuse should immediately trigger a new estimate.
340       // We also have to update the estimate immediately if we are overusing
341       // and the target bitrate is too high compared to what we are receiving.
342       const RateControlInput input(
343           detector_.State(), OptionalRateFromOptionalBps(
344                                  incoming_bitrate_.Rate(arrival_time.ms())));
345       target_bitrate = remote_rate_.Update(&input, now);
346       update_estimate = remote_rate_.ValidEstimate();
347       ssrcs = Keys(ssrcs_);
348     }
349   }
350   if (update_estimate) {
351     last_update_ = now;
352     observer_->OnReceiveBitrateChanged(ssrcs, target_bitrate.bps<uint32_t>());
353   }
354 }
355 
Process()356 TimeDelta RemoteBitrateEstimatorAbsSendTime::Process() {
357   return TimeDelta::PlusInfinity();
358 }
359 
TimeoutStreams(Timestamp now)360 void RemoteBitrateEstimatorAbsSendTime::TimeoutStreams(Timestamp now) {
361   for (auto it = ssrcs_.begin(); it != ssrcs_.end();) {
362     if (now - it->second > TimeDelta::Millis(kStreamTimeOutMs)) {
363       ssrcs_.erase(it++);
364     } else {
365       ++it;
366     }
367   }
368   if (ssrcs_.empty()) {
369     // We can't update the estimate if we don't have any active streams.
370     inter_arrival_ = std::make_unique<InterArrival>(
371         (kTimestampGroupLengthMs << kInterArrivalShift) / 1000, kTimestampToMs,
372         true);
373     estimator_ = std::make_unique<OveruseEstimator>(OverUseDetectorOptions());
374     // We deliberately don't reset the first_packet_time_ms_ here for now since
375     // we only probe for bandwidth in the beginning of a call right now.
376   }
377 }
378 
OnRttUpdate(int64_t avg_rtt_ms,int64_t)379 void RemoteBitrateEstimatorAbsSendTime::OnRttUpdate(int64_t avg_rtt_ms,
380                                                     int64_t /*max_rtt_ms*/) {
381   MutexLock lock(&mutex_);
382   remote_rate_.SetRtt(TimeDelta::Millis(avg_rtt_ms));
383 }
384 
RemoveStream(uint32_t ssrc)385 void RemoteBitrateEstimatorAbsSendTime::RemoveStream(uint32_t ssrc) {
386   MutexLock lock(&mutex_);
387   ssrcs_.erase(ssrc);
388 }
389 
LatestEstimate() const390 DataRate RemoteBitrateEstimatorAbsSendTime::LatestEstimate() const {
391   // Currently accessed only from the worker thread (see Call::GetStats()).
392   MutexLock lock(&mutex_);
393   if (!remote_rate_.ValidEstimate() || ssrcs_.empty()) {
394     return DataRate::Zero();
395   }
396   return remote_rate_.LatestEstimate();
397 }
398 
399 }  // namespace webrtc
400