xref: /aosp_15_r20/external/cronet/net/third_party/quiche/src/quiche/quic/test_tools/simulator/queue.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/quic/test_tools/simulator/queue.h"
6 
7 #include "quiche/quic/platform/api/quic_logging.h"
8 #include "quiche/quic/test_tools/simulator/simulator.h"
9 
10 namespace quic {
11 namespace simulator {
12 
~ListenerInterface()13 Queue::ListenerInterface::~ListenerInterface() {}
14 
Queue(Simulator * simulator,std::string name,QuicByteCount capacity)15 Queue::Queue(Simulator* simulator, std::string name, QuicByteCount capacity)
16     : Actor(simulator, name),
17       capacity_(capacity),
18       bytes_queued_(0),
19       aggregation_threshold_(0),
20       aggregation_timeout_(QuicTime::Delta::Infinite()),
21       current_bundle_(0),
22       current_bundle_bytes_(0),
23       tx_port_(nullptr),
24       listener_(nullptr) {
25   aggregation_timeout_alarm_.reset(simulator_->GetAlarmFactory()->CreateAlarm(
26       new AggregationAlarmDelegate(this)));
27 }
28 
~Queue()29 Queue::~Queue() { aggregation_timeout_alarm_->PermanentCancel(); }
30 
set_tx_port(ConstrainedPortInterface * port)31 void Queue::set_tx_port(ConstrainedPortInterface* port) { tx_port_ = port; }
32 
AcceptPacket(std::unique_ptr<Packet> packet)33 void Queue::AcceptPacket(std::unique_ptr<Packet> packet) {
34   if (packet->size + bytes_queued_ > capacity_) {
35     QUIC_DVLOG(1) << "Queue [" << name() << "] has received a packet from ["
36                   << packet->source << "] to [" << packet->destination
37                   << "] which is over capacity.  Dropping it.";
38     QUIC_DVLOG(1) << "Queue size: " << bytes_queued_ << " out of " << capacity_
39                   << ".  Packet size: " << packet->size;
40     return;
41   }
42 
43   bytes_queued_ += packet->size;
44   queue_.emplace_back(std::move(packet), current_bundle_);
45 
46   if (IsAggregationEnabled()) {
47     current_bundle_bytes_ += queue_.front().packet->size;
48     if (!aggregation_timeout_alarm_->IsSet()) {
49       aggregation_timeout_alarm_->Set(clock_->Now() + aggregation_timeout_);
50     }
51     if (current_bundle_bytes_ >= aggregation_threshold_) {
52       NextBundle();
53     }
54   }
55 
56   ScheduleNextPacketDequeue();
57 }
58 
Act()59 void Queue::Act() {
60   QUICHE_DCHECK(!queue_.empty());
61   if (tx_port_->TimeUntilAvailable().IsZero()) {
62     QUICHE_DCHECK(bytes_queued_ >= queue_.front().packet->size);
63     bytes_queued_ -= queue_.front().packet->size;
64 
65     tx_port_->AcceptPacket(std::move(queue_.front().packet));
66     queue_.pop_front();
67     if (listener_ != nullptr) {
68       listener_->OnPacketDequeued();
69     }
70   }
71 
72   ScheduleNextPacketDequeue();
73 }
74 
EnableAggregation(QuicByteCount aggregation_threshold,QuicTime::Delta aggregation_timeout)75 void Queue::EnableAggregation(QuicByteCount aggregation_threshold,
76                               QuicTime::Delta aggregation_timeout) {
77   QUICHE_DCHECK_EQ(bytes_queued_, 0u);
78   QUICHE_DCHECK_GT(aggregation_threshold, 0u);
79   QUICHE_DCHECK(!aggregation_timeout.IsZero());
80   QUICHE_DCHECK(!aggregation_timeout.IsInfinite());
81 
82   aggregation_threshold_ = aggregation_threshold;
83   aggregation_timeout_ = aggregation_timeout;
84 }
85 
AggregationAlarmDelegate(Queue * queue)86 Queue::AggregationAlarmDelegate::AggregationAlarmDelegate(Queue* queue)
87     : queue_(queue) {}
88 
OnAlarm()89 void Queue::AggregationAlarmDelegate::OnAlarm() {
90   queue_->NextBundle();
91   queue_->ScheduleNextPacketDequeue();
92 }
93 
EnqueuedPacket(std::unique_ptr<Packet> packet,AggregationBundleNumber bundle)94 Queue::EnqueuedPacket::EnqueuedPacket(std::unique_ptr<Packet> packet,
95                                       AggregationBundleNumber bundle)
96     : packet(std::move(packet)), bundle(bundle) {}
97 
98 Queue::EnqueuedPacket::EnqueuedPacket(EnqueuedPacket&& other) = default;
99 
100 Queue::EnqueuedPacket::~EnqueuedPacket() = default;
101 
NextBundle()102 void Queue::NextBundle() {
103   current_bundle_++;
104   current_bundle_bytes_ = 0;
105   aggregation_timeout_alarm_->Cancel();
106 }
107 
ScheduleNextPacketDequeue()108 void Queue::ScheduleNextPacketDequeue() {
109   if (queue_.empty()) {
110     QUICHE_DCHECK_EQ(bytes_queued_, 0u);
111     return;
112   }
113 
114   if (IsAggregationEnabled() && queue_.front().bundle == current_bundle_) {
115     return;
116   }
117 
118   QuicTime::Delta time_until_available = QuicTime::Delta::Zero();
119   if (tx_port_) {
120     time_until_available = tx_port_->TimeUntilAvailable();
121   }
122 
123   Schedule(clock_->Now() + time_until_available);
124 }
125 
126 }  // namespace simulator
127 }  // namespace quic
128