<lambda>null1 package kotlinx.coroutines.internal
2
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlin.jvm.*
6
7 private typealias Core<E> = LockFreeTaskQueueCore<E>
8
/**
 * Lock-free Multi-Producer xxx-Consumer queue for task scheduling purposes
 * (single- or multi-consumer, selected by the `singleConsumer` constructor argument).
 *
 * **Note 1: This queue is NOT linearizable. It provides only quiescent consistency for its operations.**
 * However, this guarantee is strong enough for task-scheduling purposes.
 * In particular, the following execution is permitted for this queue, but is not permitted for a linearizable queue:
 *
 * ```
 * Thread 1: addLast(1) = true, removeFirstOrNull() = null
 * Thread 2: addLast(2) = true // this operation is concurrent with both operations in the first thread
 * ```
 *
 * **Note 2: When this queue is used with multiple consumers (`singleConsumer == false`) it is NOT lock-free.**
 * In particular, a consumer spins until a producer finishes its operation in the case of a near-empty queue.
 * It is a very short window that could manifest itself rarely and only under specific load conditions,
 * but it still deprives this algorithm of its lock-freedom.
 *
 * All operations are delegated to the current [LockFreeTaskQueueCore]. Whenever the current core reports
 * that it is frozen, the queue switches to the core returned by its `next()` and retries.
 */
internal open class LockFreeTaskQueue<E : Any>(
    singleConsumer: Boolean // true when there is only a single consumer (slightly faster & lock-free)
) {
    // The core that currently serves all operations
    private val _cur = atomic(Core<E>(Core.INITIAL_CAPACITY, singleConsumer))

    // Note: it is not atomic w.r.t. remove operation (remove can transiently fail when isEmpty is false)
    val isEmpty: Boolean get() = _cur.value.isEmpty
    val size: Int get() = _cur.value.size

    /**
     * Closes the queue. Retries on the next core for as long as the current one refuses because it is frozen.
     */
    fun close() {
        while (true) {
            val core = _cur.value
            if (core.close()) return // this copy is closed
            _cur.compareAndSet(core, core.next()) // frozen -- switch to the next copy and retry
        }
    }

    /**
     * Appends [element] to the tail of the queue.
     *
     * @return `true` if the element was added, `false` if the queue is closed.
     */
    fun addLast(element: E): Boolean {
        while (true) {
            val core = _cur.value
            val outcome = core.addLast(element)
            if (outcome == Core.ADD_SUCCESS) return true
            if (outcome == Core.ADD_CLOSED) return false
            if (outcome == Core.ADD_FROZEN) _cur.compareAndSet(core, core.next()) // switch to the next copy
        }
    }

    /**
     * Removes and returns the head element, or returns `null` when the current core reports nothing to take.
     */
    @Suppress("UNCHECKED_CAST")
    fun removeFirstOrNull(): E? {
        while (true) {
            val core = _cur.value
            val taken = core.removeFirstOrNull()
            if (taken === Core.REMOVE_FROZEN) {
                _cur.compareAndSet(core, core.next()) // switch to the next copy and retry
                continue
            }
            return taken as E?
        }
    }

    // Used for validation in tests only
    fun <R> map(transform: (E) -> R): List<R> = _cur.value.map(transform)

    // Used for validation in tests only
    fun isClosed(): Boolean = _cur.value.isClosed()
}
67
/**
 * Lock-free Multi-Producer xxx-Consumer queue core: one fixed-capacity ring buffer used by [LockFreeTaskQueue].
 *
 * The mutable state is packed into a single atomic `Long` (see the companion constants):
 * bits 0..29 hold `head`, bits 30..59 hold `tail`, bit 60 is the FROZEN flag and bit 61 is the CLOSED flag.
 * `head` and `tail` are counters modulo `2^30`; the array slot of a counter is obtained by masking it with `mask`.
 *
 * A core never grows. When it cannot accept an element it is frozen and its live range is copied into
 * a core of twice the capacity, which is then reachable through [next].
 *
 * @see LockFreeTaskQueue
 */
