<lambda>null1 package kotlinx.coroutines.internal
2 
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlin.jvm.*
6 
7 private typealias Core<E> = LockFreeTaskQueueCore<E>
8 
9 /**
10  * Lock-free Multiply-Producer xxx-Consumer Queue for task scheduling purposes.
11  *
12  * **Note 1: This queue is NOT linearizable. It provides only quiescent consistency for its operations.**
13  * However, this guarantee is strong enough for task-scheduling purposes.
14  * In particular, the following execution is permitted for this queue, but is not permitted for a linearizable queue:
15  *
16  * ```
17  * Thread 1: addLast(1) = true, removeFirstOrNull() = null
18  * Thread 2: addLast(2) = 2 // this operation is concurrent with both operations in the first thread
19  * ```
20  *
21  * **Note 2: When this queue is used with multiple consumers (`singleConsumer == false`) this it is NOT lock-free.**
22  * In particular, consumer spins until producer finishes its operation in the case of near-empty queue.
23  * It is a very short window that could manifest itself rarely and only under specific load conditions,
24  * but it still deprives this algorithm of its lock-freedom.
25  */
26 internal open class LockFreeTaskQueue<E : Any>(
27     singleConsumer: Boolean // true when there is only a single consumer (slightly faster & lock-free)
28 ) {
29     private val _cur = atomic(Core<E>(Core.INITIAL_CAPACITY, singleConsumer))
30 
31     // Note: it is not atomic w.r.t. remove operation (remove can transiently fail when isEmpty is false)
32     val isEmpty: Boolean get() = _cur.value.isEmpty
33     val size: Int get() = _cur.value.size
34 
35     fun close() {
36         _cur.loop { cur ->
37             if (cur.close()) return // closed this copy
38             _cur.compareAndSet(cur, cur.next()) // move to next
39         }
40     }
41 
42     fun addLast(element: E): Boolean {
43         _cur.loop { cur ->
44             when (cur.addLast(element)) {
45                 Core.ADD_SUCCESS -> return true
46                 Core.ADD_CLOSED -> return false
47                 Core.ADD_FROZEN -> _cur.compareAndSet(cur, cur.next()) // move to next
48             }
49         }
50     }
51 
52     @Suppress("UNCHECKED_CAST")
53     fun removeFirstOrNull(): E? {
54         _cur.loop { cur ->
55             val result = cur.removeFirstOrNull()
56             if (result !== Core.REMOVE_FROZEN) return result as E?
57             _cur.compareAndSet(cur, cur.next())
58         }
59     }
60 
61     // Used for validation in tests only
62     fun <R> map(transform: (E) -> R): List<R> = _cur.value.map(transform)
63 
64     // Used for validation in tests only
65     fun isClosed(): Boolean = _cur.value.isClosed()
66 }
67 
68 /**
69  * Lock-free Multiply-Producer xxx-Consumer Queue core.
70  * @see LockFreeTaskQueue
71  */
72 internal class LockFreeTaskQueueCore<E : Any>(
73     private val capacity: Int,
74     private val singleConsumer: Boolean // true when there is only a single consumer (slightly faster)
75 ) {
76     private val mask = capacity - 1
77     private val _next = atomic<Core<E>?>(null)
78     private val _state = atomic(0L)
79     private val array = atomicArrayOfNulls<Any?>(capacity)
80 
81     init {
82         check(mask <= MAX_CAPACITY_MASK)
83         check(capacity and mask == 0)
84     }
85 
86     // Note: it is not atomic w.r.t. remove operation (remove can transiently fail when isEmpty is false)
headnull87     val isEmpty: Boolean get() = _state.value.withState { head, tail -> head == tail }
headnull88     val size: Int get() = _state.value.withState { head, tail -> (tail - head) and MAX_CAPACITY_MASK }
89 
closenull90     fun close(): Boolean {
91         _state.update { state ->
92             if (state and CLOSED_MASK != 0L) return true // ok - already closed
93             if (state and FROZEN_MASK != 0L) return false // frozen -- try next
94             state or CLOSED_MASK // try set closed bit
95         }
96         return true
97     }
98 
99     // ADD_CLOSED | ADD_FROZEN | ADD_SUCCESS
addLastnull100     fun addLast(element: E): Int {
101         _state.loop { state ->
102             if (state and (FROZEN_MASK or CLOSED_MASK) != 0L) return state.addFailReason() // cannot add
103             state.withState { head, tail ->
104                 val mask = this.mask // manually move instance field to local for performance
105                 // If queue is Single-Consumer then there could be one element beyond head that we cannot overwrite,
106                 // so we check for full queue with an extra margin of one element
107                 if ((tail + 2) and mask == head and mask) return ADD_FROZEN // overfull, so do freeze & copy
108                 // If queue is Multi-Consumer then the consumer could still have not cleared element
109                 // despite the above check for one free slot.
110                 if (!singleConsumer && array[tail and mask].value != null) {
111                     // There are two options in this situation
112                     // 1. Spin-wait until consumer clears the slot
113                     // 2. Freeze & resize to avoid spinning
114                     // We use heuristic here to avoid memory-overallocation
115                     // Freeze & reallocate when queue is small or more than half of the queue is used
116                     if (capacity < MIN_ADD_SPIN_CAPACITY || (tail - head) and MAX_CAPACITY_MASK > capacity shr 1) {
117                         return ADD_FROZEN
118                     }
119                     // otherwise spin
120                     return@loop
121                 }
122                 val newTail = (tail + 1) and MAX_CAPACITY_MASK
123                 if (_state.compareAndSet(state, state.updateTail(newTail))) {
124                     // successfully added
125                     array[tail and mask].value = element
126                     // could have been frozen & copied before this item was set -- correct it by filling placeholder
127                     var cur = this
128                     while(true) {
129                         if (cur._state.value and FROZEN_MASK == 0L) break // all fine -- not frozen yet
130                         cur = cur.next().fillPlaceholder(tail, element) ?: break
131                     }
132                     return ADD_SUCCESS // added successfully
133                 }
134             }
135         }
136     }
137 
fillPlaceholdernull138     private fun fillPlaceholder(index: Int, element: E): Core<E>? {
139         val old = array[index and mask].value
140         /*
141          * addLast actions:
142          * 1) Commit tail slot
143          * 2) Write element to array slot
144          * 3) Check for array copy
145          *
146          * If copy happened between 2 and 3 then the consumer might have consumed our element,
147          * then another producer might have written its placeholder in our slot, so we should
148          * perform *unique* check that current placeholder is our to avoid overwriting another producer placeholder
149          */
150         if (old is Placeholder && old.index == index) {
151             array[index and mask].value = element
152             // we've corrected missing element, should check if that propagated to further copies, just in case
153             return this
154         }
155         // it is Ok, no need for further action
156         return null
157     }
158 
159     // REMOVE_FROZEN | null (EMPTY) | E (SUCCESS)
removeFirstOrNullnull160     fun removeFirstOrNull(): Any? {
161         _state.loop { state ->
162             if (state and FROZEN_MASK != 0L) return REMOVE_FROZEN // frozen -- cannot modify
163             state.withState { head, tail ->
164                 if ((tail and mask) == (head and mask)) return null // empty
165                 val element = array[head and mask].value
166                 if (element == null) {
167                     // If queue is Single-Consumer, then element == null only when add has not finished yet
168                     if (singleConsumer) return null // consider it not added yet
169                     // retry (spin) until consumer adds it
170                     return@loop
171                 }
172                 // element == Placeholder can only be when add has not finished yet
173                 if (element is Placeholder) return null // consider it not added yet
174                 // we cannot put null into array here, because copying thread could replace it with Placeholder and that is a disaster
175                 val newHead = (head + 1) and MAX_CAPACITY_MASK
176                 if (_state.compareAndSet(state, state.updateHead(newHead))) {
177                     // Array could have been copied by another thread and it is perfectly fine, since only elements
178                     // between head and tail were copied and there are no extra steps we should take here
179                     array[head and mask].value = null // now can safely put null (state was updated)
180                     return element // successfully removed in fast-path
181                 }
182                 // Multi-Consumer queue must retry this loop on CAS failure (another consumer might have removed element)
183                 if (!singleConsumer) return@loop
184                 // Single-consumer queue goes to slow-path for remove in case of interference
185                 var cur = this
186                 while (true) {
187                     @Suppress("UNUSED_VALUE")
188                     cur = cur.removeSlowPath(head, newHead) ?: return element
189                 }
190             }
191         }
192     }
193 
removeSlowPathnull194     private fun removeSlowPath(oldHead: Int, newHead: Int): Core<E>? {
195         _state.loop { state ->
196             state.withState { head, _ ->
197                 assert { head == oldHead } // "This queue can have only one consumer"
198                 if (state and FROZEN_MASK != 0L) {
199                     // state was already frozen, so removed element was copied to next
200                     return next() // continue to correct head in next
201                 }
202                 if (_state.compareAndSet(state, state.updateHead(newHead))) {
203                     array[head and mask].value = null // now can safely put null (state was updated)
204                     return null
205                 }
206             }
207         }
208     }
209 
nextnull210     fun next(): LockFreeTaskQueueCore<E> = allocateOrGetNextCopy(markFrozen())
211 
212     private fun markFrozen(): Long =
213         _state.updateAndGet { state ->
214             if (state and FROZEN_MASK != 0L) return state // already marked
215             state or FROZEN_MASK
216         }
217 
allocateOrGetNextCopynull218     private fun allocateOrGetNextCopy(state: Long): Core<E> {
219         _next.loop { next ->
220             if (next != null) return next // already allocated & copied
221             _next.compareAndSet(null, allocateNextCopy(state))
222         }
223     }
224 
allocateNextCopynull225     private fun allocateNextCopy(state: Long): Core<E> {
226         val next = LockFreeTaskQueueCore<E>(capacity * 2, singleConsumer)
227         state.withState { head, tail ->
228             var index = head
229             while (index and mask != tail and mask) {
230                 // replace nulls with placeholders on copy
231                 val value = array[index and mask].value ?: Placeholder(index)
232                 next.array[index and next.mask].value = value
233                 index++
234             }
235             next._state.value = state wo FROZEN_MASK
236         }
237         return next
238     }
239 
240     // Used for validation in tests only
mapnull241     fun <R> map(transform: (E) -> R): List<R> {
242         val res = ArrayList<R>(capacity)
243         _state.value.withState { head, tail ->
244             var index = head
245             while (index and mask != tail and mask) {
246                 // replace nulls with placeholders on copy
247                 val element = array[index and mask].value
248                 @Suppress("UNCHECKED_CAST")
249                 if (element != null && element !is Placeholder) res.add(transform(element as E))
250                 index++
251             }
252         }
253         return res
254     }
255 
256     // Used for validation in tests only
isClosednull257     fun isClosed(): Boolean = _state.value and CLOSED_MASK != 0L
258 
259 
260     // Instance of this class is placed into array when we have to copy array, but addLast is in progress --
261     // it had already reserved a slot in the array (with null) and have not yet put its value there.
262     // Placeholder keeps the actual index (not masked) to distinguish placeholders on different wraparounds of array
263     // Internal because of inlining
264     internal class Placeholder(@JvmField val index: Int)
265 
266     @Suppress("PrivatePropertyName", "MemberVisibilityCanBePrivate")
267     internal companion object {
268         const val INITIAL_CAPACITY = 8
269 
270         const val CAPACITY_BITS = 30
271         const val MAX_CAPACITY_MASK = (1 shl CAPACITY_BITS) - 1
272         const val HEAD_SHIFT = 0
273         const val HEAD_MASK = MAX_CAPACITY_MASK.toLong() shl HEAD_SHIFT
274         const val TAIL_SHIFT = HEAD_SHIFT + CAPACITY_BITS
275         const val TAIL_MASK = MAX_CAPACITY_MASK.toLong() shl TAIL_SHIFT
276 
277         const val FROZEN_SHIFT = TAIL_SHIFT + CAPACITY_BITS
278         const val FROZEN_MASK = 1L shl FROZEN_SHIFT
279         const val CLOSED_SHIFT = FROZEN_SHIFT + 1
280         const val CLOSED_MASK = 1L shl CLOSED_SHIFT
281 
282         const val MIN_ADD_SPIN_CAPACITY = 1024
283 
284         @JvmField val REMOVE_FROZEN = Symbol("REMOVE_FROZEN")
285 
286         const val ADD_SUCCESS = 0
287         const val ADD_FROZEN = 1
288         const val ADD_CLOSED = 2
289 
290         infix fun Long.wo(other: Long) = this and other.inv()
291         fun Long.updateHead(newHead: Int) = (this wo HEAD_MASK) or (newHead.toLong() shl HEAD_SHIFT)
292         fun Long.updateTail(newTail: Int) = (this wo TAIL_MASK) or (newTail.toLong() shl TAIL_SHIFT)
293 
294         inline fun <T> Long.withState(block: (head: Int, tail: Int) -> T): T {
295             val head = ((this and HEAD_MASK) shr HEAD_SHIFT).toInt()
296             val tail = ((this and TAIL_MASK) shr TAIL_SHIFT).toInt()
297             return block(head, tail)
298         }
299 
300         // FROZEN | CLOSED
301         fun Long.addFailReason(): Int = if (this and CLOSED_MASK != 0L) ADD_CLOSED else ADD_FROZEN
302     }
303 }
304