/*
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef CHRE_UTIL_ATOMIC_SPSC_QUEUE_H_
#define CHRE_UTIL_ATOMIC_SPSC_QUEUE_H_

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <new>
#include <type_traits>

#include "chre/platform/assert.h"
#include "chre/platform/atomic.h"
#include "chre/util/fixed_size_vector.h"
#include "chre/util/memory.h"
#include "chre/util/non_copyable.h"

/**
 * @file
 * AtomicSpscQueue is a templated fixed-size FIFO queue implemented around a
 * contiguous array supporting atomic single-producer, single-consumer (SPSC)
 * usage. In other words, one thread of execution can safely add to the queue
 * while a different thread of execution pulls from the queue, without the use
 * of locking. To ensure safe concurrency, the user of this class must ensure
 * that producer methods do not interleave with other producer methods, and
 * likewise for consumer methods. To help ensure this contract is upheld,
 * producer-only methods are grouped under the Producer subclass (accessed via
 * AtomicSpscQueue::producer()), and likewise for Consumer.
 *
 * To accomplish concurrency without the use of locks, the head and tail
 * indices are allowed to increment past the size of the container. They are
 * reset only when new elements are pushed into an empty container, so the
 * usage model must involve relatively frequent emptying of the container to
 * prevent overflow of the indices. The nearingOverflow() method can be used to
 * detect when this condition is imminent and enable flow control or some other
 * mechanism to ensure the queue is fully emptied before proceeding (though
 * triggering an assert/fatal error could also be considered, since the
 * conditions required to reach this state organically are expected to be so
 * rare as to be effectively impossible, making a bug the more likely cause).
 *
 * Since modulo operations are common in the internals of this container, it's
 * recommended to use powers of 2 for the capacity where possible.
 */
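
/*
 * Example usage (an illustrative sketch, not part of the API; the names
 * sample and process() below are hypothetical):
 *
 *   chre::AtomicSpscQueue<int, 16> queue;  // power-of-2 capacity recommended
 *
 *   // Producer context only:
 *   auto producer = queue.producer();
 *   if (!producer.full()) {
 *     producer.push(sample);
 *   }
 *
 *   // Consumer context only (a different thread of execution):
 *   auto consumer = queue.consumer();
 *   while (!consumer.empty()) {
 *     process(consumer.front());
 *     consumer.pop();
 *   }
 */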

namespace chre {

template <typename ElementType, size_t kCapacity>
class AtomicSpscQueue : public NonCopyable {
  // Since we rely on being able to increment mHead and mTail beyond kCapacity,
  // this provides some level of guarantee that we'll be able to do that a few
  // times before things are reset (when the queue is emptied).
  static_assert(kCapacity <= UINT32_MAX / 8,
                "Large capacity usage of AtomicSpscQueue is not advised");

 public:
  /**
   * Destroying the queue must only be done when it is guaranteed that the
   * producer and consumer execution contexts are both stopped.
   */
  ~AtomicSpscQueue() {
    size_t sz = size();
    auto c = consumer();
    for (size_t i = 0; i < sz; i++) {
      c.pop();
    }
  }

  size_t capacity() const {
    return kCapacity;
  }

  /**
   * Checks whether the queue has not been fully emptied in a long time and the
   * internal counters are nearing overflow, which would cause significant data
   * loss if it occurs (the consumer sees the queue as empty when it actually
   * isn't, until the tail catches up to the head). If this possibility is a
   * concern, the producer should check this method and, if it returns true,
   * enable flow control to stop adding new data to the queue until after the
   * queue has been fully emptied.
   *
   * @return true if the internal counters/indices are nearing overflow
   */
  bool nearingOverflow() const {
    return (mTail.load() > UINT32_MAX - kCapacity);
  }
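
  /*
   * Example of producer-side flow control (a sketch; handleBackpressure() is a
   * hypothetical application-level fallback, e.g. dropping or buffering data):
   *
   *   auto p = queue.producer();
   *   if (queue.nearingOverflow() && p.size() > 0) {
   *     // Hold off until the consumer fully drains the queue; the first push
   *     // into the emptied queue resets the internal indices.
   *     handleBackpressure(value);
   *   } else if (!p.full()) {
   *     p.push(value);
   *   }
   */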

  /**
   * Gets a snapshot of the number of elements currently stored in the queue.
   * Safe to call from any context.
   */
  size_t size() const {
    uint32_t head = mHead.load();
    uint32_t tail = mTail.load();

    // Note that head and tail are normally monotonically increasing with
    // head <= tail, *except* when we are resetting both head and tail to 0
    // (done only when adding new elements), in which case we reset tail first.
    // If our reads happened between resetting tail and resetting head, then
    // tail < head, and we can safely assume the queue is empty.
    if (head == tail || tail < head) {
      return 0;
    } else {
      return (tail - head);
    }
  }

  /**
   * Non-const methods within this class must ONLY be invoked from the producer
   * execution context.
   */
  class Producer {
   public:
    size_t capacity() const {
      return kCapacity;
    }
    bool full() const {
      return (size() == kCapacity);
    }
    size_t size() const {
      return mQueue.size();
    }

    /**
     * Constructs a new item at the end of the queue in-place.
     *
     * WARNING: Undefined behavior if the queue is currently full.
     */
    template <typename... Args>
    void emplace(Args &&...args) {
      uint32_t newTail;
      new (nextStorage(&newTail)) ElementType(std::forward<Args>(args)...);
      mQueue.mTail = newTail;
    }

    /**
     * Pushes an element onto the back of the queue.
     *
     * WARNING: Undefined behavior if the queue is currently full.
     */
    void push(const ElementType &element) {
      uint32_t newTail;
      new (nextStorage(&newTail)) ElementType(element);
      mQueue.mTail = newTail;
    }

    //! Move construction version of push(const ElementType&)
    void push(ElementType &&element) {
      uint32_t newTail;
      new (nextStorage(&newTail)) ElementType(std::move(element));
      mQueue.mTail = newTail;
    }

   private:
    friend class AtomicSpscQueue;
    Producer(AtomicSpscQueue<ElementType, kCapacity> &q) : mQueue(q) {}

    AtomicSpscQueue<ElementType, kCapacity> &mQueue;

    //! Fetches a pointer to the next location where we should push an element,
    //! and outputs (via newTail) the new tail value the caller must publish to
    //! mQueue.mTail after constructing the element.
    ElementType *nextStorage(uint32_t *newTail) {
      uint32_t tail = mQueue.mTail.load();
      if (tail != 0 && tail == mQueue.mHead.load()) {
        // We're empty, so reset both head and tail to 0 so it doesn't continue
        // to grow (and possibly overflow). Only do this when pushing, as this
        // is the only place we can guarantee that mHead is stable (there's
        // nothing for the consumer to retrieve, and attempting to pull from an
        // empty queue is UB) and mTail is too (we're in the producer context).
        // Note that we need to reset tail *first* to ensure size() is safe to
        // call from both contexts.
        mQueue.mTail = 0;
        mQueue.mHead = 0;
        tail = 0;
      } else {
        // If tail overflows (only possible if the producer *always* pushes a
        // new element while the consumer is reading, meaning that the queue
        // never gets fully emptied, and this continues until the tail pointer
        // reaches the max here), then size() will consider the queue empty and
        // things will get very broken.
        CHRE_ASSERT(tail < UINT32_MAX);
      }

      *newTail = tail + 1;
      return &mQueue.data()[tail % kCapacity];
    }
  };

  Producer producer() {
    return Producer(*this);
  }

  /**
   * Non-const methods within this class must ONLY be invoked from the consumer
   * execution context.
   */
  class Consumer {
   public:
    size_t capacity() const {
      return kCapacity;
    }
    bool empty() const {
      return (size() == 0);
    }
    size_t size() const {
      return mQueue.size();
    }

    /**
     * Retrieves a reference to the oldest element in the queue.
     *
     * WARNING: Undefined behavior if the queue is currently empty.
     */
    ElementType &front() {
      return mQueue.data()[mQueue.mHead.load() % kCapacity];
    }
    const ElementType &front() const {
      return mQueue.data()[mQueue.mHead.load() % kCapacity];
    }

    /**
     * Removes the oldest element in the queue.
     *
     * WARNING: Undefined behavior if the queue is currently empty.
     */
    void pop() {
      // Destructing prior to moving the head pointer is safe as long as this
      // doesn't interleave with other Consumer methods
      uint32_t headRaw = mQueue.mHead;
      uint32_t headIndex = headRaw % kCapacity;
      mQueue.data()[headIndex].~ElementType();
      mQueue.mHead = headRaw + 1;
    }

    /**
     * Moves or copies a block of elements into the provided (possibly
     * uninitialized) destination storage.
     *
     * Safe to call if the queue is currently empty (includes an internal
     * check).
     *
     * @param dest Pointer to destination array
     * @param count Maximum number of elements to extract
     *
     * @return Number of elements actually pulled out of the queue.
     */
    size_t extract(ElementType *dest, size_t count) {
      size_t elementsToCopy = std::min(mQueue.size(), count);
      return extractInternal(dest, elementsToCopy);
    }

    //! Equivalent to extract(ElementType*, size_t) but appends to the provided
    //! FixedSizeVector up to its capacity
    template <size_t kDestCapacity>
    size_t extract(FixedSizeVector<ElementType, kDestCapacity> *dest) {
      size_t destIndex = dest->size();
      size_t elementsToCopy =
          std::min(mQueue.size(), dest->capacity() - destIndex);

      dest->resize(destIndex + elementsToCopy);
      return extractInternal(&dest->data()[destIndex], elementsToCopy);
    }
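
    /*
     * Example of batched extraction in the consumer context (a sketch; the
     * element type and sizes are illustrative):
     *
     *   chre::FixedSizeVector<int, 16> batch;
     *   size_t pulled = queue.consumer().extract(&batch);
     *
     *   // Or into raw (possibly uninitialized) storage:
     *   int buf[8];
     *   size_t n = queue.consumer().extract(buf, 8);
     */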

   private:
    friend class AtomicSpscQueue;
    Consumer(AtomicSpscQueue<ElementType, kCapacity> &q) : mQueue(q) {}

    AtomicSpscQueue<ElementType, kCapacity> &mQueue;

    //! Copies/moves out up to elementsToCopy elements starting at the head,
    //! handling wrap-around at the end of the backing array, then advances
    //! mHead past the extracted elements.
    size_t extractInternal(ElementType *dest, size_t elementsToCopy) {
      if (elementsToCopy > 0) {
        uint32_t headRaw = mQueue.mHead;
        size_t headIndex = headRaw % kCapacity;

        size_t firstCopy = std::min(elementsToCopy, kCapacity - headIndex);
        uninitializedMoveOrCopy(&mQueue.data()[headIndex], firstCopy, dest);
        destroy(&mQueue.data()[headIndex], firstCopy);

        if (firstCopy != elementsToCopy) {
          size_t secondCopy = elementsToCopy - firstCopy;
          uninitializedMoveOrCopy(&mQueue.data()[0], secondCopy,
                                  &dest[firstCopy]);
          destroy(&mQueue.data()[0], secondCopy);
        }

        mQueue.mHead = headRaw + elementsToCopy;
      }

      return elementsToCopy;
    }
  };

  Consumer consumer() {
    return Consumer(*this);
  }

 protected:
  //! Index of the oldest element in the queue (first to be popped); modulo
  //! kCapacity converts this into an array index. If the queue is empty, this
  //! is equal to mTail, *or* for a very brief time it may be greater than
  //! mTail (when we're resetting both to 0).
  chre::AtomicUint32 mHead{0};

  //! Indicator of where we will push the next element -- to provide atomic
  //! behavior, this may exceed kCapacity, so modulo kCapacity is needed to
  //! convert this into an array index.
  chre::AtomicUint32 mTail{0};

  typename std::aligned_storage<sizeof(ElementType), alignof(ElementType)>::type
      mData[kCapacity];

  ElementType *data() {
    return reinterpret_cast<ElementType *>(mData);
  }
};

}  // namespace chre

#endif  // CHRE_UTIL_ATOMIC_SPSC_QUEUE_H_