internal class LockFreeTaskQueueCore<E : Any>(
    private val capacity: Int,
    private val singleConsumer: Boolean // true when there is only a single consumer (slightly faster)
) {
    private val mask = capacity - 1
    private val _next = atomic<Core<E>?>(null)
    private val _state = atomic(0L)
    private val array = atomicArrayOfNulls<Any?>(capacity)

    init {
        check(mask <= MAX_CAPACITY_MASK) { "Capacity $capacity exceeds the maximum of ${MAX_CAPACITY_MASK + 1}" }
        // Zero also satisfies `capacity and mask == 0`, so positivity has to be checked explicitly
        check(capacity > 0 && (capacity and mask) == 0) { "Capacity must be a positive power of two, but was $capacity" }
    }

    // Note: it is not atomic w.r.t. remove operation (remove can transiently fail when isEmpty is false)
    val isEmpty: Boolean get() = _state.value.withState { head, tail -> head == tail }
    val size: Int get() = _state.value.withState { head, tail -> (tail - head) and MAX_CAPACITY_MASK }

    /**
     * Tries to set the CLOSED bit of this core.
     *
     * @return `true` if this core is closed (by this call or earlier), `false` if it is frozen and
     * the CLOSED bit was not set here, in which case the caller has to retry on [next].
     */
    fun close(): Boolean {
        _state.update { state ->
            if (state and CLOSED_MASK != 0L) return true // ok - already closed
            if (state and FROZEN_MASK != 0L) return false // frozen -- try next
            state or CLOSED_MASK // try set closed bit
        }
        return true
    }

    /**
     * Tries to append [element] at the tail of this core.
     *
     * @return [ADD_SUCCESS] if the element was stored, [ADD_CLOSED] if this core is closed, or
     * [ADD_FROZEN] if this core is (or has to be) frozen and the caller must retry on [next].
     */
    // ADD_CLOSED | ADD_FROZEN | ADD_SUCCESS
    fun addLast(element: E): Int {
        _state.loop { state ->
            if (state and (FROZEN_MASK or CLOSED_MASK) != 0L) return state.addFailReason() // cannot add
            state.withState { head, tail ->
                val mask = this.mask // manually move instance field to local for performance
                // If queue is Single-Consumer then there could be one element beyond head that we cannot overwrite,
                // so we check for full queue with an extra margin of one element
                if ((tail + 2) and mask == head and mask) return ADD_FROZEN // overfull, so do freeze & copy
                // If queue is Multi-Consumer then the consumer could still have not cleared element
                // despite the above check for one free slot.
                if (!singleConsumer && array[tail and mask].value != null) {
                    // There are two options in this situation
                    // 1. Spin-wait until consumer clears the slot
                    // 2. Freeze & resize to avoid spinning
                    // We use heuristic here to avoid memory-overallocation
                    // Freeze & reallocate when queue is small or more than half of the queue is used
                    if (capacity < MIN_ADD_SPIN_CAPACITY || (tail - head) and MAX_CAPACITY_MASK > capacity shr 1) {
                        return ADD_FROZEN
                    }
                    // otherwise spin
                    return@loop
                }
                val newTail = (tail + 1) and MAX_CAPACITY_MASK
                if (_state.compareAndSet(state, state.updateTail(newTail))) {
                    // successfully added
                    array[tail and mask].value = element
                    // could have been frozen & copied before this item was set -- correct it by filling placeholder
                    var cur = this
                    while (true) {
                        if (cur._state.value and FROZEN_MASK == 0L) break // all fine -- not frozen yet
                        cur = cur.next().fillPlaceholder(tail, element) ?: break
                    }
                    return ADD_SUCCESS // added successfully
                }
            }
        }
    }

    /**
     * Replaces the [Placeholder] that a copy left for the reservation at counter [index] with [element].
     *
     * @return this core if a matching placeholder was replaced (the caller then checks whether this core
     * was frozen and copied in turn), or `null` if the slot does not hold a placeholder for [index].
     */
    private fun fillPlaceholder(index: Int, element: E): Core<E>? {
        val old = array[index and mask].value
        /*
         * addLast actions:
         * 1) Commit tail slot
         * 2) Write element to array slot
         * 3) Check for array copy
         *
         * If copy happened between 2 and 3 then the consumer might have consumed our element,
         * then another producer might have written its placeholder in our slot, so we should
         * perform *unique* check that current placeholder is ours to avoid overwriting another producer's placeholder
         */
        if (old is Placeholder && old.index == index) {
            array[index and mask].value = element
            // we've corrected missing element, should check if that propagated to further copies, just in case
            return this
        }
        // it is Ok, no need for further action
        return null
    }

    /**
     * Tries to remove the element at the head of this core.
     *
     * @return [REMOVE_FROZEN] if this core is frozen and the caller must retry on [next], `null` if the
     * core is empty or (see the comments below) the head element is not fully added yet, otherwise
     * the removed element.
     */
    // REMOVE_FROZEN | null (EMPTY) | E (SUCCESS)
    fun removeFirstOrNull(): Any? {
        _state.loop { state ->
            if (state and FROZEN_MASK != 0L) return REMOVE_FROZEN // frozen -- cannot modify
            state.withState { head, tail ->
                if ((tail and mask) == (head and mask)) return null // empty
                val element = array[head and mask].value
                if (element == null) {
                    // If queue is Single-Consumer, then element == null only when add has not finished yet
                    if (singleConsumer) return null // consider it not added yet
                    // retry (spin) until producer adds it
                    return@loop
                }
                // element == Placeholder can only be when add has not finished yet
                if (element is Placeholder) return null // consider it not added yet
                // we cannot put null into array here, because copying thread could replace it with Placeholder and that is a disaster
                val newHead = (head + 1) and MAX_CAPACITY_MASK
                if (_state.compareAndSet(state, state.updateHead(newHead))) {
                    // Array could have been copied by another thread and it is perfectly fine, since only elements
                    // between head and tail were copied and there are no extra steps we should take here
                    array[head and mask].value = null // now can safely put null (state was updated)
                    return element // successfully removed in fast-path
                }
                // Multi-Consumer queue must retry this loop on CAS failure (another consumer might have removed element)
                if (!singleConsumer) return@loop
                // Single-consumer queue goes to slow-path for remove in case of interference
                var cur = this
                while (true) {
                    @Suppress("UNUSED_VALUE")
                    cur = cur.removeSlowPath(head, newHead) ?: return element
                }
            }
        }
    }

    /**
     * Single-consumer slow path: keeps trying to advance the head of this core from [oldHead] to [newHead].
     *
     * @return `null` once the head of this core was advanced and the slot cleared, or the next core if
     * this core is frozen, in which case the caller has to repeat the correction there.
     */
    private fun removeSlowPath(oldHead: Int, newHead: Int): Core<E>? {
        _state.loop { state ->
            state.withState { head, _ ->
                assert { head == oldHead } // "This queue can have only one consumer"
                if (state and FROZEN_MASK != 0L) {
                    // state was already frozen, so removed element was copied to next
                    return next() // continue to correct head in next
                }
                if (_state.compareAndSet(state, state.updateHead(newHead))) {
                    array[head and mask].value = null // now can safely put null (state was updated)
                    return null
                }
            }
        }
    }

    /**
     * Freezes this core (if it is not frozen yet) and returns the core that holds a copy of its contents,
     * allocating that copy if no thread has published one yet.
     */
    fun next(): LockFreeTaskQueueCore<E> = allocateOrGetNextCopy(markFrozen())

    // Sets the FROZEN bit and returns the resulting state; an already frozen state is returned unchanged
    private fun markFrozen(): Long =
        _state.updateAndGet { state ->
            if (state and FROZEN_MASK != 0L) return state // already marked
            state or FROZEN_MASK
        }

    // Returns the published next core, building a copy from the frozen [state] until one is published
    private fun allocateOrGetNextCopy(state: Long): Core<E> {
        _next.loop { next ->
            if (next != null) return next // already allocated & copied
            _next.compareAndSet(null, allocateNextCopy(state))
        }
    }

    /**
     * Builds a core of twice the capacity that holds the `head..tail` range described by the frozen [state].
     * Slots that are reserved but not written yet are copied as [Placeholder]s; the resulting core starts
     * with the same state minus the FROZEN bit.
     */
    private fun allocateNextCopy(state: Long): Core<E> {
        val next = LockFreeTaskQueueCore<E>(capacity * 2, singleConsumer)
        state.withState { head, tail ->
            var index = head
            while (index and mask != tail and mask) {
                // replace nulls with placeholders on copy
                val value = array[index and mask].value ?: Placeholder(index)
                next.array[index and next.mask].value = value
                // Wrap exactly like head/tail do: a producer looks its placeholder up by its own tail counter,
                // which is always below 2^30, so the index stored in Placeholder must be wrapped as well
                index = (index + 1) and MAX_CAPACITY_MASK
            }
            next._state.value = state wo FROZEN_MASK
        }
        return next
    }

    // Used for validation in tests only
    fun <R> map(transform: (E) -> R): List<R> {
        val res = ArrayList<R>(capacity)
        _state.value.withState { head, tail ->
            var index = head
            while (index and mask != tail and mask) {
                // skip empty slots and placeholders -- only actual elements are transformed
                val element = array[index and mask].value
                @Suppress("UNCHECKED_CAST")
                if (element != null && element !is Placeholder) res.add(transform(element as E))
                index++
            }
        }
        return res
    }

    // Used for validation in tests only
    fun isClosed(): Boolean = _state.value and CLOSED_MASK != 0L


    // Instance of this class is placed into array when we have to copy array, but addLast is in progress --
    // it had already reserved a slot in the array (with null) and have not yet put its value there.
    // Placeholder keeps the actual index (not masked) to distinguish placeholders on different wraparounds of array
    // Internal because of inlining
    internal class Placeholder(@JvmField val index: Int)

    @Suppress("PrivatePropertyName", "MemberVisibilityCanBePrivate")
    internal companion object {
        const val INITIAL_CAPACITY = 8

        // State layout: [0..29] head, [30..59] tail, [60] frozen flag, [61] closed flag
        const val CAPACITY_BITS = 30
        const val MAX_CAPACITY_MASK = (1 shl CAPACITY_BITS) - 1
        const val HEAD_SHIFT = 0
        const val HEAD_MASK = MAX_CAPACITY_MASK.toLong() shl HEAD_SHIFT
        const val TAIL_SHIFT = HEAD_SHIFT + CAPACITY_BITS
        const val TAIL_MASK = MAX_CAPACITY_MASK.toLong() shl TAIL_SHIFT

        const val FROZEN_SHIFT = TAIL_SHIFT + CAPACITY_BITS
        const val FROZEN_MASK = 1L shl FROZEN_SHIFT
        const val CLOSED_SHIFT = FROZEN_SHIFT + 1
        const val CLOSED_MASK = 1L shl CLOSED_SHIFT

        // Multi-consumer addLast spins on an uncleared slot only in cores of at least this capacity
        const val MIN_ADD_SPIN_CAPACITY = 1024

        @JvmField val REMOVE_FROZEN = Symbol("REMOVE_FROZEN")

        const val ADD_SUCCESS = 0
        const val ADD_FROZEN = 1
        const val ADD_CLOSED = 2

        // "without": clears in the receiver every bit that is set in [other]
        infix fun Long.wo(other: Long) = this and other.inv()
        fun Long.updateHead(newHead: Int) = (this wo HEAD_MASK) or (newHead.toLong() shl HEAD_SHIFT)
        fun Long.updateTail(newTail: Int) = (this wo TAIL_MASK) or (newTail.toLong() shl TAIL_SHIFT)

        // Unpacks head and tail from the state word and passes them to [block]
        inline fun <T> Long.withState(block: (head: Int, tail: Int) -> T): T {
            val head = ((this and HEAD_MASK) shr HEAD_SHIFT).toInt()
            val tail = ((this and TAIL_MASK) shr TAIL_SHIFT).toInt()
            return block(head, tail)
        }

        // FROZEN | CLOSED -- CLOSED wins when both bits are set
        fun Long.addFailReason(): Int = if (this and CLOSED_MASK != 0L) ADD_CLOSED else ADD_FROZEN
    }
}
304