xref: /aosp_15_r20/external/webrtc/test/pc/e2e/analyzer/video/multi_reader_queue.h (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1 /*
2  *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #ifndef TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_
12 #define TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_
13 
#include <cstddef>
#include <deque>
#include <memory>
#include <set>
#include <unordered_map>
#include <utility>

#include "absl/types/optional.h"
#include "rtc_base/checks.h"
21 
22 namespace webrtc {
23 
24 // Represents the queue which can be read by multiple readers. Each reader reads
25 // from its own queue head. When an element is added it will become visible for
26 // all readers. When an element will be removed by all the readers, the element
27 // will be removed from the queue.
28 template <typename T>
29 class MultiReaderQueue {
30  public:
31   // Creates queue with exactly `readers_count` readers named from 0 to
32   // `readers_count - 1`.
MultiReaderQueue(size_t readers_count)33   explicit MultiReaderQueue(size_t readers_count) {
34     for (size_t i = 0; i < readers_count; ++i) {
35       heads_[i] = 0;
36     }
37   }
38   // Creates queue with specified readers.
MultiReaderQueue(std::set<size_t> readers)39   explicit MultiReaderQueue(std::set<size_t> readers) {
40     for (size_t reader : readers) {
41       heads_[reader] = 0;
42     }
43   }
44 
45   // Adds a new `reader`, initializing its reading position (the reader's head)
46   // equal to the one of `reader_to_copy`.
47   // Complexity O(MultiReaderQueue::size(reader_to_copy)).
AddReader(size_t reader,size_t reader_to_copy)48   void AddReader(size_t reader, size_t reader_to_copy) {
49     size_t pos = GetHeadPositionOrDie(reader_to_copy);
50 
51     auto it = heads_.find(reader);
52     RTC_CHECK(it == heads_.end())
53         << "Reader " << reader << " is already in the queue";
54     heads_[reader] = heads_[reader_to_copy];
55     for (size_t i = pos; i < queue_.size(); ++i) {
56       in_queues_[i]++;
57     }
58   }
59 
60   // Adds a new `reader`, initializing its reading position equal to first
61   // element in the queue.
62   // Complexity O(MultiReaderQueue::size()).
AddReader(size_t reader)63   void AddReader(size_t reader) {
64     auto it = heads_.find(reader);
65     RTC_CHECK(it == heads_.end())
66         << "Reader " << reader << " is already in the queue";
67     heads_[reader] = removed_elements_count_;
68     for (size_t i = 0; i < queue_.size(); ++i) {
69       in_queues_[i]++;
70     }
71   }
72 
73   // Removes specified `reader` from the queue.
74   // Complexity O(MultiReaderQueue::size(reader)).
RemoveReader(size_t reader)75   void RemoveReader(size_t reader) {
76     size_t pos = GetHeadPositionOrDie(reader);
77     for (size_t i = pos; i < queue_.size(); ++i) {
78       in_queues_[i]--;
79     }
80     while (!in_queues_.empty() && in_queues_[0] == 0) {
81       PopFront();
82     }
83     heads_.erase(reader);
84   }
85 
86   // Add value to the end of the queue. Complexity O(1).
PushBack(T value)87   void PushBack(T value) {
88     queue_.push_back(value);
89     in_queues_.push_back(heads_.size());
90   }
91 
92   // Extract element from specified head. Complexity O(1).
PopFront(size_t reader)93   absl::optional<T> PopFront(size_t reader) {
94     size_t pos = GetHeadPositionOrDie(reader);
95     if (pos >= queue_.size()) {
96       return absl::nullopt;
97     }
98 
99     T out = queue_[pos];
100 
101     in_queues_[pos]--;
102     heads_[reader]++;
103 
104     if (in_queues_[pos] == 0) {
105       RTC_DCHECK_EQ(pos, 0);
106       PopFront();
107     }
108     return out;
109   }
110 
111   // Returns element at specified head. Complexity O(1).
Front(size_t reader)112   absl::optional<T> Front(size_t reader) const {
113     size_t pos = GetHeadPositionOrDie(reader);
114     if (pos >= queue_.size()) {
115       return absl::nullopt;
116     }
117     return queue_[pos];
118   }
119 
120   // Returns true if for specified head there are no more elements in the queue
121   // or false otherwise. Complexity O(1).
IsEmpty(size_t reader)122   bool IsEmpty(size_t reader) const {
123     size_t pos = GetHeadPositionOrDie(reader);
124     return pos >= queue_.size();
125   }
126 
127   // Returns size of the longest queue between all readers.
128   // Complexity O(1).
size()129   size_t size() const { return queue_.size(); }
130 
131   // Returns size of the specified queue. Complexity O(1).
size(size_t reader)132   size_t size(size_t reader) const {
133     size_t pos = GetHeadPositionOrDie(reader);
134     return queue_.size() - pos;
135   }
136 
137   // Complexity O(1).
readers_count()138   size_t readers_count() const { return heads_.size(); }
139 
140  private:
GetHeadPositionOrDie(size_t reader)141   size_t GetHeadPositionOrDie(size_t reader) const {
142     auto it = heads_.find(reader);
143     RTC_CHECK(it != heads_.end()) << "No queue for reader " << reader;
144     return it->second - removed_elements_count_;
145   }
146 
PopFront()147   void PopFront() {
148     RTC_DCHECK(!queue_.empty());
149     RTC_DCHECK_EQ(in_queues_[0], 0);
150     queue_.pop_front();
151     in_queues_.pop_front();
152     removed_elements_count_++;
153   }
154 
155   // Number of the elements that were removed from the queue. It is used to
156   // subtract from each head to compute the right index inside `queue_`;
157   size_t removed_elements_count_ = 0;
158   std::deque<T> queue_;
159   // In how may queues the element at index `i` is. An element can be removed
160   // from the front if and only if it is in 0 queues.
161   std::deque<size_t> in_queues_;
162   // Map from the reader to the head position in the queue.
163   std::unordered_map<size_t, size_t> heads_;
164 };
165 
166 }  // namespace webrtc
167 
168 #endif  // TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_
169