package kotlinx.coroutines.internal

import kotlinx.atomicfu.*

/**
 * A thread-safe resource pool that creates its elements on demand.
 *
 * [maxCapacity] is the maximum amount of elements.
 * [create] is the function that creates a new element; it receives the index of the slot being filled.
 * [create] must not return `null`: [close] recognizes a filled slot by its value being non-null
 * and would otherwise wait for that slot forever.
 *
 * This is only used in the Native implementation,
 * but is part of the `concurrent` source set in order to test it on the JVM.
 */
internal class OnDemandAllocatingPool<T>(
    private val maxCapacity: Int,
    private val create: (Int) -> T
) {
    /**
     * Number of reserved slots + isClosed flag in the highest bit.
     * Once the flag is set, the value is guaranteed not to change anymore.
     */
    private val controlState = atomic(0)

    /**
     * Slot storage. A reserved slot stays `null` until its owner publishes either the created element
     * or [CreationFailed] (when [create] threw and the reservation could not be rolled back).
     */
    private val elements = atomicArrayOfNulls<Any>(maxCapacity)

    /**
     * Sets the closed flag.
     *
     * Returns the number of slots that need to be cleaned up due to the pool being closed,
     * or `0` if the pool had already been closed by an earlier call.
     */
    @Suppress("NOTHING_TO_INLINE")
    private inline fun tryForbidNewElements(): Int {
        controlState.loop {
            if (it.isClosed()) return 0 // already closed
            if (controlState.compareAndSet(it, it or IS_CLOSED_MASK)) return it
        }
    }

    @Suppress("NOTHING_TO_INLINE")
    private inline fun Int.isClosed(): Boolean = (this and IS_CLOSED_MASK) != 0

    /**
     * Request that a new element is created.
     *
     * Returns `false` if the pool is closed.
     *
     * Note that it will still return `true` even if an element was not created due to reaching [maxCapacity].
     *
     * Rethrows the exceptions thrown from [create]. In this case, the slot reservation is rolled back,
     * so the operation has no effect. If the rollback is impossible because another slot was reserved or
     * the pool was closed in the meantime, the slot is marked as failed instead: it stays counted towards
     * [maxCapacity] and is skipped by [close].
     */
    fun allocate(): Boolean {
        controlState.loop { ctl ->
            if (ctl.isClosed()) return false
            if (ctl >= maxCapacity) return true
            if (controlState.compareAndSet(ctl, ctl + 1)) {
                // Slot `ctl` is now reserved for this call; something must be published for it
                // (or the reservation undone), otherwise `close` would wait for it forever.
                val element = try {
                    create(ctl)
                } catch (e: Throwable) {
                    releaseFailedSlot(ctl)
                    throw e
                }
                elements[ctl].value = element
                return true
            }
        }
    }

    /**
     * Undoes the reservation of the slot [index] after [create] failed for it.
     *
     * The counter can only be decremented while this slot is the most recently reserved one and the pool
     * is still open, i.e. while the control state is exactly `index + 1`. Otherwise the counter must stay
     * as it is, so the slot is filled with [CreationFailed] to let [close] move past it.
     */
    private fun releaseFailedSlot(index: Int) {
        if (!controlState.compareAndSet(index + 1, index)) {
            elements[index].value = CreationFailed
        }
    }

    /**
     * Close the pool.
     *
     * This will prevent any new elements from being created.
     * All the elements present in the pool will be returned.
     * Slots whose creation failed contribute nothing to the result.
     *
     * The function is thread-safe.
     *
     * [close] can be called multiple times, but only a single call will return a non-empty list.
     * This is due to the elements being cleaned out from the pool on the first invocation to avoid memory leaks,
     * and no new elements being created after.
     */
    @Suppress("UNCHECKED_CAST")
    fun close(): List<T> {
        val elementsExisting = tryForbidNewElements()
        val result = ArrayList<T>(elementsExisting)
        for (i in 0 until elementsExisting) {
            val slot = takeSlot(i)
            // everything except the failure marker was produced by `create`, so it is a `T`
            if (slot !== CreationFailed) result.add(slot as T)
        }
        return result
    }

    /**
     * Removes and returns the content of the slot [index], spinning until the slot's owner has published it.
     *
     * We wait because we know that eventually something is going to be there: every reserved slot
     * below the closed counter receives either an element or [CreationFailed].
     */
    private fun takeSlot(index: Int): Any {
        loop {
            val slot = elements[index].getAndSet(null)
            if (slot != null) return slot
        }
    }

    // for tests
    internal fun stateRepresentation(): String {
        val ctl = controlState.value
        val elementsStr = (0 until (ctl and IS_CLOSED_MASK.inv())).map { index ->
            val slot = elements[index].value
            // a failed slot is shown the same way as a not-yet-filled one
            if (slot === CreationFailed) null else slot
        }.toString()
        val closedStr = if (ctl.isClosed()) "[closed]" else ""
        return elementsStr + closedStr
    }

    override fun toString(): String = "OnDemandAllocatingPool(${stateRepresentation()})"
}

/** Marker stored in a reserved slot whose element could not be created. */
private object CreationFailed

// KT-25023
private inline fun loop(block: () -> Unit): Nothing {
    while (true) {
        block()
    }
}

/** The highest bit of the control state: set once the pool is closed. */
private const val IS_CLOSED_MASK = 1 shl 31