xref: /aosp_15_r20/external/cronet/net/nqe/throughput_analyzer.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/nqe/throughput_analyzer.h"
6 
7 #include <cmath>
8 
9 #include "base/functional/bind.h"
10 #include "base/location.h"
11 #include "base/metrics/histogram_macros.h"
12 #include "base/numerics/safe_conversions.h"
13 #include "base/task/single_thread_task_runner.h"
14 #include "base/time/tick_clock.h"
15 #include "net/base/host_port_pair.h"
16 #include "net/base/network_activity_monitor.h"
17 #include "net/base/url_util.h"
18 #include "net/nqe/network_quality_estimator.h"
19 #include "net/nqe/network_quality_estimator_params.h"
20 #include "net/nqe/network_quality_estimator_util.h"
21 #include "net/url_request/url_request.h"
22 #include "net/url_request/url_request_context.h"
23 
24 namespace net {
25 
26 class HostResolver;
27 
28 namespace {
29 
// Upper bound on the number of entries kept in each tracked request
// collection (requests that degrade accuracy, and requests that do not).
// Exceeding it causes the corresponding collection to be cleared.
constexpr size_t kMaxRequestsSize = 300;
33 
34 // Returns true if the request should be discarded because it does not provide
35 // meaningful observation.
ShouldDiscardRequest(const URLRequest & request)36 bool ShouldDiscardRequest(const URLRequest& request) {
37   return request.method() != "GET";
38 }
39 
40 }  // namespace
41 
42 namespace nqe::internal {
// Content size, in bytes, recorded for a request until a better value is
// reported. It is set to the median HTML response content size, i.e. 1.8kB.
constexpr int64_t kDefaultContentSizeBytes{1800};
46 
ThroughputAnalyzer(const NetworkQualityEstimator * network_quality_estimator,const NetworkQualityEstimatorParams * params,scoped_refptr<base::SingleThreadTaskRunner> task_runner,ThroughputObservationCallback throughput_observation_callback,const base::TickClock * tick_clock,const NetLogWithSource & net_log)47 ThroughputAnalyzer::ThroughputAnalyzer(
48     const NetworkQualityEstimator* network_quality_estimator,
49     const NetworkQualityEstimatorParams* params,
50     scoped_refptr<base::SingleThreadTaskRunner> task_runner,
51     ThroughputObservationCallback throughput_observation_callback,
52     const base::TickClock* tick_clock,
53     const NetLogWithSource& net_log)
54     : network_quality_estimator_(network_quality_estimator),
55       params_(params),
56       task_runner_(task_runner),
57       throughput_observation_callback_(throughput_observation_callback),
58       tick_clock_(tick_clock),
59       last_connection_change_(tick_clock_->NowTicks()),
60       window_start_time_(base::TimeTicks()),
61       net_log_(net_log) {
62   DCHECK(tick_clock_);
63   DCHECK(network_quality_estimator_);
64   DCHECK(params_);
65   DCHECK(task_runner_);
66   DCHECK(!IsCurrentlyTrackingThroughput());
67 }
68 
~ThroughputAnalyzer()69 ThroughputAnalyzer::~ThroughputAnalyzer() {
70   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
71 }
72 
MaybeStartThroughputObservationWindow()73 void ThroughputAnalyzer::MaybeStartThroughputObservationWindow() {
74   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
75 
76   if (disable_throughput_measurements_)
77     return;
78 
79   // Throughput observation window can be started only if no accuracy degrading
80   // requests are currently active, the observation window is not already
81   // started, and there is at least one active request that does not degrade
82   // throughput computation accuracy.
83   if (accuracy_degrading_requests_.size() > 0 ||
84       IsCurrentlyTrackingThroughput() ||
85       requests_.size() < params_->throughput_min_requests_in_flight()) {
86     return;
87   }
88   window_start_time_ = tick_clock_->NowTicks();
89   bits_received_at_window_start_ = GetBitsReceived();
90 }
91 
EndThroughputObservationWindow()92 void ThroughputAnalyzer::EndThroughputObservationWindow() {
93   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
94 
95   // Mark the throughput observation window as stopped by resetting the window
96   // parameters.
97   window_start_time_ = base::TimeTicks();
98   bits_received_at_window_start_ = 0;
99   DCHECK(!IsCurrentlyTrackingThroughput());
100 }
101 
IsCurrentlyTrackingThroughput() const102 bool ThroughputAnalyzer::IsCurrentlyTrackingThroughput() const {
103   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
104 
105   if (window_start_time_.is_null())
106     return false;
107 
108   // If the throughput observation window is running, then at least one request
109   // that does not degrade throughput computation accuracy should be active.
110   DCHECK_GT(requests_.size(), 0U);
111 
112   // If the throughput observation window is running, then no accuracy degrading
113   // requests should be currently active.
114   DCHECK_EQ(0U, accuracy_degrading_requests_.size());
115 
116   DCHECK_LE(params_->throughput_min_requests_in_flight(), requests_.size());
117 
118   return true;
119 }
120 
SetTickClockForTesting(const base::TickClock * tick_clock)121 void ThroughputAnalyzer::SetTickClockForTesting(
122     const base::TickClock* tick_clock) {
123   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
124   tick_clock_ = tick_clock;
125   DCHECK(tick_clock_);
126 }
127 
UpdateResponseContentSize(const URLRequest * request,int64_t response_size)128 void ThroughputAnalyzer::UpdateResponseContentSize(const URLRequest* request,
129                                                    int64_t response_size) {
130   DCHECK_LE(0, response_size);
131   // Updates the map and the counter. Subtracts the previous stored response
132   // content size if an old record exists in the map.
133   if (response_content_sizes_.find(request) != response_content_sizes_.end()) {
134     total_response_content_size_ +=
135         response_size - response_content_sizes_[request];
136   } else {
137     total_response_content_size_ += response_size;
138   }
139   response_content_sizes_[request] = response_size;
140 }
141 
NotifyStartTransaction(const URLRequest & request)142 void ThroughputAnalyzer::NotifyStartTransaction(const URLRequest& request) {
143   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
144 
145   UpdateResponseContentSize(&request, kDefaultContentSizeBytes);
146 
147   if (disable_throughput_measurements_)
148     return;
149 
150   const bool degrades_accuracy = DegradesAccuracy(request);
151   if (degrades_accuracy) {
152     accuracy_degrading_requests_.insert(&request);
153 
154     BoundRequestsSize();
155 
156     // Call EndThroughputObservationWindow since observations cannot be
157     // recorded in the presence of requests that degrade throughput computation
158     // accuracy.
159     EndThroughputObservationWindow();
160     DCHECK(!IsCurrentlyTrackingThroughput());
161     return;
162   } else if (ShouldDiscardRequest(request)) {
163     return;
164   }
165 
166   EraseHangingRequests(request);
167 
168   requests_[&request] = tick_clock_->NowTicks();
169   BoundRequestsSize();
170   MaybeStartThroughputObservationWindow();
171 }
172 
NotifyBytesRead(const URLRequest & request)173 void ThroughputAnalyzer::NotifyBytesRead(const URLRequest& request) {
174   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
175 
176   if (disable_throughput_measurements_)
177     return;
178 
179   EraseHangingRequests(request);
180 
181   if (requests_.erase(&request) == 0)
182     return;
183 
184   // Update the time when the bytes were received for |request|.
185   requests_[&request] = tick_clock_->NowTicks();
186 }
187 
NotifyRequestCompleted(const URLRequest & request)188 void ThroughputAnalyzer::NotifyRequestCompleted(const URLRequest& request) {
189   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
190 
191   // Remove the request from the inflight requests if it presents in the map.
192   if (response_content_sizes_.find(&request) != response_content_sizes_.end()) {
193     total_response_content_size_ -= response_content_sizes_[&request];
194     response_content_sizes_.erase(&request);
195   }
196 
197   if (disable_throughput_measurements_)
198     return;
199 
200   // Return early if the |request| is not present in the collections of
201   // requests. This may happen when a completed request is later destroyed.
202   if (requests_.find(&request) == requests_.end() &&
203       accuracy_degrading_requests_.find(&request) ==
204           accuracy_degrading_requests_.end()) {
205     return;
206   }
207 
208   EraseHangingRequests(request);
209 
210   int32_t downstream_kbps = -1;
211   if (MaybeGetThroughputObservation(&downstream_kbps)) {
212     // Notify the provided callback.
213     task_runner_->PostTask(
214         FROM_HERE,
215         base::BindOnce(throughput_observation_callback_, downstream_kbps));
216   }
217 
218   // Try to remove the request from either |accuracy_degrading_requests_| or
219   // |requests_|, since it is no longer active.
220   if (accuracy_degrading_requests_.erase(&request) == 1u) {
221     // Generally, |request| cannot be in both |accuracy_degrading_requests_|
222     // and |requests_| at the same time. However, in some cases, the same
223     // request may appear in both vectors. See https://crbug.com/849604 for
224     // more details.
225     // It's safe to delete |request| from |requests_| since (i)
226     // The observation window is currently not recording throughput, and (ii)
227     // |requests_| is a best effort guess of requests that are currently
228     // in-flight.
229     DCHECK(!IsCurrentlyTrackingThroughput());
230     requests_.erase(&request);
231 
232     // If a request that degraded the accuracy of throughput computation has
233     // completed, then it may be possible to start the tracking window.
234     MaybeStartThroughputObservationWindow();
235     return;
236   }
237 
238   if (requests_.erase(&request) == 1u) {
239     // If there is no network activity, stop tracking throughput to prevent
240     // recording of any observations.
241     if (requests_.size() < params_->throughput_min_requests_in_flight())
242       EndThroughputObservationWindow();
243     return;
244   }
245   MaybeStartThroughputObservationWindow();
246 }
247 
NotifyExpectedResponseContentSize(const URLRequest & request,int64_t expected_content_size)248 void ThroughputAnalyzer::NotifyExpectedResponseContentSize(
249     const URLRequest& request,
250     int64_t expected_content_size) {
251   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
252   // Updates when the value is valid.
253   if (expected_content_size >= 0) {
254     UpdateResponseContentSize(&request, expected_content_size);
255   }
256 }
257 
IsHangingWindow(int64_t bits_received,base::TimeDelta duration) const258 bool ThroughputAnalyzer::IsHangingWindow(int64_t bits_received,
259                                          base::TimeDelta duration) const {
260   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
261 
262   if (params_->throughput_hanging_requests_cwnd_size_multiplier() <= 0)
263     return false;
264 
265   if (params_->use_small_responses())
266     return false;
267 
268   if (!duration.is_positive())
269     return false;
270 
271   // Initial congestion window size for TCP connections.
272   static constexpr size_t kCwndSizeKilobytes = 10 * 1.5;
273   static constexpr size_t kCwndSizeBits = kCwndSizeKilobytes * 1000 * 8;
274 
275   // Scale the |duration| to one HTTP RTT, and compute the number of bits that
276   // would be received over a duration of one HTTP RTT.
277   size_t bits_received_over_one_http_rtt =
278       bits_received *
279       (network_quality_estimator_->GetHttpRTT().value_or(base::Seconds(10)) /
280        duration);
281 
282   // If |is_hanging| is true, it implies that less than
283   // kCwndSizeKilobytes were received over a period of 1 HTTP RTT. For a network
284   // that is not under-utilized, it is expected that at least |kCwndSizeBits|
285   // are received over a duration of 1 HTTP RTT.
286   bool is_hanging =
287       bits_received_over_one_http_rtt <
288       (kCwndSizeBits *
289        params_->throughput_hanging_requests_cwnd_size_multiplier());
290 
291   return is_hanging;
292 }
293 
MaybeGetThroughputObservation(int32_t * downstream_kbps)294 bool ThroughputAnalyzer::MaybeGetThroughputObservation(
295     int32_t* downstream_kbps) {
296   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
297   DCHECK(downstream_kbps);
298 
299   if (disable_throughput_measurements_)
300     return false;
301 
302   // Return early if the window that records downstream throughput is currently
303   // inactive because throughput observations can be taken only when the window
304   // is active.
305   if (!IsCurrentlyTrackingThroughput())
306     return false;
307 
308   DCHECK_GE(requests_.size(), params_->throughput_min_requests_in_flight());
309   DCHECK_EQ(0U, accuracy_degrading_requests_.size());
310 
311   base::TimeTicks now = tick_clock_->NowTicks();
312 
313   int64_t bits_received = GetBitsReceived() - bits_received_at_window_start_;
314   DCHECK_LE(window_start_time_, now);
315   DCHECK_LE(0, bits_received);
316   const base::TimeDelta duration = now - window_start_time_;
317 
318   // Ignore tiny/short transfers, which will not produce accurate rates. Skip
319   // the checks if |use_small_responses_| is true.
320   if (!params_->use_small_responses() &&
321       bits_received < params_->GetThroughputMinTransferSizeBits()) {
322     return false;
323   }
324 
325   double downstream_kbps_double = bits_received * duration.ToHz() / 1000;
326 
327   if (IsHangingWindow(bits_received, duration)) {
328     requests_.clear();
329     EndThroughputObservationWindow();
330     return false;
331   }
332 
333   // Round-up |downstream_kbps_double|.
334   *downstream_kbps = base::ClampCeil<int32_t>(downstream_kbps_double);
335   DCHECK(IsCurrentlyTrackingThroughput());
336 
337   // Stop the observation window since a throughput measurement has been taken.
338   EndThroughputObservationWindow();
339   DCHECK(!IsCurrentlyTrackingThroughput());
340 
341   // Maybe start the throughput observation window again so that another
342   // throughput measurement can be taken.
343   MaybeStartThroughputObservationWindow();
344   return true;
345 }
346 
OnConnectionTypeChanged()347 void ThroughputAnalyzer::OnConnectionTypeChanged() {
348   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
349 
350   // All the requests that were previously not degrading the througpput
351   // computation are now spanning a connection change event. These requests
352   // would now degrade the throughput computation accuracy. So, move them to
353   // |accuracy_degrading_requests_|.
354   for (const auto& request : requests_) {
355     accuracy_degrading_requests_.insert(request.first);
356   }
357   requests_.clear();
358   BoundRequestsSize();
359   EndThroughputObservationWindow();
360 
361   last_connection_change_ = tick_clock_->NowTicks();
362 }
363 
SetUseLocalHostRequestsForTesting(bool use_localhost_requests)364 void ThroughputAnalyzer::SetUseLocalHostRequestsForTesting(
365     bool use_localhost_requests) {
366   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
367   use_localhost_requests_for_tests_ = use_localhost_requests;
368 }
369 
GetBitsReceived() const370 int64_t ThroughputAnalyzer::GetBitsReceived() const {
371   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
372   return activity_monitor::GetBytesReceived() * 8;
373 }
374 
CountActiveInFlightRequests() const375 size_t ThroughputAnalyzer::CountActiveInFlightRequests() const {
376   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
377   return requests_.size();
378 }
379 
CountTotalInFlightRequests() const380 size_t ThroughputAnalyzer::CountTotalInFlightRequests() const {
381   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
382   return response_content_sizes_.size();
383 }
384 
CountTotalContentSizeBytes() const385 int64_t ThroughputAnalyzer::CountTotalContentSizeBytes() const {
386   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
387 
388   return total_response_content_size_;
389 }
390 
DegradesAccuracy(const URLRequest & request) const391 bool ThroughputAnalyzer::DegradesAccuracy(const URLRequest& request) const {
392   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
393 
394   bool private_network_request =
395       nqe::internal::IsRequestForPrivateHost(request, net_log_);
396 
397   return !(use_localhost_requests_for_tests_ || !private_network_request) ||
398          request.creation_time() < last_connection_change_;
399 }
400 
BoundRequestsSize()401 void ThroughputAnalyzer::BoundRequestsSize() {
402   if (accuracy_degrading_requests_.size() > kMaxRequestsSize) {
403     // Clear |accuracy_degrading_requests_| since its size has exceeded its
404     // capacity.
405     accuracy_degrading_requests_.clear();
406     // Disable throughput measurements since |this| has lost track of the
407     // accuracy degrading requests.
408     disable_throughput_measurements_ = true;
409 
410     // Reset other variables related to tracking since the tracking is now
411     // disabled.
412     EndThroughputObservationWindow();
413     DCHECK(!IsCurrentlyTrackingThroughput());
414     requests_.clear();
415 
416     // TODO(tbansal): crbug.com/609174 Add UMA to record how frequently this
417     // happens.
418   }
419 
420   if (requests_.size() > kMaxRequestsSize) {
421     // Clear |requests_| since its size has exceeded its capacity.
422     EndThroughputObservationWindow();
423     DCHECK(!IsCurrentlyTrackingThroughput());
424     requests_.clear();
425 
426     // TODO(tbansal): crbug.com/609174 Add UMA to record how frequently this
427     // happens.
428   }
429 }
430 
EraseHangingRequests(const URLRequest & request)431 void ThroughputAnalyzer::EraseHangingRequests(const URLRequest& request) {
432   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
433 
434   DCHECK_LT(0, params_->hanging_request_duration_http_rtt_multiplier());
435 
436   const base::TimeTicks now = tick_clock_->NowTicks();
437 
438   const base::TimeDelta http_rtt =
439       network_quality_estimator_->GetHttpRTT().value_or(base::Seconds(60));
440 
441   size_t count_request_erased = 0;
442   auto request_it = requests_.find(&request);
443   if (request_it != requests_.end()) {
444     base::TimeDelta time_since_last_received = now - request_it->second;
445 
446     if (time_since_last_received >=
447             params_->hanging_request_duration_http_rtt_multiplier() *
448                 http_rtt &&
449         time_since_last_received >= params_->hanging_request_min_duration()) {
450       count_request_erased++;
451       requests_.erase(request_it);
452     }
453   }
454 
455   if (now - last_hanging_request_check_ >= base::Seconds(1)) {
456     // Hanging request check is done at most once per second.
457     last_hanging_request_check_ = now;
458 
459     for (auto it = requests_.begin(); it != requests_.end();) {
460       base::TimeDelta time_since_last_received = now - it->second;
461 
462       if (time_since_last_received >=
463               params_->hanging_request_duration_http_rtt_multiplier() *
464                   http_rtt &&
465           time_since_last_received >= params_->hanging_request_min_duration()) {
466         count_request_erased++;
467         requests_.erase(it++);
468       } else {
469         ++it;
470       }
471     }
472   }
473 
474   if (count_request_erased > 0) {
475     // End the observation window since there is at least one hanging GET in
476     // flight, which may lead to inaccuracies in the throughput estimate
477     // computation.
478     EndThroughputObservationWindow();
479   }
480 }
481 
482 }  // namespace nqe::internal
483 
484 }  // namespace net
485