1 /* 2 * Copyright (C) 2022 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef CHRE_UTIL_ATOMIC_SPSC_QUEUE_H_ 18 #define CHRE_UTIL_ATOMIC_SPSC_QUEUE_H_ 19 20 #include <algorithm> 21 #include <cstddef> 22 #include <cstdint> 23 #include <new> 24 #include <type_traits> 25 26 #include "chre/platform/atomic.h" 27 #include "chre/util/fixed_size_vector.h" 28 #include "chre/util/memory.h" 29 #include "chre/util/non_copyable.h" 30 31 /** 32 * @file 33 * AtomicSpscArrayQueue is a templated fixed-size FIFO queue implemented around 34 * a contiguous array supporting atomic single-producer, single-consumer (SPSC) 35 * usage. In other words, one thread of execution can safely add to the 36 * queue while a different thread of execution can can pull from the queue, 37 * without the use of locking. To ensure safe concurrency, the user of this 38 * class must ensure that producer methods do not interleave with other producer 39 * methods, and likewise for consumer methods. To help ensure this contract is 40 * upheld, producer-only methods are grouped under the Producer subclass 41 * (accessed via AtomicSpscArrayQueue::producer()), and likewise for Consumer. 42 * 43 * To accomplish concurrency without the use of locks, the head and tail 44 * pointers are allowed to increment past the size of the container. They are 45 * reset when new elements are pushed into an empty container, therefore the 46 * usage model must involve relatively frequent emptying of the container to 47 * prevent overflow of the indices. The nearingOverflow() method can be used to 48 * detect when this condition is imminent, and enable flow control or some other 49 * mechanism to ensure the queue is fully emptied before proceeding (though 50 * triggering an assert/fatal error could also be considered, since the set of 51 * conditions required to trigger this condition organically are expected to be 52 * so rare as to be effectively impossible, so a bug is a more likely cause). 53 * 54 * Since modulo operations are common in the internals of this container, it's 55 * recommended to use powers of 2 for the capacity where possible. 56 */ 57 58 namespace chre { 59 60 template <typename ElementType, size_t kCapacity> 61 class AtomicSpscQueue : public NonCopyable { 62 // Since we rely on being able to increment mHead and mTail beyond kCapacity, 63 // this provides some level of guarantee that we'll be able to do that a few 64 // times before things are reset (when the queue is emptied). 65 static_assert(kCapacity <= UINT32_MAX / 8, 66 "Large capacity usage of AtomicSpscQueue is not advised"); 67 68 public: 69 /** 70 * Destroying the queue must only be done when it is guaranteed that the 71 * producer and consumer execution contexts are both stopped. 72 */ ~AtomicSpscQueue()73 ~AtomicSpscQueue() { 74 size_t sz = size(); 75 auto c = consumer(); 76 for (size_t i = 0; i < sz; i++) { 77 c.pop(); 78 } 79 } 80 capacity()81 size_t capacity() const { 82 return kCapacity; 83 } 84 85 /** 86 * Checks whether the queue has not been fully emptied in a long time, and 87 * internal counters are nearing overflow, which would cause significant data 88 * loss if it occurs (consumer sees queue as empty when it actually isn't, 89 * until tail catches up to head). If this possibility is a concern, the 90 * producer should check this and if it returns true, enable flow control to 91 * stop adding new data to the queue until after the queue has been fully 92 * emptied. 93 * 94 * @return true internal counters/indices are nearing overflow 95 */ nearingOverflow()96 bool nearingOverflow() const { 97 return (mTail.load() > UINT32_MAX - kCapacity); 98 } 99 100 /** 101 * Gets a snapshot of the number of elements currently stored in the queue. 102 * Safe to call from any context. 103 */ size()104 size_t size() const { 105 uint32_t head = mHead.load(); 106 uint32_t tail = mTail.load(); 107 108 // Note that head and tail are normally monotonically increasing with 109 // head <= tail, *except* when we are resetting both head and tail to 0 110 // (done only when adding new elements), in which case we reset tail first. 111 // If our reads happened between resetting tail and resetting head, then 112 // tail < head, and we can safely assume the queue is empty. 113 if (head == tail || tail < head) { 114 return 0; 115 } else { 116 return (tail - head); 117 } 118 } 119 120 /** 121 * Non-const methods within this class must ONLY be invoked from the producer 122 * execution context. 123 */ 124 class Producer { 125 public: capacity()126 size_t capacity() const { 127 return kCapacity; 128 } full()129 bool full() const { 130 return (size() == kCapacity); 131 } size()132 size_t size() const { 133 return mQueue.size(); 134 } 135 136 /** 137 * Constructs a new item at the end of the queue in-place. 138 * 139 * WARNING: Undefined behavior if the array queue is currently full. 140 */ 141 template <typename... Args> emplace(Args &&...args)142 void emplace(Args &&...args) { 143 uint32_t newTail; 144 new (nextStorage(&newTail)) ElementType(std::forward<Args>(args)...); 145 mQueue.mTail = newTail; 146 } 147 148 /** 149 * Pushes an element onto the back of the array queue. 150 * 151 * WARNING: Undefined behavior if the array queue is currently full. 152 */ push(const ElementType & element)153 void push(const ElementType &element) { 154 uint32_t newTail; 155 new (nextStorage(&newTail)) ElementType(element); 156 mQueue.mTail = newTail; 157 } 158 159 //! Move construction version of push(const ElementType&) push(ElementType && element)160 void push(ElementType &&element) { 161 uint32_t newTail; 162 new (nextStorage(&newTail)) ElementType(std::move(element)); 163 mQueue.mTail = newTail; 164 } 165 166 private: 167 friend class AtomicSpscQueue; Producer(AtomicSpscQueue<ElementType,kCapacity> & q)168 Producer(AtomicSpscQueue<ElementType, kCapacity> &q) : mQueue(q) {} 169 170 AtomicSpscQueue<ElementType, kCapacity> &mQueue; 171 172 //! Fetches a pointer to the next location where we should push an element, 173 //! and updates bookkeeping for the next next location nextStorage(uint32_t * newTail)174 ElementType *nextStorage(uint32_t *newTail) { 175 uint32_t tail = mQueue.mTail.load(); 176 if (tail != 0 && tail == mQueue.mHead.load()) { 177 // We're empty, so reset both head and tail to 0 so it doesn't continue 178 // to grow (and possibly overflow). Only do this when pushing, as this 179 // is the only place we can guarantee that mHead is stable (there's 180 // nothing for the consumer to retrieve, and attempting to pull from an 181 // empty queue is UB) and mTail is too (we're in the producer context). 182 // Note that we need to reset tail *first* to ensure size() is safe to 183 // call from both contexts. 184 mQueue.mTail = 0; 185 mQueue.mHead = 0; 186 tail = 0; 187 } else { 188 // If tail overflows (only possible if the producer *always* pushes a 189 // new element while the consumer is reading, meaning that the queue 190 // never gets fully emptied, and this continues until the tail pointer 191 // reaches the max here), then size() will consider the queue empty and 192 // things will get very broken. 193 CHRE_ASSERT(tail < UINT32_MAX); 194 } 195 196 *newTail = tail + 1; 197 return &mQueue.data()[tail % kCapacity]; 198 } 199 }; 200 producer()201 Producer producer() { 202 return Producer(*this); 203 } 204 205 /** 206 * Non-const methods within this class must ONLY be invoked from the consumer 207 * execution context. 208 */ 209 class Consumer { 210 public: capacity()211 size_t capacity() const { 212 return kCapacity; 213 } empty()214 bool empty() const { 215 return (size() == 0); 216 } size()217 size_t size() const { 218 return mQueue.size(); 219 } 220 221 /** 222 * Retrieves a reference to the oldest element in the queue. 223 * 224 * WARNING: Undefined behavior if the queue is currently empty 225 */ front()226 ElementType &front() { 227 return mQueue.data()[mQueue.mHead.load() % kCapacity]; 228 } front()229 const ElementType &front() const { 230 return mQueue.data()[mQueue.mHead.load() % kCapacity]; 231 } 232 233 /** 234 * Removes the oldest element in the queue. 235 * 236 * WARNING: Undefined behavior if the queue is currently empty 237 */ pop()238 void pop() { 239 // Destructing prior to moving the head pointer is safe as long as this 240 // doesn't interleave with other Producer methods 241 uint32_t headRaw = mQueue.mHead; 242 uint32_t headIndex = headRaw % kCapacity; 243 mQueue.data()[headIndex].~ElementType(); 244 mQueue.mHead = headRaw + 1; 245 } 246 247 /** 248 * Moves or copies a block of elements into the provided (possibly 249 * uninitialized) destination storage. 250 * 251 * Safe to call if the queue is currently empty (includes an internal 252 * check). 253 * 254 * @param dest Pointer to destination array 255 * @param count Maximum number of elements to extract 256 * 257 * @return Number of elements actually pulled out of the queue. 258 */ extract(ElementType * dest,size_t count)259 size_t extract(ElementType *dest, size_t count) { 260 size_t elementsToCopy = std::min(mQueue.size(), count); 261 return extractInternal(dest, elementsToCopy); 262 } 263 264 //! Equivalent to extract(ElementType*, size_t) but appends to the provided 265 //! FixedSizeVector up to its capacity 266 template <size_t kDestCapacity> extract(FixedSizeVector<ElementType,kDestCapacity> * dest)267 size_t extract(FixedSizeVector<ElementType, kDestCapacity> *dest) { 268 size_t destIndex = dest->size(); 269 size_t elementsToCopy = 270 std::min(mQueue.size(), dest->capacity() - destIndex); 271 272 dest->resize(destIndex + elementsToCopy); 273 return extractInternal(&dest->data()[destIndex], elementsToCopy); 274 } 275 276 private: 277 friend class AtomicSpscQueue; Consumer(AtomicSpscQueue<ElementType,kCapacity> & q)278 Consumer(AtomicSpscQueue<ElementType, kCapacity> &q) : mQueue(q) {} 279 280 AtomicSpscQueue<ElementType, kCapacity> &mQueue; 281 extractInternal(ElementType * dest,size_t elementsToCopy)282 size_t extractInternal(ElementType *dest, size_t elementsToCopy) { 283 if (elementsToCopy > 0) { 284 uint32_t headRaw = mQueue.mHead; 285 size_t headIndex = headRaw % kCapacity; 286 287 size_t firstCopy = std::min(elementsToCopy, kCapacity - headIndex); 288 uninitializedMoveOrCopy(&mQueue.data()[headIndex], firstCopy, dest); 289 destroy(&mQueue.data()[headIndex], firstCopy); 290 291 if (firstCopy != elementsToCopy) { 292 size_t secondCopy = elementsToCopy - firstCopy; 293 uninitializedMoveOrCopy(&mQueue.data()[0], secondCopy, 294 &dest[firstCopy]); 295 destroy(&mQueue.data()[0], secondCopy); 296 } 297 298 mQueue.mHead = headRaw + elementsToCopy; 299 } 300 301 return elementsToCopy; 302 } 303 }; 304 consumer()305 Consumer consumer() { 306 return Consumer(*this); 307 } 308 309 protected: 310 //! Index of the oldest element on the queue (first to be popped). If the 311 //! queue is empty, this is equal to mTail (modulo kCapacity) *or* for a very 312 //! brief time it may be greater than mTail (when we're resetting both to 0). 313 chre::AtomicUint32 mHead{0}; 314 315 //! Indicator of where we will push the next element -- to provide atomic 316 //! behavior, this may exceed kCapacity, so modulo kCapacity is needed to 317 //! convert this into an array index. 318 chre::AtomicUint32 mTail{0}; 319 320 typename std::aligned_storage<sizeof(ElementType), alignof(ElementType)>::type 321 mData[kCapacity]; 322 data()323 ElementType *data() { 324 return reinterpret_cast<ElementType *>(mData); 325 } 326 }; 327 328 } // namespace chre 329 330 #endif // CHRE_UTIL_ATOMIC_SPSC_QUEUE_H_ 331