<lambda>null1 package kotlinx.coroutines.internal
2 
3 import kotlinx.atomicfu.*
4 
5 /**
6  * A thread-safe resource pool.
7  *
8  * [maxCapacity] is the maximum amount of elements.
9  * [create] is the function that creates a new element.
10  *
11  * This is only used in the Native implementation,
12  * but is part of the `concurrent` source set in order to test it on the JVM.
13  */
14 internal class OnDemandAllocatingPool<T>(
15     private val maxCapacity: Int,
16     private val create: (Int) -> T
17 ) {
18     /**
19      * Number of existing elements + isClosed flag in the highest bit.
20      * Once the flag is set, the value is guaranteed not to change anymore.
21      */
22     private val controlState = atomic(0)
23     private val elements = atomicArrayOfNulls<T>(maxCapacity)
24 
25     /**
26      * Returns the number of elements that need to be cleaned up due to the pool being closed.
27      */
28     @Suppress("NOTHING_TO_INLINE")
29     private inline fun tryForbidNewElements(): Int {
30         controlState.loop {
31             if (it.isClosed()) return 0 // already closed
32             if (controlState.compareAndSet(it, it or IS_CLOSED_MASK)) return it
33         }
34     }
35 
36     @Suppress("NOTHING_TO_INLINE")
37     private inline fun Int.isClosed(): Boolean = this and IS_CLOSED_MASK != 0
38 
39     /**
40      * Request that a new element is created.
41      *
42      * Returns `false` if the pool is closed.
43      *
44      * Note that it will still return `true` even if an element was not created due to reaching [maxCapacity].
45      *
46      * Rethrows the exceptions thrown from [create]. In this case, this operation has no effect.
47      */
48     fun allocate(): Boolean {
49         controlState.loop { ctl ->
50             if (ctl.isClosed()) return false
51             if (ctl >= maxCapacity) return true
52             if (controlState.compareAndSet(ctl, ctl + 1)) {
53                 elements[ctl].value = create(ctl)
54                 return true
55             }
56         }
57     }
58 
59     /**
60      * Close the pool.
61      *
62      * This will prevent any new elements from being created.
63      * All the elements present in the pool will be returned.
64      *
65      * The function is thread-safe.
66      *
67      * [close] can be called multiple times, but only a single call will return a non-empty list.
68      * This is due to the elements being cleaned out from the pool on the first invocation to avoid memory leaks,
69      * and no new elements being created after.
70      */
71     fun close(): List<T> {
72         val elementsExisting = tryForbidNewElements()
73         return (0 until elementsExisting).map { i ->
74             // we wait for the element to be created, because we know that eventually it is going to be there
75             loop {
76                 val element = elements[i].getAndSet(null)
77                 if (element != null) {
78                     return@map element
79                 }
80             }
81         }
82     }
83 
84     // for tests
85     internal fun stateRepresentation(): String {
86         val ctl = controlState.value
87         val elementsStr = (0 until (ctl and IS_CLOSED_MASK.inv())).map { elements[it].value }.toString()
88         val closedStr = if (ctl.isClosed()) "[closed]" else ""
89         return elementsStr + closedStr
90     }
91 
92     override fun toString(): String = "OnDemandAllocatingPool(${stateRepresentation()})"
93 }
94 
95 // KT-25023
loopnull96 private inline fun loop(block: () -> Unit): Nothing {
97     while (true) {
98         block()
99     }
100 }
101 
102 private const val IS_CLOSED_MASK = 1 shl 31
103