package kotlinx.coroutines.debug.internal
2
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.internal.*
5 import java.lang.ref.*
6
7 // This is very limited implementation, not suitable as a generic map replacement.
8 // It has lock-free get and put with synchronized rehash for simplicity (and better CPU usage on contention)
/**
 * A concurrent map with weakly-referenced keys.
 *
 * This is a very limited implementation, not suitable as a generic map replacement:
 * [get] and [put] are lock-free, while rehashing is done under a lock (`synchronized`)
 * for simplicity (and better CPU usage on contention). Entries whose keys were
 * garbage-collected are purged lazily while probing, or promptly by a dedicated thread
 * running [runWeakRefQueueCleaningLoopUntilInterrupted] when `weakRefQueue = true`.
 */
@Suppress("UNCHECKED_CAST")
internal class ConcurrentWeakMap<K : Any, V: Any>(
    /**
     * Weak reference queue is needed when a small key is mapped to a large value, and we need to promptly release a
     * reference to the value when the key was already disposed.
     */
    weakRefQueue: Boolean = false
) : AbstractMutableMap<K, V>() {
    private val _size = atomic(0) // number of live key->value mappings
    private val core = atomic(Core(MIN_CAPACITY)) // current hashtable core; replaced wholesale on rehash
    private val weakRefQueue: ReferenceQueue<K>? = if (weakRefQueue) ReferenceQueue() else null

    override val size: Int
        get() = _size.value

    // Called when a slot whose weak key was garbage-collected gets cleaned out of the table.
    private fun decrementSize() { _size.decrementAndGet() }

    override fun get(key: K): V? = core.value.getImpl(key)

    override fun put(key: K, value: V): V? {
        var oldValue = core.value.putImpl(key, value)
        // REHASH sentinel: the lock-free fast path could not complete (table too full or
        // a concurrent rehash marked the slot) -- fall back to the synchronized slow path.
        if (oldValue === REHASH) oldValue = putSynchronized(key, value)
        if (oldValue == null) _size.incrementAndGet() // new mapping was added
        return oldValue as V?
    }

    override fun remove(key: K): V? {
        // Removal is implemented as a put of a null value into an existing slot.
        var oldValue = core.value.putImpl(key, null)
        if (oldValue === REHASH) oldValue = putSynchronized(key, null)
        if (oldValue != null) _size.decrementAndGet() // an existing mapping was removed
        return oldValue as V?
    }

    // Slow path: retries the put, rehashing into a bigger core as many times as needed.
    // @Synchronized ensures only one thread rehashes at a time; lock-free gets/puts continue concurrently.
    @Synchronized
    private fun putSynchronized(key: K, value: V?): V? {
        // Note: concurrent put leaves chance that we fail to put even after rehash, we retry until successful
        var curCore = core.value
        while (true) {
            val oldValue = curCore.putImpl(key, value)
            if (oldValue !== REHASH) return oldValue as V?
            curCore = curCore.rehash()
            core.value = curCore // publish the new core for lock-free readers
        }
    }

    override val keys: MutableSet<K>
        get() = KeyValueSet { k, _ -> k }

    override val entries: MutableSet<MutableMap.MutableEntry<K, V>>
        get() = KeyValueSet { k, v -> Entry(k, v) }

    // We don't care much about clear's efficiency
    override fun clear() {
        for (k in keys) remove(k)
    }

    /**
     * Endless loop that promptly cleans entries whose keys were garbage-collected,
     * driven by the reference queue. Must be run on a dedicated thread; returns when
     * that thread is interrupted. Requires construction with `weakRefQueue = true`.
     */
    fun runWeakRefQueueCleaningLoopUntilInterrupted() {
        check(weakRefQueue != null) { "Must be created with weakRefQueue = true" }
        try {
            while (true) {
                cleanWeakRef(weakRefQueue.remove() as HashedWeakRef<*>)
            }
        } catch (e: InterruptedException) {
            Thread.currentThread().interrupt() // restore interrupted status and exit the loop
        }
    }

    private fun cleanWeakRef(w: HashedWeakRef<*>) {
        // NOTE(review): only the current core is cleaned here; a ref whose slot was moved by a
        // rehash is presumably cleaned lazily during later probing -- confirm this is intended.
        core.value.cleanWeakRef(w)
    }

    /**
     * The actual hashtable: open addressing with linear probing at decreasing indices.
     * Capacity ([allocated]) is a power of two. A core is immutable in capacity; growth
     * creates a new [Core] via [rehash].
     */
    @Suppress("UNCHECKED_CAST")
    private inner class Core(private val allocated: Int) {
        private val shift = allocated.countLeadingZeroBits() + 1 // so that index() yields values in 0 until allocated
        private val threshold = 2 * allocated / 3 // max fill factor at 66% to ensure speedy lookups
        private val load = atomic(0) // counts how many slots are occupied in this core
        private val keys = atomicArrayOfNulls<HashedWeakRef<K>?>(allocated)
        private val values = atomicArrayOfNulls<Any?>(allocated)

        // Multiplicative hashing with the golden-ratio constant MAGIC; keeps the top bits.
        private fun index(hash: Int) = (hash * MAGIC) ushr shift

        // get is always lock-free, unwraps the value that was marked by concurrent rehash
        fun getImpl(key: K): V? {
            var index = index(key.hashCode())
            while (true) {
                val w = keys[index].value ?: return null // not found
                val k = w.get()
                if (key == k) {
                    val value = values[index].value
                    return (if (value is Marked) value.ref else value) as V?
                }
                if (k == null) removeCleanedAt(index) // weak ref was here, but collected
                if (index == 0) index = allocated // probing wraps around, going downwards
                index--
            }
        }

        // Nulls out the value of a slot whose weak key was garbage-collected and adjusts size.
        private fun removeCleanedAt(index: Int) {
            while (true) {
                val oldValue = values[index].value ?: return // return when already removed
                if (oldValue is Marked) return // cannot remove marked (rehash is working on it, will not copy)
                if (values[index].compareAndSet(oldValue, null)) { // removed
                    decrementSize()
                    return
                }
            }
        }

        // returns REHASH when rehash is needed (the value was not put)
        // value == null removes the mapping; weakKey0, when given, reuses an existing weak ref (used by rehash)
        fun putImpl(key: K, value: V?, weakKey0: HashedWeakRef<K>? = null): Any? {
            var index = index(key.hashCode())
            var loadIncremented = false
            var weakKey: HashedWeakRef<K>? = weakKey0
            // Phase 1: find or reserve the slot for this key.
            while (true) {
                val w = keys[index].value
                if (w == null) { // slot empty => not found => try reserving slot
                    if (value == null) return null // removing missing value, nothing to do here
                    if (!loadIncremented) {
                        // We must increment load before we even try to occupy a slot to avoid overfill during concurrent put
                        load.update { n ->
                            if (n >= threshold) return REHASH // the load is already too big -- rehash
                            n + 1 // otherwise increment
                        }
                        loadIncremented = true
                    }
                    if (weakKey == null) weakKey = HashedWeakRef(key, weakRefQueue)
                    if (keys[index].compareAndSet(null, weakKey)) break // slot reserved !!!
                    continue // retry at this slot on CAS failure (somebody already reserved this slot)
                }
                val k = w.get()
                if (key == k) { // found already reserved slot at index
                    if (loadIncremented) load.decrementAndGet() // undo increment, because found a slot
                    break
                }
                if (k == null) removeCleanedAt(index) // weak ref was here, but collected
                if (index == 0) index = allocated
                index--
            }
            // Phase 2: update value in the reserved slot.
            var oldValue: Any?
            while (true) {
                oldValue = values[index].value
                if (oldValue is Marked) return REHASH // rehash started, cannot work here
                if (values[index].compareAndSet(oldValue, value)) break
            }
            return oldValue as V?
        }

        // only one thread can rehash, but may have concurrent puts/gets
        fun rehash(): Core {
            // use size to approximate new required capacity to have at least 25-50% fill factor,
            // may fail due to concurrent modification, will retry
            retry@while (true) {
                val newCapacity = size.coerceAtLeast(MIN_CAPACITY / 4).takeHighestOneBit() * 4
                val newCore = Core(newCapacity)
                for (index in 0 until allocated) {
                    // load the key
                    val w = keys[index].value
                    val k = w?.get()
                    if (w != null && k == null) removeCleanedAt(index) // weak ref was here, but collected
                    // mark value so that it cannot be changed while we rehash to new core
                    var value: Any?
                    while (true) {
                        value = values[index].value
                        if (value is Marked) { // already marked -- good
                            value = value.ref
                            break
                        }
                        // try mark
                        if (values[index].compareAndSet(value, value.mark())) break
                    }
                    if (k != null && value != null) {
                        // Reuse the existing weak ref (w) so the reference queue keeps tracking it.
                        val oldValue = newCore.putImpl(k, value as V, w)
                        if (oldValue === REHASH) continue@retry // retry if we underestimated capacity
                        assert(oldValue == null)
                    }
                }
                return newCore // rehashed everything successfully
            }
        }

        // Removes the entry for the given (collected) weak key ref, if still present in this core.
        // Uses the hash stored in HashedWeakRef, since the referent itself is already gone.
        fun cleanWeakRef(weakRef: HashedWeakRef<*>) {
            var index = index(weakRef.hash)
            while (true) {
                val w = keys[index].value ?: return // return when slots are over
                if (w === weakRef) { // found
                    removeCleanedAt(index)
                    return
                }
                if (index == 0) index = allocated
                index--
            }
        }

        fun <E> keyValueIterator(factory: (K, V) -> E): MutableIterator<E> = KeyValueIterator(factory)

        // Weakly-consistent iterator over live entries (key not collected, value not removed).
        // remove() is not supported.
        private inner class KeyValueIterator<E>(private val factory: (K, V) -> E) : MutableIterator<E> {
            private var index = -1
            private lateinit var key: K   // key of the next entry, set by findNext()
            private lateinit var value: V // value of the next entry, set by findNext()

            init { findNext() }

            // Advances index to the next slot with a live key and non-null value, stashing
            // them into [key]/[value]; leaves index == allocated when iteration is exhausted.
            private fun findNext() {
                while (++index < allocated) {
                    key = keys[index].value?.get() ?: continue
                    var value = values[index].value
                    if (value is Marked) value = value.ref // unwrap value marked by a concurrent rehash
                    if (value != null) {
                        this.value = value as V
                        return
                    }
                }
            }

            override fun hasNext(): Boolean = index < allocated

            override fun next(): E {
                if (index >= allocated) throw NoSuchElementException()
                return factory(key, value).also { findNext() }
            }

            override fun remove() = noImpl()
        }
    }

    // Immutable snapshot entry; setValue is not supported.
    private class Entry<K, V>(override val key: K, override val value: V) : MutableMap.MutableEntry<K, V> {
        override fun setValue(newValue: V): V = noImpl()
    }

    // View of keys/entries shaped by [factory]; add is not supported, iteration delegates to the core.
    private inner class KeyValueSet<E>(
        private val factory: (K, V) -> E
    ) : AbstractMutableSet<E>() {
        override val size: Int get() = this@ConcurrentWeakMap.size
        override fun add(element: E): Boolean = noImpl()
        override fun iterator(): MutableIterator<E> = core.value.keyValueIterator(factory)
    }
}
247
private const val MAGIC = 2654435769L.toInt() // golden ratio (~2^32 / phi) for multiplicative hashing
private const val MIN_CAPACITY = 16 // initial (and minimal) core capacity
private val REHASH = Symbol("REHASH") // sentinel returned by putImpl when a lock-free put cannot proceed
private val MARKED_NULL = Marked(null) // preallocated mark for empty slots seen by rehash
private val MARKED_TRUE = Marked(true) // When using map as set "true" used as value, optimize its mark allocation
253
/**
 * A [WeakReference] that also remembers the referent's hash code, so the hashtable slot can
 * still be located (via the reference queue) for prompt cleanup after the referent itself
 * has been garbage-collected and [get] returns `null`.
 */
internal class HashedWeakRef<T>(ref: T, queue: ReferenceQueue<T>?) : WeakReference<T>(ref, queue) {
    /** Hash code of the original referent, captured eagerly at construction time. */
    @JvmField
    val hash = ref.hashCode()
}
264
/**
 * Marked values cannot be modified. The marking is performed when rehash has started to ensure that concurrent
 * modifications (that are lock-free) cannot perform any changes and are forced to synchronize with ongoing rehash.
 * [ref] holds the original (possibly null) value that was in the slot when the mark was installed.
 */
private class Marked(@JvmField val ref: Any?)
270
/**
 * Wraps an arbitrary slot value into [Marked] for the rehash protocol, reusing the
 * preallocated instances for the two most common values: `null` (empty slot) and
 * `true` (the value used when the map serves as a set).
 */
private fun Any?.mark(): Marked =
    when (this) {
        null -> MARKED_NULL
        true -> MARKED_TRUE
        else -> Marked(this)
    }
276
/** Common failure path for the map's unsupported mutation operations. Never returns. */
private fun noImpl(): Nothing =
    throw UnsupportedOperationException("not implemented")
280