1 /* 2 * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #ifndef TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_ 12 #define TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_ 13 14 #include <deque> 15 #include <memory> 16 #include <set> 17 #include <unordered_map> 18 19 #include "absl/types/optional.h" 20 #include "rtc_base/checks.h" 21 22 namespace webrtc { 23 24 // Represents the queue which can be read by multiple readers. Each reader reads 25 // from its own queue head. When an element is added it will become visible for 26 // all readers. When an element will be removed by all the readers, the element 27 // will be removed from the queue. 28 template <typename T> 29 class MultiReaderQueue { 30 public: 31 // Creates queue with exactly `readers_count` readers named from 0 to 32 // `readers_count - 1`. MultiReaderQueue(size_t readers_count)33 explicit MultiReaderQueue(size_t readers_count) { 34 for (size_t i = 0; i < readers_count; ++i) { 35 heads_[i] = 0; 36 } 37 } 38 // Creates queue with specified readers. MultiReaderQueue(std::set<size_t> readers)39 explicit MultiReaderQueue(std::set<size_t> readers) { 40 for (size_t reader : readers) { 41 heads_[reader] = 0; 42 } 43 } 44 45 // Adds a new `reader`, initializing its reading position (the reader's head) 46 // equal to the one of `reader_to_copy`. 47 // Complexity O(MultiReaderQueue::size(reader_to_copy)). AddReader(size_t reader,size_t reader_to_copy)48 void AddReader(size_t reader, size_t reader_to_copy) { 49 size_t pos = GetHeadPositionOrDie(reader_to_copy); 50 51 auto it = heads_.find(reader); 52 RTC_CHECK(it == heads_.end()) 53 << "Reader " << reader << " is already in the queue"; 54 heads_[reader] = heads_[reader_to_copy]; 55 for (size_t i = pos; i < queue_.size(); ++i) { 56 in_queues_[i]++; 57 } 58 } 59 60 // Adds a new `reader`, initializing its reading position equal to first 61 // element in the queue. 62 // Complexity O(MultiReaderQueue::size()). AddReader(size_t reader)63 void AddReader(size_t reader) { 64 auto it = heads_.find(reader); 65 RTC_CHECK(it == heads_.end()) 66 << "Reader " << reader << " is already in the queue"; 67 heads_[reader] = removed_elements_count_; 68 for (size_t i = 0; i < queue_.size(); ++i) { 69 in_queues_[i]++; 70 } 71 } 72 73 // Removes specified `reader` from the queue. 74 // Complexity O(MultiReaderQueue::size(reader)). RemoveReader(size_t reader)75 void RemoveReader(size_t reader) { 76 size_t pos = GetHeadPositionOrDie(reader); 77 for (size_t i = pos; i < queue_.size(); ++i) { 78 in_queues_[i]--; 79 } 80 while (!in_queues_.empty() && in_queues_[0] == 0) { 81 PopFront(); 82 } 83 heads_.erase(reader); 84 } 85 86 // Add value to the end of the queue. Complexity O(1). PushBack(T value)87 void PushBack(T value) { 88 queue_.push_back(value); 89 in_queues_.push_back(heads_.size()); 90 } 91 92 // Extract element from specified head. Complexity O(1). PopFront(size_t reader)93 absl::optional<T> PopFront(size_t reader) { 94 size_t pos = GetHeadPositionOrDie(reader); 95 if (pos >= queue_.size()) { 96 return absl::nullopt; 97 } 98 99 T out = queue_[pos]; 100 101 in_queues_[pos]--; 102 heads_[reader]++; 103 104 if (in_queues_[pos] == 0) { 105 RTC_DCHECK_EQ(pos, 0); 106 PopFront(); 107 } 108 return out; 109 } 110 111 // Returns element at specified head. Complexity O(1). Front(size_t reader)112 absl::optional<T> Front(size_t reader) const { 113 size_t pos = GetHeadPositionOrDie(reader); 114 if (pos >= queue_.size()) { 115 return absl::nullopt; 116 } 117 return queue_[pos]; 118 } 119 120 // Returns true if for specified head there are no more elements in the queue 121 // or false otherwise. Complexity O(1). IsEmpty(size_t reader)122 bool IsEmpty(size_t reader) const { 123 size_t pos = GetHeadPositionOrDie(reader); 124 return pos >= queue_.size(); 125 } 126 127 // Returns size of the longest queue between all readers. 128 // Complexity O(1). size()129 size_t size() const { return queue_.size(); } 130 131 // Returns size of the specified queue. Complexity O(1). size(size_t reader)132 size_t size(size_t reader) const { 133 size_t pos = GetHeadPositionOrDie(reader); 134 return queue_.size() - pos; 135 } 136 137 // Complexity O(1). readers_count()138 size_t readers_count() const { return heads_.size(); } 139 140 private: GetHeadPositionOrDie(size_t reader)141 size_t GetHeadPositionOrDie(size_t reader) const { 142 auto it = heads_.find(reader); 143 RTC_CHECK(it != heads_.end()) << "No queue for reader " << reader; 144 return it->second - removed_elements_count_; 145 } 146 PopFront()147 void PopFront() { 148 RTC_DCHECK(!queue_.empty()); 149 RTC_DCHECK_EQ(in_queues_[0], 0); 150 queue_.pop_front(); 151 in_queues_.pop_front(); 152 removed_elements_count_++; 153 } 154 155 // Number of the elements that were removed from the queue. It is used to 156 // subtract from each head to compute the right index inside `queue_`; 157 size_t removed_elements_count_ = 0; 158 std::deque<T> queue_; 159 // In how may queues the element at index `i` is. An element can be removed 160 // from the front if and only if it is in 0 queues. 161 std::deque<size_t> in_queues_; 162 // Map from the reader to the head position in the queue. 163 std::unordered_map<size_t, size_t> heads_; 164 }; 165 166 } // namespace webrtc 167 168 #endif // TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_ 169