<lambda>null1 package kotlinx.coroutines.debug.internal
2 
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.internal.*
5 import java.lang.ref.*
6 
/**
 * A concurrent open-addressing hash map that refers to its keys through weak references ([HashedWeakRef]).
 *
 * This is a very limited implementation, not suitable as a generic map replacement.
 * It has lock-free get and put with synchronized rehash for simplicity (and better CPU usage on contention).
 *
 * The views returned by [keys] and [entries] are read-only snapshots over the current table:
 * `add`, iterator `remove` and entry `setValue` all throw [UnsupportedOperationException].
 */
@Suppress("UNCHECKED_CAST")
internal class ConcurrentWeakMap<K : Any, V: Any>(
    /**
     * Weak reference queue is needed when a small key is mapped to a large value, and we need to promptly release a
     * reference to the value when the key was already disposed.
     */
    weakRefQueue: Boolean = false
) : AbstractMutableMap<K, V>() {
    // Number of live mappings; maintained separately from the per-core slot load
    private val _size = atomic(0)
    // Current hash table; replaced as a whole by putSynchronized when a rehash is required
    private val core = atomic(Core(MIN_CAPACITY))
    // Non-null only when the map was created with weakRefQueue = true
    private val weakRefQueue: ReferenceQueue<K>? = if (weakRefQueue) ReferenceQueue() else null

    override val size: Int
        get() = _size.value

    private fun decrementSize() { _size.decrementAndGet() }

    /** Returns the value mapped to [key] in the current core, or `null` when there is none. */
    override fun get(key: K): V? = core.value.getImpl(key)

    /**
     * Maps [key] to [value] and returns the previously mapped value, or `null` when there was none.
     * The lock-free path is tried first; when the current core answers [REHASH] the call falls back
     * to [putSynchronized].
     */
    override fun put(key: K, value: V): V? {
        var oldValue = core.value.putImpl(key, value)
        if (oldValue === REHASH) oldValue = putSynchronized(key, value)
        if (oldValue == null) _size.incrementAndGet() // a new mapping was created
        return oldValue as V?
    }

    /**
     * Removes the mapping for [key] (implemented as a put of `null`) and returns the removed value,
     * or `null` when the key was not mapped.
     */
    override fun remove(key: K): V? {
        var oldValue = core.value.putImpl(key, null)
        if (oldValue === REHASH) oldValue = putSynchronized(key, null)
        if (oldValue != null) decrementSize() // an existing mapping was dropped
        return oldValue as V?
    }

    /**
     * Slow path of [put] and [remove]: under the monitor of this map, rehashes into a new core and publishes it
     * until the update finally succeeds. Returns the previous value for [key], or `null`.
     */
    @Synchronized
    private fun putSynchronized(key: K, value: V?): V? {
        // Note: concurrent put leaves chance that we fail to put even after rehash, we retry until successful
        var curCore = core.value
        while (true) {
            val oldValue = curCore.putImpl(key, value)
            if (oldValue !== REHASH) return oldValue as V?
            curCore = curCore.rehash()
            core.value = curCore
        }
    }

    /** A fresh read-only view over the keys of the current core; created on every access. */
    override val keys: MutableSet<K>
        get() = KeyValueSet { k, _ -> k }

    /** A fresh read-only view over the key/value pairs of the current core; created on every access. */
    override val entries: MutableSet<MutableMap.MutableEntry<K, V>>
        get() = KeyValueSet { k, v -> Entry(k, v) }

    // We don't care much about clear's efficiency
    override fun clear() {
        for (k in keys) remove(k)
    }

    /**
     * Repeatedly takes references from the weak reference queue and releases the values stored for them,
     * until the calling thread is interrupted. The interrupt flag is restored before returning.
     *
     * @throws IllegalStateException when the map was created without `weakRefQueue = true`.
     */
    fun runWeakRefQueueCleaningLoopUntilInterrupted() {
        check(weakRefQueue != null) { "Must be created with weakRefQueue = true" }
        try {
            while (true) {
                cleanWeakRef(weakRefQueue.remove() as HashedWeakRef<*>)
            }
        } catch (e: InterruptedException) {
            Thread.currentThread().interrupt()
        }
    }

    // Looks the reference up in the current core only
    private fun cleanWeakRef(w: HashedWeakRef<*>) {
        core.value.cleanWeakRef(w)
    }

    /**
     * A fixed-size hash table with [allocated] slots. Keys and values live in two parallel atomic arrays;
     * collisions are resolved by probing towards lower indices with wrap-around from 0 to `allocated - 1`.
     * Key slots are only ever written by a CAS from `null`, so an occupied key slot is never emptied in this core.
     */
    @Suppress("UNCHECKED_CAST")
    private inner class Core(private val allocated: Int) {
        // Capacities used here are powers of two (MIN_CAPACITY or takeHighestOneBit() * 4),
        // so this shift keeps exactly the top log2(allocated) bits of the mixed hash
        private val shift = allocated.countLeadingZeroBits() + 1
        private val threshold = 2 * allocated / 3 // max fill factor at 66% to ensure speedy lookups
        private val load = atomic(0) // counts how many slots are occupied in this core
        private val keys = atomicArrayOfNulls<HashedWeakRef<K>?>(allocated)
        private val values = atomicArrayOfNulls<Any?>(allocated)

        // Maps a hash code to the slot where probing starts
        private fun index(hash: Int) = (hash * MAGIC) ushr shift

        // get is always lock-free, unwraps the value that was marked by concurrent rehash
        fun getImpl(key: K): V? {
            var index = index(key.hashCode())
            while (true) {
                val w = keys[index].value ?: return null // not found
                val k = w.get()
                if (key == k) {
                    val value = values[index].value
                    return (if (value is Marked) value.ref else value) as V?
                }
                if (k == null) removeCleanedAt(index) // weak ref was here, but collected
                if (index == 0) index = allocated
                index--
            }
        }

        // Drops the value stored at index (the key slot stays reserved) and adjusts the map size
        private fun removeCleanedAt(index: Int) {
            while (true) {
                val oldValue = values[index].value ?: return // return when already removed
                if (oldValue is Marked) return // cannot remove marked (rehash is working on it, will not copy)
                if (values[index].compareAndSet(oldValue, null)) { // removed
                    decrementSize()
                    return
                }
            }
        }

        // returns REHASH when rehash is needed (the value was not put)
        // value == null means removal; weakKey0 lets rehash reuse the weak reference of the old core
        fun putImpl(key: K, value: V?, weakKey0: HashedWeakRef<K>? = null): Any? {
            var index = index(key.hashCode())
            var loadIncremented = false
            var weakKey: HashedWeakRef<K>? = weakKey0
            while (true) {
                val w = keys[index].value
                if (w == null) { // slot empty => not found => try reserving slot
                    if (value == null) return null // removing missing value, nothing to do here
                    if (!loadIncremented) {
                        // We must increment load before we even try to occupy a slot to avoid overfill during concurrent put
                        load.update { n ->
                            if (n >= threshold) return REHASH // the load is already too big -- rehash
                            n + 1 // otherwise increment
                        }
                        loadIncremented = true
                    }
                    if (weakKey == null) weakKey = HashedWeakRef(key, weakRefQueue)
                    if (keys[index].compareAndSet(null, weakKey)) break // slot reserved !!!
                    continue // retry at this slot on CAS failure (somebody already reserved this slot)
                }
                val k = w.get()
                if (key == k) { // found already reserved slot at index
                    if (loadIncremented) load.decrementAndGet() // undo increment, because found a slot
                    break
                }
                if (k == null) removeCleanedAt(index) // weak ref was here, but collected
                if (index == 0) index = allocated
                index--
            }
            // update value
            var oldValue: Any?
            while (true) {
                oldValue = values[index].value
                if (oldValue is Marked) return REHASH // rehash started, cannot work here
                if (values[index].compareAndSet(oldValue, value)) break
            }
            return oldValue as V?
        }

        // only one thread can rehash, but may have concurrent puts/gets
        fun rehash(): Core {
            // use size to approximate new required capacity to have at least 25-50% fill factor,
            // may fail due to concurrent modification, will retry
            retry@while (true) {
                val newCapacity = size.coerceAtLeast(MIN_CAPACITY / 4).takeHighestOneBit() * 4
                val newCore = Core(newCapacity)
                for (index in 0 until allocated) {
                    // load the key
                    val w = keys[index].value
                    val k = w?.get()
                    if (w != null && k == null) removeCleanedAt(index) // weak ref was here, but collected
                    // mark value so that it cannot be changed while we rehash to new core
                    var value: Any?
                    while (true) {
                        value = values[index].value
                        if (value is Marked) { // already marked -- good
                            value = value.ref
                            break
                        }
                        // try mark
                        if (values[index].compareAndSet(value, value.mark())) break
                    }
                    if (k != null && value != null) {
                        val oldValue = newCore.putImpl(k, value as V, w)
                        if (oldValue === REHASH) continue@retry // retry if we underestimated capacity
                        assert(oldValue == null)
                    }
                }
                return newCore // rehashed everything successfully
            }
        }

        // Releases the value stored for exactly this reference instance, if it is present in this core
        fun cleanWeakRef(weakRef: HashedWeakRef<*>) {
            var index = index(weakRef.hash)
            while (true) {
                val w = keys[index].value ?: return // return when slots are over
                if (w === weakRef) { // found
                    removeCleanedAt(index)
                    return
                }
                if (index == 0) index = allocated
                index--
            }
        }

        /** Creates an iterator over the live pairs of this core, each converted to an element by [factory]. */
        fun <E> keyValueIterator(factory: (K, V) -> E): MutableIterator<E> = KeyValueIterator(factory)

        /**
         * Scans the slots of this core in increasing index order and keeps the next live key/value pair
         * pre-fetched in [key] and [value]. Removal through the iterator is not supported.
         */
        private inner class KeyValueIterator<E>(private val factory: (K, V) -> E) : MutableIterator<E> {
            private var index = -1
            private lateinit var key: K
            private lateinit var value: V

            init { findNext() }

            // Advances index to the next slot with a live key and a non-null value; stops at allocated when none is left
            private fun findNext() {
                while (++index < allocated) {
                    key = keys[index].value?.get() ?: continue
                    var value = values[index].value
                    if (value is Marked) value = value.ref // unwrap a value frozen by a concurrent rehash
                    if (value != null) {
                        this.value = value as V
                        return
                    }
                }
            }

            override fun hasNext(): Boolean = index < allocated

            override fun next(): E {
                if (index >= allocated) throw NoSuchElementException()
                return factory(key, value).also { findNext() }
            }

            override fun remove() = noImpl()
        }
    }

    /**
     * Immutable key/value pair handed out by [entries]; [setValue] is not supported.
     * Equality, hash code and string form follow the [Map.Entry] contract, so that the inherited
     * map-level `hashCode`/`equals` and `entries.contains` behave consistently.
     */
    private class Entry<K, V>(override val key: K, override val value: V) : MutableMap.MutableEntry<K, V> {
        override fun setValue(newValue: V): V = noImpl()

        override fun equals(other: Any?): Boolean =
            other is Map.Entry<*, *> && key == other.key && value == other.value

        override fun hashCode(): Int = key.hashCode() xor value.hashCode()

        override fun toString(): String = "$key=$value"
    }

    /**
     * Read-only set view shared by [keys] and [entries]; elements are produced by [factory]
     * from the core that is current at the time [iterator] is called.
     */
    private inner class KeyValueSet<E>(
        private val factory: (K, V) -> E
    ) : AbstractMutableSet<E>() {
        override val size: Int get() = this@ConcurrentWeakMap.size
        override fun add(element: E): Boolean = noImpl()
        override fun iterator(): MutableIterator<E> = core.value.keyValueIterator(factory)
    }
}
247 
// Multiplier used to mix hash codes before they are reduced to a slot index (golden ratio);
// 0x9E3779B9 is the same value as 2654435769, truncated to a signed 32-bit Int
private const val MAGIC: Int = 0x9E3779B9.toInt()

