<lambda>null1 package kotlinx.coroutines.flow.internal
2 
3 import kotlinx.coroutines.channels.*
4 import kotlinx.coroutines.flow.*
5 import kotlinx.coroutines.internal.*
6 import kotlin.coroutines.*
7 import kotlin.jvm.*
8 
/**
 * Shared zero-length array of continuations, usable wherever an "empty set of
 * continuations to resume" has to be returned without allocating a new array.
 */
@JvmField
internal val EMPTY_RESUMES: Array<Continuation<Unit>?> = arrayOfNulls(0)
11 
/**
 * A single collector slot managed by a shared flow of type [F].
 *
 * Both operations carry the `Locked` suffix: the owning flow invokes them while
 * holding its own lock.
 */
internal abstract class AbstractSharedFlowSlot<F> {
    /**
     * Attempts to claim this slot for [flow].
     *
     * @return `true` when the slot was free and is now allocated, `false` when it is already taken.
     */
    abstract fun allocateLocked(flow: F): Boolean

    /**
     * Releases this slot from [flow].
     *
     * @return continuations that the caller must resume after the lock has been released.
     */
    abstract fun freeLocked(flow: F): Array<Continuation<Unit>?>
}
16 
/**
 * Base class that manages the table of collector slots of type [S] for a shared flow.
 *
 * All mutable state declared here is read and written inside `synchronized(this)` sections.
 * The slot table is created lazily with two entries and doubles whenever every entry is in use.
 */
internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {
    /** Slot table; stays `null` until the first slot is requested. */
    protected var slots: Array<S?>? = null
        private set

    /** Number of slots that are currently allocated (i.e. not free). */
    protected var nCollectors = 0
        private set

    /** Hint for where the search for the next free slot should begin. */
    private var nextIndex = 0

    /** Created on first access of [subscriptionCount]. */
    private var _subscriptionCount: SubscriptionCountStateFlow? = null

    /**
     * The number of allocated slots exposed as a [StateFlow].
     *
     * The backing flow is created on first access, under the lock, so that its initial value
     * is consistent with [nCollectors].
     */
    val subscriptionCount: StateFlow<Int>
        get() = synchronized(this) {
            when (val existing = _subscriptionCount) {
                null -> SubscriptionCountStateFlow(nCollectors).also { _subscriptionCount = it }
                else -> existing
            }
        }

    /** Creates a fresh, unallocated slot. */
    protected abstract fun createSlot(): S

    /** Creates a slot table of the given [size]. */
    protected abstract fun createSlotArray(size: Int): Array<S?>

    /**
     * Finds (or creates) a free slot, allocates it to this flow and returns it.
     *
     * The table is grown under the lock when needed; the subscription count, if it was ever
     * requested, is incremented after the lock has been released.
     */
    @Suppress("UNCHECKED_CAST")
    protected fun allocateSlot(): S {
        val counter: SubscriptionCountStateFlow?
        val allocated = synchronized(this) {
            val current = slots
            // Make sure there is a table and that it has room for one more collector.
            val table = when {
                current == null -> createSlotArray(2).also { slots = it }
                nCollectors >= current.size -> current.copyOf(2 * current.size).also { slots = it }
                else -> current
            }
            var cursor = nextIndex
            var candidate: S
            // Probe slots circularly, starting at the hint, until one accepts the allocation.
            // NOTE(review): termination presumably relies on allocateLocked succeeding for every
            // slot that is not currently allocated - confirm against slot implementations.
            do {
                candidate = table[cursor] ?: createSlot().also { table[cursor] = it }
                cursor++
                if (cursor >= table.size) cursor = 0
            } while (!(candidate as AbstractSharedFlowSlot<Any>).allocateLocked(this))
            nextIndex = cursor
            nCollectors++
            counter = _subscriptionCount // read under lock; null if never initialized
            candidate
        }
        // Publish the new subscription count outside of the lock.
        counter?.increment(1)
        return allocated
    }

    /**
     * Releases [slot], resumes whatever continuations the slot hands back, and then
     * decrements the subscription count if it was ever requested.
     */
    @Suppress("UNCHECKED_CAST")
    protected fun freeSlot(slot: S) {
        val counter: SubscriptionCountStateFlow?
        val pending = synchronized(this) {
            nCollectors--
            counter = _subscriptionCount // read under lock; null if never initialized
            // With no active collectors left, restart the search hint from the beginning so that
            // the next allocation behaves more predictably.
            if (nCollectors == 0) nextIndex = 0
            (slot as AbstractSharedFlowSlot<Any>).freeLocked(this)
        }
        /*
         * Wake up suspended coroutines outside of the lock.
         * The freed subscriber may have been a slow one that was holding up the buffer;
         * once it is gone, previously queued emitters can proceed and are resumed here.
         */
        for (continuation in pending) {
            continuation?.resume(Unit)
        }
        counter?.increment(-1)
    }

    /**
     * Invokes [block] on every non-null entry of the slot table.
     * Does nothing when there are no collectors or the table has not been created yet.
     * The `Locked` suffix indicates that the caller is expected to hold the lock.
     */
    protected inline fun forEachSlotLocked(block: (S) -> Unit) {
        if (nCollectors == 0) return
        val table = slots ?: return
        for (slot in table) {
            if (slot != null) block(slot)
        }
    }
}
96 
/**
 * [StateFlow] that reports the number of subscriptions.
 *
 * Publicly it is exposed as an ordinary [StateFlow], but under the hood it is a [SharedFlow],
 * so that consecutive updates are never conflated: the subscription count is very sensitive
 * to conflation.
 *
 * The following example shows why non-conflation matters:
 * ```
 * val shared = flowOf(239).stateIn(this, SharingStarted.Lazily, 42) // stateIn for the sake of the initial value
 * println(shared.first())
 * yield()
 * println(shared.first())
 * ```
 * If the flow is shared within the same dispatcher (e.g. Main) or with a slow/throttled one,
 * `SharingStarted.Lazily` would never get a chance to start the source: `first` sees the initial
 * value and unsubscribes immediately, so the asynchronous `SharingStarted` would only ever observe
 * a conflated zero.
 *
 * To avoid that (especially in more complex scenarios), subscription updates are not conflated.
 */
private class SubscriptionCountStateFlow(initialValue: Int) :
    SharedFlowImpl<Int>(1, Int.MAX_VALUE, BufferOverflow.DROP_OLDEST),
    StateFlow<Int> {

    init {
        // Seed the flow with the initial subscription count.
        tryEmit(initialValue)
    }

    /** The most recently emitted count, read under the lock. */
    override val value: Int
        get() {
            return synchronized(this) { lastReplayedLocked }
        }

    /**
     * Emits the current count adjusted by [delta]; the read and the emission happen under one lock.
     *
     * @return the result of the underlying `tryEmit` call.
     */
    fun increment(delta: Int): Boolean {
        return synchronized(this) {
            tryEmit(lastReplayedLocked + delta)
        }
    }
}
128