1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef QUICHE_QUIC_TEST_TOOLS_SIMULATOR_QUEUE_H_ 6 #define QUICHE_QUIC_TEST_TOOLS_SIMULATOR_QUEUE_H_ 7 8 #include "quiche/quic/core/quic_alarm.h" 9 #include "quiche/quic/test_tools/simulator/link.h" 10 #include "quiche/common/quiche_circular_deque.h" 11 12 namespace quic { 13 namespace simulator { 14 15 // A finitely sized queue which egresses packets onto a constrained link. The 16 // capacity of the queue is measured in bytes as opposed to packets. 17 class Queue : public Actor, public UnconstrainedPortInterface { 18 public: 19 class ListenerInterface { 20 public: 21 virtual ~ListenerInterface(); 22 23 // Called whenever a packet is removed from the queue. 24 virtual void OnPacketDequeued() = 0; 25 }; 26 27 Queue(Simulator* simulator, std::string name, QuicByteCount capacity); 28 Queue(const Queue&) = delete; 29 Queue& operator=(const Queue&) = delete; 30 ~Queue() override; 31 32 void set_tx_port(ConstrainedPortInterface* port); 33 34 void AcceptPacket(std::unique_ptr<Packet> packet) override; 35 36 void Act() override; 37 capacity()38 QuicByteCount capacity() const { return capacity_; } bytes_queued()39 QuicByteCount bytes_queued() const { return bytes_queued_; } packets_queued()40 QuicPacketCount packets_queued() const { return queue_.size(); } 41 set_listener_interface(ListenerInterface * listener)42 void set_listener_interface(ListenerInterface* listener) { 43 listener_ = listener; 44 } 45 46 // Enables packet aggregation on the queue. Packet aggregation makes the 47 // queue bundle packets up until they reach certain size. When the 48 // aggregation is enabled, the packets are not dequeued until the total size 49 // of packets in the queue reaches |aggregation_threshold|. The packets are 50 // automatically flushed from the queue if the oldest packet has been in it 51 // for |aggregation_timeout|. 52 // 53 // This method may only be called when the queue is empty. Once enabled, 54 // aggregation cannot be disabled. 55 void EnableAggregation(QuicByteCount aggregation_threshold, 56 QuicTime::Delta aggregation_timeout); 57 58 private: 59 using AggregationBundleNumber = uint64_t; 60 61 // In order to implement packet aggregation, each packet is tagged with a 62 // bundle number. The queue keeps a bundle counter, and whenever a bundle is 63 // ready, it increments the number of the current bundle. Only the packets 64 // outside of the current bundle are allowed to leave the queue. 65 struct EnqueuedPacket { 66 EnqueuedPacket(std::unique_ptr<Packet> packet, 67 AggregationBundleNumber bundle); 68 EnqueuedPacket(EnqueuedPacket&& other); 69 ~EnqueuedPacket(); 70 71 std::unique_ptr<Packet> packet; 72 AggregationBundleNumber bundle; 73 }; 74 75 // Alarm handler for aggregation timeout. 76 class AggregationAlarmDelegate : public QuicAlarm::DelegateWithoutContext { 77 public: 78 explicit AggregationAlarmDelegate(Queue* queue); 79 80 void OnAlarm() override; 81 82 private: 83 Queue* queue_; 84 }; 85 IsAggregationEnabled()86 bool IsAggregationEnabled() const { return aggregation_threshold_ > 0; } 87 88 // Increment the bundle counter and reset the bundle state. This causes all 89 // packets currently in the bundle to be flushed onto the link. 90 void NextBundle(); 91 92 void ScheduleNextPacketDequeue(); 93 94 const QuicByteCount capacity_; 95 QuicByteCount bytes_queued_; 96 97 QuicByteCount aggregation_threshold_; 98 QuicTime::Delta aggregation_timeout_; 99 // The number of the current aggregation bundle. Monotonically increasing. 100 // All packets in the previous bundles are allowed to leave the queue, and 101 // none of the packets in the current one are. 102 AggregationBundleNumber current_bundle_; 103 // Size of the current bundle. Whenever it exceeds |aggregation_threshold_|, 104 // the next bundle is created. 105 QuicByteCount current_bundle_bytes_; 106 // Alarm responsible for flushing the current bundle upon timeout. Set when 107 // the first packet in the bundle is enqueued. 108 std::unique_ptr<QuicAlarm> aggregation_timeout_alarm_; 109 110 ConstrainedPortInterface* tx_port_; 111 quiche::QuicheCircularDeque<EnqueuedPacket> queue_; 112 113 ListenerInterface* listener_; 114 }; 115 116 } // namespace simulator 117 } // namespace quic 118 119 #endif // QUICHE_QUIC_TEST_TOOLS_SIMULATOR_QUEUE_H_ 120