// Capacity of the very first hash table
private const val MIN_CAPACITY: Int = 16

// Sentinel result meaning "the update was not applied, a rehash is required"
private val REHASH: Symbol = Symbol("REHASH")

// Pre-allocated wrappers for the two most common marked values
private val MARKED_NULL: Marked = Marked(null)
private val MARKED_TRUE: Marked = Marked(true) // When using map as set "true" used as value, optimize its mark allocation
253 
/**
 * A [WeakReference] that remembers the hash code its referent had at construction time.
 *
 * Keeping the hash outside of the referent makes it possible to locate the reference in the hashtable
 * after the referent is gone, so that a reference queue can be used to promptly clean entries up
 * even in the absence of ongoing modifications.
 *
 * @param ref the object to refer to weakly.
 * @param queue the queue to register this reference with, or `null` when no registration is needed.
 */
internal class HashedWeakRef<T>(ref: T, queue: ReferenceQueue<T>?) : WeakReference<T>(ref, queue) {
    /** Hash code of the referent, captured once when this reference was created. */
    @JvmField
    val hash: Int = ref.hashCode()
}
264 
/**
 * Wrapper that freezes a stored value: a marked value cannot be modified.
 *
 * Values are marked when a rehash starts, so that concurrent lock-free modifications cannot change them
 * and are forced to synchronize with the ongoing rehash instead.
 *
 * @property ref the original, unwrapped value (possibly `null`).
 */
private class Marked(
    @JvmField val ref: Any?
)
270 
/**
 * Wraps the receiver into a [Marked] instance, reusing the shared [MARKED_NULL] and [MARKED_TRUE]
 * instances for `null` and `true` so that those two cases do not allocate.
 */
private fun Any?.mark(): Marked {
    if (this == null) return MARKED_NULL
    if (this == true) return MARKED_TRUE
    return Marked(this)
}
276 
/** Always fails with [UnsupportedOperationException]; used as the body of operations that are not supported. */
private fun noImpl(): Nothing = throw UnsupportedOperationException("not implemented")
280