1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "quiche/quic/test_tools/simulator/queue.h"
6
7 #include "quiche/quic/platform/api/quic_logging.h"
8 #include "quiche/quic/test_tools/simulator/simulator.h"
9
10 namespace quic {
11 namespace simulator {
12
~ListenerInterface()13 Queue::ListenerInterface::~ListenerInterface() {}
14
Queue(Simulator * simulator,std::string name,QuicByteCount capacity)15 Queue::Queue(Simulator* simulator, std::string name, QuicByteCount capacity)
16 : Actor(simulator, name),
17 capacity_(capacity),
18 bytes_queued_(0),
19 aggregation_threshold_(0),
20 aggregation_timeout_(QuicTime::Delta::Infinite()),
21 current_bundle_(0),
22 current_bundle_bytes_(0),
23 tx_port_(nullptr),
24 listener_(nullptr) {
25 aggregation_timeout_alarm_.reset(simulator_->GetAlarmFactory()->CreateAlarm(
26 new AggregationAlarmDelegate(this)));
27 }
28
~Queue()29 Queue::~Queue() { aggregation_timeout_alarm_->PermanentCancel(); }
30
set_tx_port(ConstrainedPortInterface * port)31 void Queue::set_tx_port(ConstrainedPortInterface* port) { tx_port_ = port; }
32
AcceptPacket(std::unique_ptr<Packet> packet)33 void Queue::AcceptPacket(std::unique_ptr<Packet> packet) {
34 if (packet->size + bytes_queued_ > capacity_) {
35 QUIC_DVLOG(1) << "Queue [" << name() << "] has received a packet from ["
36 << packet->source << "] to [" << packet->destination
37 << "] which is over capacity. Dropping it.";
38 QUIC_DVLOG(1) << "Queue size: " << bytes_queued_ << " out of " << capacity_
39 << ". Packet size: " << packet->size;
40 return;
41 }
42
43 bytes_queued_ += packet->size;
44 queue_.emplace_back(std::move(packet), current_bundle_);
45
46 if (IsAggregationEnabled()) {
47 current_bundle_bytes_ += queue_.front().packet->size;
48 if (!aggregation_timeout_alarm_->IsSet()) {
49 aggregation_timeout_alarm_->Set(clock_->Now() + aggregation_timeout_);
50 }
51 if (current_bundle_bytes_ >= aggregation_threshold_) {
52 NextBundle();
53 }
54 }
55
56 ScheduleNextPacketDequeue();
57 }
58
Act()59 void Queue::Act() {
60 QUICHE_DCHECK(!queue_.empty());
61 if (tx_port_->TimeUntilAvailable().IsZero()) {
62 QUICHE_DCHECK(bytes_queued_ >= queue_.front().packet->size);
63 bytes_queued_ -= queue_.front().packet->size;
64
65 tx_port_->AcceptPacket(std::move(queue_.front().packet));
66 queue_.pop_front();
67 if (listener_ != nullptr) {
68 listener_->OnPacketDequeued();
69 }
70 }
71
72 ScheduleNextPacketDequeue();
73 }
74
EnableAggregation(QuicByteCount aggregation_threshold,QuicTime::Delta aggregation_timeout)75 void Queue::EnableAggregation(QuicByteCount aggregation_threshold,
76 QuicTime::Delta aggregation_timeout) {
77 QUICHE_DCHECK_EQ(bytes_queued_, 0u);
78 QUICHE_DCHECK_GT(aggregation_threshold, 0u);
79 QUICHE_DCHECK(!aggregation_timeout.IsZero());
80 QUICHE_DCHECK(!aggregation_timeout.IsInfinite());
81
82 aggregation_threshold_ = aggregation_threshold;
83 aggregation_timeout_ = aggregation_timeout;
84 }
85
AggregationAlarmDelegate(Queue * queue)86 Queue::AggregationAlarmDelegate::AggregationAlarmDelegate(Queue* queue)
87 : queue_(queue) {}
88
OnAlarm()89 void Queue::AggregationAlarmDelegate::OnAlarm() {
90 queue_->NextBundle();
91 queue_->ScheduleNextPacketDequeue();
92 }
93
EnqueuedPacket(std::unique_ptr<Packet> packet,AggregationBundleNumber bundle)94 Queue::EnqueuedPacket::EnqueuedPacket(std::unique_ptr<Packet> packet,
95 AggregationBundleNumber bundle)
96 : packet(std::move(packet)), bundle(bundle) {}
97
98 Queue::EnqueuedPacket::EnqueuedPacket(EnqueuedPacket&& other) = default;
99
100 Queue::EnqueuedPacket::~EnqueuedPacket() = default;
101
NextBundle()102 void Queue::NextBundle() {
103 current_bundle_++;
104 current_bundle_bytes_ = 0;
105 aggregation_timeout_alarm_->Cancel();
106 }
107
ScheduleNextPacketDequeue()108 void Queue::ScheduleNextPacketDequeue() {
109 if (queue_.empty()) {
110 QUICHE_DCHECK_EQ(bytes_queued_, 0u);
111 return;
112 }
113
114 if (IsAggregationEnabled() && queue_.front().bundle == current_bundle_) {
115 return;
116 }
117
118 QuicTime::Delta time_until_available = QuicTime::Delta::Zero();
119 if (tx_port_) {
120 time_until_available = tx_port_->TimeUntilAvailable();
121 }
122
123 Schedule(clock_->Now() + time_until_available);
124 }
125
126 } // namespace simulator
127 } // namespace quic
128