<lambda>null1 package kotlinx.coroutines.flow.internal
2
3 import kotlinx.coroutines.channels.*
4 import kotlinx.coroutines.flow.*
5 import kotlinx.coroutines.internal.*
6 import kotlin.coroutines.*
7 import kotlin.jvm.*
8
/**
 * Shared zero-length array of nullable `Continuation<Unit>` references.
 *
 * Its type matches the return type of [AbstractSharedFlowSlot.freeLocked]; presumably it is the value to return
 * when there is nothing to resume, which avoids allocating a fresh empty array each time — confirm against callers.
 */
@JvmField
internal val EMPTY_RESUMES: Array<Continuation<Unit>?> = arrayOfNulls<Continuation<Unit>>(0)
11
/**
 * A single collector slot managed by [AbstractSharedFlow].
 *
 * [AbstractSharedFlow] calls both members from inside its own `synchronized` block,
 * which is what the `Locked` suffix refers to.
 *
 * @param F the type of the flow that is handed to the slot when it is allocated and freed.
 */
internal abstract class AbstractSharedFlowSlot<F> {
    /**
     * Tries to claim this slot on behalf of [flow].
     *
     * @return `true` if the slot was free and is now allocated; `false` makes the caller try the next slot.
     */
    abstract fun allocateLocked(flow: F): Boolean

    /**
     * Releases this slot, which was previously allocated for [flow].
     *
     * @return the continuations that the caller resumes after it has released the lock.
     */
    abstract fun freeLocked(flow: F): Array<Continuation<Unit>?>
}
16
/**
 * Base class that keeps the bookkeeping of collector slots for a shared flow.
 *
 * The instance itself is the lock ([SynchronizedObject]): the slot array, the collector counter and the
 * lazily created subscription counter are all read and written inside `synchronized(this)`.
 *
 * @param S the concrete slot type created by [createSlot].
 */
internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {
    /** Slot table; stays `null` until the first slot is requested and is only replaced by this class. */
    protected var slots: Array<S?>? = null
        private set

    /** Number of slots that are currently allocated (i.e. not free). */
    protected var nCollectors = 0
        private set

    /** Hint for the position at which the next search for a free slot starts. */
    private var nextIndex = 0

    /** Backing field for [subscriptionCount]; created on first access. */
    private var _subscriptionCount: SubscriptionCountStateFlow? = null

    /**
     * The number of allocated slots, exposed as a [StateFlow].
     *
     * The underlying counter is created lazily, under the lock, so that its initial value
     * is consistent with [nCollectors].
     */
    val subscriptionCount: StateFlow<Int>
        get() = synchronized(this) {
            val existing = _subscriptionCount
            if (existing != null) {
                existing
            } else {
                val created = SubscriptionCountStateFlow(nCollectors)
                _subscriptionCount = created
                created
            }
        }

    /** Creates a new, not yet allocated slot. */
    protected abstract fun createSlot(): S

    /** Creates an array of [size] empty (`null`) slot references. */
    protected abstract fun createSlotArray(size: Int): Array<S?>

    /**
     * Allocates a slot for a new collector and returns it.
     *
     * Under the lock, the slot table is created with two entries on first use and doubled whenever every
     * entry is already taken. The search for a free slot starts at [nextIndex], wraps around at the end of the
     * table, and creates missing slots on the way. After the lock is released, the subscription counter
     * (if it has been created) is incremented by one.
     */
    @Suppress("UNCHECKED_CAST")
    protected fun allocateSlot(): S {
        val counter: SubscriptionCountStateFlow?
        val allocated = synchronized(this) {
            val current = slots
            val table: Array<S?> = when {
                current == null -> createSlotArray(2).also { slots = it }
                nCollectors >= current.size -> current.copyOf(2 * current.size).also { slots = it }
                else -> current
            }
            var cursor = nextIndex
            var candidate: S
            do {
                val position = cursor
                candidate = table[position] ?: createSlot().also { table[position] = it }
                // Advance the cursor (with wrap-around) before asking the slot, so that on success
                // the hint already points past the slot that was just taken.
                cursor = if (position + 1 >= table.size) 0 else position + 1
            } while (!(candidate as AbstractSharedFlowSlot<Any>).allocateLocked(this))
            nextIndex = cursor
            nCollectors++
            counter = _subscriptionCount // read under the lock; null if nobody asked for it yet
            candidate
        }
        // Publish the new subscription outside the lock.
        counter?.increment(1)
        return allocated
    }

    /**
     * Frees [slot], which must have been obtained from [allocateSlot].
     *
     * Under the lock, the collector counter is decremented and the slot is released. After the lock is
     * released, the continuations returned by the slot are resumed first, and only then the subscription
     * counter (if it has been created) is decremented by one.
     */
    @Suppress("UNCHECKED_CAST")
    protected fun freeSlot(slot: S) {
        val counter: SubscriptionCountStateFlow?
        val pendingResumes = synchronized(this) {
            nCollectors--
            counter = _subscriptionCount // read under the lock; null if nobody asked for it yet
            // With no active collectors left, restart the search hint from the beginning
            // for more predictable behavior next time.
            if (nCollectors == 0) nextIndex = 0
            (slot as AbstractSharedFlowSlot<Any>).freeLocked(this)
        }
        /*
         * Resume suspended coroutines.
         * This can happen when the freed subscriber was a slow one that was holding up the buffer.
         * Once it is gone, previously queued emitters can wake up, and they are resumed here.
         */
        pendingResumes.forEach { it?.resume(Unit) }
        // Publish the removed subscription.
        counter?.increment(-1)
    }

    /**
     * Invokes [block] for every non-null entry of the slot table; does nothing when there are no collectors
     * or the table has not been created yet.
     *
     * This function takes no lock itself; as the `Locked` suffix suggests, the caller is presumably expected
     * to already hold it — confirm against callers.
     */
    protected inline fun forEachSlotLocked(block: (S) -> Unit) {
        if (nCollectors == 0) return
        val table = slots ?: return
        for (slot in table) {
            if (slot != null) block(slot)
        }
    }
}
96
/**
 * [StateFlow] that holds the number of subscriptions.
 *
 * In the public API it is exposed as a regular [StateFlow], but under the hood it is implemented as a [SharedFlow]
 * so that consecutive updates are not conflated; the subscription count is very sensitive to conflation.
 *
 * Why non-conflation matters can be demonstrated with the following example:
 * ```
 * val shared = flowOf(239).stateIn(this, SharingStarted.Lazily, 42) // stateIn for the sake of the initial value
 * println(shared.first())
 * yield()
 * println(shared.first())
 * ```
 * If the flow is shared within the same dispatcher (e.g. Main) or with a slow/throttled one,
 * `SharingStarted.Lazily` will never be able to start the source: `first` sees the initial value and immediately
 * unsubscribes, leaving the asynchronous `SharingStarted` with a conflated zero.
 *
 * To avoid that (especially in more complex scenarios), subscription updates are not conflated.
 */
private class SubscriptionCountStateFlow(initialValue: Int) :
    SharedFlowImpl<Int>(1, Int.MAX_VALUE, BufferOverflow.DROP_OLDEST),
    StateFlow<Int>
{
    init {
        // Seed the flow so that there is a value to read from the very beginning.
        tryEmit(initialValue)
    }

    /** The most recently emitted count, read under this flow's lock. */
    override val value: Int
        get() {
            return synchronized(this) { lastReplayedLocked }
        }

    /**
     * Emits the current count changed by [delta]; the read and the emission happen under this flow's lock.
     *
     * @return the result of the underlying `tryEmit` call.
     */
    fun increment(delta: Int) = synchronized(this) {
        val updated = lastReplayedLocked + delta
        tryEmit(updated)
    }
}
128