1 package kotlinx.coroutines.flow
2
3 import kotlinx.coroutines.*
4 import kotlinx.coroutines.channels.*
5 import kotlinx.coroutines.flow.internal.*
6 import kotlinx.coroutines.internal.*
7 import kotlin.coroutines.*
8 import kotlin.jvm.*
9
10 /**
11 * A _hot_ [Flow] that shares emitted values among all its collectors in a broadcast fashion, so that all collectors
12 * get all emitted values. A shared flow is called _hot_ because its active instance exists independently of the
13 * presence of collectors. This is opposed to a regular [Flow], such as defined by the [`flow { ... }`][flow] function,
14 * which is _cold_ and is started separately for each collector.
15 *
16 * **Shared flow never completes**. A call to [Flow.collect] on a shared flow never completes normally, and
17 * neither does a coroutine started by the [Flow.launchIn] function. An active collector of a shared flow is called a _subscriber_.
18 *
19 * A subscriber of a shared flow can be cancelled. This usually happens when the scope in which the coroutine is running
20 * is cancelled. A subscriber to a shared flow is always [cancellable][Flow.cancellable], and checks for
21 * cancellation before each emission. Note that most terminal operators, such as [Flow.toList], also never complete
22 * when applied to a shared flow, but flow-truncating operators like [Flow.take] and [Flow.takeWhile] can be used on a
23 * shared flow to turn it into a completing one.
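 *
 * A minimal sketch of the truncation described above, assuming a flow configured with `replay = 3`
 * (the configuration and the variable names are illustrative only):
 *
 * ```kotlin
 * import kotlinx.coroutines.flow.*
 * import kotlinx.coroutines.runBlocking
 *
 * fun main() = runBlocking {
 *     val shared = MutableSharedFlow<Int>(replay = 3)
 *     repeat(3) { shared.emit(it) } // no subscribers yet, so the values only fill the replay cache
 *     // shared.toList() would never return; take(3) cancels the subscription after three values
 *     val firstThree = shared.take(3).toList()
 *     println(firstThree) // [0, 1, 2]
 * }
 * ```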
24 *
25 * A [mutable shared flow][MutableSharedFlow] is created using the [MutableSharedFlow(...)] constructor function.
26 * Its state can be updated by [emitting][MutableSharedFlow.emit] values to it and performing other operations.
27 * See the [MutableSharedFlow] documentation for details.
28 *
29 * [SharedFlow] is useful for broadcasting events that happen inside an application to subscribers that can come and go.
30 * For example, the following class encapsulates an event bus that distributes events to all subscribers
31 * in a _rendezvous_ manner, suspending until all subscribers receive the emitted event:
32 *
33 * ```
34 * class EventBus {
35 * private val _events = MutableSharedFlow<Event>() // private mutable shared flow
36 * val events = _events.asSharedFlow() // publicly exposed as read-only shared flow
37 *
38 * suspend fun produceEvent(event: Event) {
39 * _events.emit(event) // suspends until all subscribers receive it
40 * }
41 * }
42 * ```
43 *
44 * As an alternative to the above usage with the `MutableSharedFlow(...)` constructor function,
45 * any _cold_ [Flow] can be converted to a shared flow using the [shareIn] operator.
46 *
47 * There is a specialized implementation of shared flow for the case where the most recent state value needs
48 * to be shared. See [StateFlow] for details.
49 *
50 * ### Replay cache and buffer
51 *
52 * A shared flow keeps a specific number of the most recent values in its _replay cache_. Every new subscriber first
53 * gets the values from the replay cache and then gets new emitted values. The maximum size of the replay cache is
54 * specified when the shared flow is created by the `replay` parameter. A snapshot of the current replay cache
55 * is available via the [replayCache] property and it can be reset with the [MutableSharedFlow.resetReplayCache] function.
56 *
57 * A replay cache also provides a buffer for emissions to the shared flow, allowing slow subscribers to
58 * get values from the buffer without suspending emitters. The buffer space determines how far slow subscribers
59 * can lag behind the fast ones. When creating a shared flow, additional buffer capacity beyond replay can be reserved
60 * using the `extraBufferCapacity` parameter.
61 *
62 * A shared flow with a buffer can be configured to avoid suspension of emitters on buffer overflow using
63 * the `onBufferOverflow` parameter, which is equal to one of the entries of the [BufferOverflow] enum. When a strategy other
64 * than [SUSPEND][BufferOverflow.SUSPEND] is configured, emissions to the shared flow never suspend.
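 *
 * As a hedged illustration of a non-suspending strategy, the sketch below assumes a one-element extra buffer
 * with [BufferOverflow.DROP_OLDEST] and a subscriber that cannot run between emissions
 * (the sizes and names are illustrative):
 *
 * ```kotlin
 * import kotlinx.coroutines.*
 * import kotlinx.coroutines.channels.BufferOverflow
 * import kotlinx.coroutines.flow.*
 *
 * fun main() = runBlocking {
 *     val flow = MutableSharedFlow<Int>(
 *         extraBufferCapacity = 1,
 *         onBufferOverflow = BufferOverflow.DROP_OLDEST
 *     )
 *     // Subscribe eagerly; the collector then suspends, waiting for a value.
 *     val job = launch(start = CoroutineStart.UNDISPATCHED) { flow.collect { } }
 *     // runBlocking is single-threaded, so the collector cannot run between these calls and the
 *     // buffer overflows. Each call still returns true rather than reporting that it would suspend.
 *     val results = List(3) { flow.tryEmit(it) }
 *     println(results) // [true, true, true]
 *     job.cancel()
 * }
 * ```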
65 *
66 * **Buffer overflow condition can happen only when there is at least one subscriber that is not ready to accept
67 * the new value.** In the absence of subscribers only the most recent `replay` values are stored and the buffer
68 * overflow behavior is never triggered and has no effect. In particular, in the absence of subscribers, an emitter never
69 * suspends despite the [BufferOverflow.SUSPEND] option, and the [BufferOverflow.DROP_LATEST] option has no effect either.
70 * Essentially, the behavior in the absence of subscribers is always similar to [BufferOverflow.DROP_OLDEST],
71 * but the buffer is just of `replay` size (without any `extraBufferCapacity`).
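 *
 * A small sketch of the no-subscriber behavior, assuming `replay = 2` and an arbitrary `extraBufferCapacity`
 * (both values are illustrative): only the last `replay` values survive, and the extra capacity is not used.
 *
 * ```kotlin
 * import kotlinx.coroutines.flow.MutableSharedFlow
 *
 * fun main() {
 *     val flow = MutableSharedFlow<Int>(replay = 2, extraBufferCapacity = 8)
 *     for (i in 1..5) flow.tryEmit(i) // without subscribers tryEmit always succeeds
 *     println(flow.replayCache) // [4, 5]
 * }
 * ```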
72 *
73 * ### Unbuffered shared flow
74 *
75 * A default implementation of a shared flow that is created with the `MutableSharedFlow()` constructor function
76 * without parameters has neither a replay cache nor an additional buffer.
77 * An [emit][MutableSharedFlow.emit] call to such a shared flow suspends until all subscribers receive the emitted value
78 * and returns immediately if there are no subscribers.
79 * Thus, a [tryEmit][MutableSharedFlow.tryEmit] call succeeds and returns `true` only if
80 * there are no subscribers (in which case the emitted value is immediately lost).
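 *
 * The following sketch illustrates this with a parameterless `MutableSharedFlow()`; the eager
 * `CoroutineStart.UNDISPATCHED` subscription is just one convenient way to have a subscriber registered
 * before the second call:
 *
 * ```kotlin
 * import kotlinx.coroutines.*
 * import kotlinx.coroutines.flow.*
 *
 * fun main() = runBlocking {
 *     val flow = MutableSharedFlow<Int>()
 *     println(flow.tryEmit(1)) // true: there are no subscribers, so the value is lost
 *     val job = launch(start = CoroutineStart.UNDISPATCHED) { flow.collect { } }
 *     println(flow.tryEmit(2)) // false: a subscriber exists and there is no buffer to hold the value
 *     job.cancel()
 * }
 * ```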
81 *
82 * ### SharedFlow vs BroadcastChannel
83 *
84 * Conceptually, a shared flow is similar to [BroadcastChannel]
85 * and is designed to completely replace it.
86 * It has the following important differences:
87 *
88 * - `SharedFlow` is simpler, because it does not have to implement all the [Channel] APIs, which allows
89 * for faster and simpler implementation.
90 * - `SharedFlow` supports configurable replay and buffer overflow strategy.
91 * - `SharedFlow` has a clear separation into a read-only `SharedFlow` interface and a [MutableSharedFlow].
92 * - `SharedFlow` cannot be closed like `BroadcastChannel` and can never represent a failure.
93 * All errors and completion signals should be explicitly _materialized_ if needed.
94 *
95 * To migrate [BroadcastChannel] usage to [SharedFlow], start by replacing usages of the `BroadcastChannel(capacity)`
96 * constructor with `MutableSharedFlow(0, extraBufferCapacity=capacity)` (broadcast channel does not replay
97 * values to new subscribers). Replace [send][BroadcastChannel.send] and [trySend][BroadcastChannel.trySend] calls
98 * with [emit][MutableSharedFlow.emit] and [tryEmit][MutableSharedFlow.tryEmit], and convert subscribers' code to flow operators.
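 *
 * A rough sketch of what the migrated side might look like. The `Publisher` class, its method names, and the
 * default capacity of 16 are hypothetical and only illustrate the replacement described above:
 *
 * ```kotlin
 * import kotlinx.coroutines.flow.*
 *
 * // Hypothetical publisher that previously wrapped a BroadcastChannel(capacity).
 * class Publisher(capacity: Int = 16) {
 *     private val _events = MutableSharedFlow<String>(0, extraBufferCapacity = capacity)
 *     val events: SharedFlow<String> = _events.asSharedFlow()
 *
 *     suspend fun publish(event: String) = _events.emit(event)   // previously a send call
 *     fun offer(event: String): Boolean = _events.tryEmit(event) // previously a trySend call
 * }
 * ```
 *
 * Subscribers would then use flow operators on `events` instead of opening a subscription on the channel.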
99 *
100 * ### Concurrency
101 *
102 * All methods of shared flow are **thread-safe** and can be safely invoked from concurrent coroutines without
103 * external synchronization.
104 *
105 * ### Operator fusion
106 *
107 * Application of [flowOn][Flow.flowOn], [buffer] with [RENDEZVOUS][Channel.RENDEZVOUS] capacity,
108 * or [cancellable] operators to a shared flow has no effect.
109 *
110 * ### Implementation notes
111 *
112 * Shared flow implementation uses a lock to ensure thread-safety, but suspended collector and emitter coroutines are
113 * resumed outside of this lock to avoid deadlocks when using unconfined coroutines. Adding new subscribers
114 * has `O(1)` amortized cost, but emitting has `O(N)` cost, where `N` is the number of subscribers.
115 *
116 * ### Not stable for inheritance
117 *
118 * **The `SharedFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods
119 * might be added to this interface in the future, but is stable for use.
120 * Use the `MutableSharedFlow(replay, ...)` constructor function to create an implementation.
121 */
122 public interface SharedFlow<out T> : Flow<T> {
123 /**
124 * A snapshot of the replay cache.
125 */
126 public val replayCache: List<T>
127
128 /**
129 * Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
130 * To emit values from a shared flow into a specific collector, either `collector.emitAll(flow)` or `collect { ... }`
131 * SAM-conversion can be used.
132 *
133 * **A shared flow never completes**. A call to [Flow.collect] or any other terminal operator
134 * on a shared flow never completes normally.
135 *
136 * It is guaranteed that, by the time the first suspension happens, [collect] has already subscribed to the
137 * [SharedFlow] and is eligible for receiving emissions. In particular, the following code will always print `1`:
138 * ```
139 * val flow = MutableSharedFlow<Int>()
140 * launch(start = CoroutineStart.UNDISPATCHED) {
141 * flow.collect { println(1) }
142 * }
143 * flow.emit(1)
144 * ```
145 *
146 * @see [Flow.collect] for implementation and inheritance details.
147 */
148 override suspend fun collect(collector: FlowCollector<T>): Nothing
149 }
150
151 /**
152 * A mutable [SharedFlow] that provides functions to [emit] values to the flow.
153 * An instance of `MutableSharedFlow` with the given configuration parameters can be created using the `MutableSharedFlow(...)`
154 * constructor function.
155 *
156 * See the [SharedFlow] documentation for details on shared flows.
157 *
158 * `MutableSharedFlow` is a [SharedFlow] that also provides the abilities to [emit] a value,
159 * to [tryEmit] without suspension if possible, to track the [subscriptionCount],
160 * and to [resetReplayCache].
161 *
162 * ### Concurrency
163 *
164 * All methods of shared flow are **thread-safe** and can be safely invoked from concurrent coroutines without
165 * external synchronization.
166 *
167 * ### Not stable for inheritance
168 *
169 * **The `MutableSharedFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods
170 * might be added to this interface in the future, but is stable for use.
171 * Use the `MutableSharedFlow(...)` constructor function to create an implementation.
172 */
173 public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
174 /**
175 * Emits a [value] to this shared flow, suspending on buffer overflow.
176 *
177 * This call can suspend only when the [BufferOverflow] strategy is
178 * [SUSPEND][BufferOverflow.SUSPEND] **and** there are subscribers collecting this shared flow.
179 *
180 * If there are no subscribers, the buffer is not used.
181 * Instead, the most recently emitted value is simply stored into
182 * the replay cache if one was configured, displacing the older elements there,
183 * or dropped if no replay cache was configured.
184 *
185 * See [tryEmit] for a non-suspending variant of this function.
186 *
187 * This method is **thread-safe** and can be safely invoked from concurrent coroutines without
188 * external synchronization.
189 */
190 override suspend fun emit(value: T)
191
192 /**
193 * Tries to emit a [value] to this shared flow without suspending. It returns `true` if the value was
194 * emitted successfully (see below). When this function returns `false`, it means that a call to a plain [emit]
195 * function would suspend until there is buffer space available.
196 *
197 * This call can return `false` only when the [BufferOverflow] strategy is
198 * [SUSPEND][BufferOverflow.SUSPEND] **and** there are subscribers collecting this shared flow.
199 *
200 * If there are no subscribers, the buffer is not used.
201 * Instead, the most recently emitted value is simply stored into
202 * the replay cache if one was configured, displacing the older elements there,
203 * or dropped if no replay cache was configured. In any case, `tryEmit` returns `true`.
204 *
205 * This method is **thread-safe** and can be safely invoked from concurrent coroutines without
206 * external synchronization.
207 */
208 public fun tryEmit(value: T): Boolean
209
210 /**
211 * The number of subscribers (active collectors) to this shared flow.
212 *
213 * The integer in the resulting [StateFlow] is not negative and starts with zero for a freshly created
214 * shared flow.
215 *
216 * This state can be used to react to changes in the number of subscriptions to this shared flow.
217 * For example, if you need to call `onActive` when the first subscriber appears and `onInactive`
218 * when the last one disappears, you can set it up like this:
219 *
220 * ```
221 * sharedFlow.subscriptionCount
222 * .map { count -> count > 0 } // map count into active/inactive flag
223 * .distinctUntilChanged() // only react to true<->false changes
224 * .onEach { isActive -> // configure an action
225 * if (isActive) onActive() else onInactive()
226 * }
227 * .launchIn(scope) // launch it
228 * ```
229 *
230 * Usually, [StateFlow] conflates values, but [subscriptionCount] is not conflated.
231 * This is done so that any subscribers that need to be notified when subscribers appear do
232 * reliably observe it. With conflation, if a single subscriber appeared and immediately left, those
233 * collecting [subscriptionCount] could fail to notice it due to `0` immediately conflating the
234 * subscription count.
235 */
236 public val subscriptionCount: StateFlow<Int>
237
238 /**
239 * Resets the [replayCache] of this shared flow to an empty state.
240 * New subscribers will be receiving only the values that were emitted after this call,
241 * while old subscribers will still be receiving previously buffered values.
242 * To reset a shared flow to an initial value, emit the value after this call.
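 *
 * A minimal sketch of that reset-then-emit pattern, assuming `replay = 1` and `0` as the initial value
 * (both are illustrative):
 *
 * ```kotlin
 * import kotlinx.coroutines.flow.MutableSharedFlow
 *
 * fun main() {
 *     val flow = MutableSharedFlow<Int>(replay = 1)
 *     flow.tryEmit(42)
 *     // resetReplayCache is marked @ExperimentalCoroutinesApi; opt in as your build requires
 *     flow.resetReplayCache()
 *     println(flow.replayCache) // []
 *     flow.tryEmit(0) // emit the "initial" value after the reset
 *     println(flow.replayCache) // [0]
 * }
 * ```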
243 *
244 * On a [MutableStateFlow], which always contains a single value, this function is not
245 * supported, and throws an [UnsupportedOperationException]. To reset a [MutableStateFlow]
246 * to an initial value, just update its [value][MutableStateFlow.value].
247 *
248 * This method is **thread-safe** and can be safely invoked from concurrent coroutines without
249 * external synchronization.
250 *
251 * **Note: This is an experimental API.** This function may be removed or renamed in the future.
252 */
253 @ExperimentalCoroutinesApi
254 public fun resetReplayCache()
255 }
256
257 /**
258 * Creates a [MutableSharedFlow] with the given configuration parameters.
259 *
260 * This function throws [IllegalArgumentException] on unsupported values of parameters or combinations thereof.
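 *
 * For instance, the sketch below requests a non-suspending overflow strategy without any replay or extra buffer,
 * which is one of the unsupported combinations (the use of `runCatching` is only for illustration):
 *
 * ```kotlin
 * import kotlinx.coroutines.channels.BufferOverflow
 * import kotlinx.coroutines.flow.MutableSharedFlow
 *
 * fun main() {
 *     // No replay and no extra buffer, so a dropping strategy has nothing to drop from.
 *     val result = runCatching {
 *         MutableSharedFlow<Int>(onBufferOverflow = BufferOverflow.DROP_OLDEST)
 *     }
 *     println(result.exceptionOrNull() is IllegalArgumentException) // true
 * }
 * ```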
261 *
262 * @param replay the number of values replayed to new subscribers (cannot be negative, defaults to zero).
263 * @param extraBufferCapacity the number of values buffered in addition to `replay`.
264 * [emit][MutableSharedFlow.emit] does not suspend while there is a buffer space remaining (optional, cannot be negative, defaults to zero).
265 * @param onBufferOverflow configures an [emit][MutableSharedFlow.emit] action on buffer overflow. Optional, defaults to
266 * [suspending][BufferOverflow.SUSPEND] attempts to emit a value.
267 * Values other than [BufferOverflow.SUSPEND] are supported only when `replay > 0` or `extraBufferCapacity > 0`.
268 * **Buffer overflow can happen only when there is at least one subscriber that is not ready to accept
269 * the new value.** In the absence of subscribers only the most recent [replay] values are stored and
270 * the buffer overflow behavior is never triggered and has no effect.
271 */
272 @Suppress("FunctionName", "UNCHECKED_CAST")
273 public fun <T> MutableSharedFlow(
274 replay: Int = 0,
275 extraBufferCapacity: Int = 0,
276 onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
277 ): MutableSharedFlow<T> {
278 require(replay >= 0) { "replay cannot be negative, but was $replay" }
279 require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }
280 require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) {
281 "replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow"
282 }
283 val bufferCapacity0 = replay + extraBufferCapacity
284 val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 // coerce to MAX_VALUE on overflow
285 return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
286 }
287
288 // ------------------------------------ Implementation ------------------------------------
289
290 internal class SharedFlowSlot : AbstractSharedFlowSlot<SharedFlowImpl<*>>() {
291 @JvmField
292 var index = -1L // current "to-be-emitted" index, -1 means the slot is free now
293
294 @JvmField
295 var cont: Continuation<Unit>? = null // collector waiting for new value
296
297 override fun allocateLocked(flow: SharedFlowImpl<*>): Boolean {
298 if (index >= 0) return false // not free
299 index = flow.updateNewCollectorIndexLocked()
300 return true
301 }
302
303 override fun freeLocked(flow: SharedFlowImpl<*>): Array<Continuation<Unit>?> {
304 assert { index >= 0 }
305 val oldIndex = index
306 index = -1L
307 cont = null // cleanup continuation reference
308 return flow.updateCollectorIndexLocked(oldIndex)
309 }
310 }
311
312 internal open class SharedFlowImpl<T>(
313 private val replay: Int,
314 private val bufferCapacity: Int,
315 private val onBufferOverflow: BufferOverflow
316 ) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
317 /*
318 Logical structure of the buffer
319
320 buffered values
321 /-----------------------\
322 replayCache queued emitters
323 /----------\/----------------------\
324 +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
325 | | 1 | 2 | 3 | 4 | 5 | 6 | E | E | E | E | E | E | | | |
326 +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
327 ^ ^ ^ ^
328 | | | |
329 head | head + bufferSize head + totalSize
330 | | |
331 index of the slowest | index of the fastest
332 possible collector | possible collector
333 | |
334 | replayIndex == new collector's index
335 \---------------------- /
336 range of possible minCollectorIndex
337
338 head == minOf(minCollectorIndex, replayIndex) // by definition
339 totalSize == bufferSize + queueSize // by definition
340
341 INVARIANTS:
342 minCollectorIndex = activeSlots.minOf { it.index } ?: (head + bufferSize)
343 replayIndex <= head + bufferSize
344 */
345
346 // Stored state
347 private var buffer: Array<Any?>? = null // allocated when needed, allocated size always power of two
348 private var replayIndex = 0L // minimal index from which new collector gets values
349 private var minCollectorIndex = 0L // minimal index of active collectors, equal to replayIndex if there are none
350 private var bufferSize = 0 // number of buffered values
351 private var queueSize = 0 // number of queued emitters
352
353 // Computed state
354 private val head: Long get() = minOf(minCollectorIndex, replayIndex)
355 private val replaySize: Int get() = (head + bufferSize - replayIndex).toInt()
356 private val totalSize: Int get() = bufferSize + queueSize
357 private val bufferEndIndex: Long get() = head + bufferSize
358 private val queueEndIndex: Long get() = head + bufferSize + queueSize
359
360 override val replayCache: List<T>
361 get() = synchronized(this) {
362 val replaySize = this.replaySize
363 if (replaySize == 0) return emptyList()
364 val result = ArrayList<T>(replaySize)
365 val buffer = buffer!! // must be allocated, because replaySize > 0
366 @Suppress("UNCHECKED_CAST")
367 for (i in 0 until replaySize) result += buffer.getBufferAt(replayIndex + i) as T
368 result
369 }
370
371 /*
372 * A tweak for SubscriptionCountStateFlow to get the latest value.
373 */
374 @Suppress("UNCHECKED_CAST")
375 protected val lastReplayedLocked: T
376 get() = buffer!!.getBufferAt(replayIndex + replaySize - 1) as T
377
378 @Suppress("UNCHECKED_CAST")
379 override suspend fun collect(collector: FlowCollector<T>): Nothing {
380 val slot = allocateSlot()
381 try {
382 if (collector is SubscribedFlowCollector) collector.onSubscription()
383 val collectorJob = currentCoroutineContext()[Job]
384 while (true) {
385 var newValue: Any?
386 while (true) {
387 newValue = tryTakeValue(slot) // attempt no-suspend fast path first
388 if (newValue !== NO_VALUE) break
389 awaitValue(slot) // await signal that the new value is available
390 }
391 collectorJob?.ensureActive()
392 collector.emit(newValue as T)
393 }
394 } finally {
395 freeSlot(slot)
396 }
397 }
398
399 override fun tryEmit(value: T): Boolean {
400 var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
401 val emitted = synchronized(this) {
402 if (tryEmitLocked(value)) {
403 resumes = findSlotsToResumeLocked(resumes)
404 true
405 } else {
406 false
407 }
408 }
409 for (cont in resumes) cont?.resume(Unit)
410 return emitted
411 }
412
413 override suspend fun emit(value: T) {
414 if (tryEmit(value)) return // fast-path
415 emitSuspend(value)
416 }
417
418 @Suppress("UNCHECKED_CAST")
419 private fun tryEmitLocked(value: T): Boolean {
420 // Fast path without collectors -> no buffering
421 if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) // always returns true
422 // With collectors we'll have to buffer
423 // cannot emit now if buffer is full & blocked by slow collectors
424 if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
425 when (onBufferOverflow) {
426 BufferOverflow.SUSPEND -> return false // will suspend
427 BufferOverflow.DROP_LATEST -> return true // just drop incoming
428 BufferOverflow.DROP_OLDEST -> {} // force enqueue & drop oldest instead
429 }
430 }
431 enqueueLocked(value)
432 bufferSize++ // value was added to buffer
433 // drop oldest from the buffer if it became more than bufferCapacity
434 if (bufferSize > bufferCapacity) dropOldestLocked()
435 // keep replaySize no larger than needed
436 if (replaySize > replay) { // increment replayIndex by one
437 updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
438 }
439 return true
440 }
441
442 private fun tryEmitNoCollectorsLocked(value: T): Boolean {
443 assert { nCollectors == 0 }
444 if (replay == 0) return true // no need to replay, just forget it now
445 enqueueLocked(value) // enqueue to replayCache
446 bufferSize++ // value was added to buffer
447 // drop oldest from the buffer if it became more than replay
448 if (bufferSize > replay) dropOldestLocked()
449 minCollectorIndex = head + bufferSize // a default value (max allowed)
450 return true
451 }
452
453 private fun dropOldestLocked() {
454 buffer!!.setBufferAt(head, null)
455 bufferSize--
456 val newHead = head + 1
457 if (replayIndex < newHead) replayIndex = newHead
458 if (minCollectorIndex < newHead) correctCollectorIndexesOnDropOldest(newHead)
459 assert { head == newHead } // since head = minOf(minCollectorIndex, replayIndex) it should have updated
460 }
461
462 private fun correctCollectorIndexesOnDropOldest(newHead: Long) {
463 forEachSlotLocked { slot ->
464 @Suppress("ConvertTwoComparisonsToRangeCheck") // Bug in JS backend
465 if (slot.index >= 0 && slot.index < newHead) {
466 slot.index = newHead // force move it up (this collector was too slow and missed the value at its index)
467 }
468 }
469 minCollectorIndex = newHead
470 }
471
472 // enqueues item to buffer array, caller shall increment either bufferSize or queueSize
473 private fun enqueueLocked(item: Any?) {
474 val curSize = totalSize
475 val buffer = when (val curBuffer = buffer) {
476 null -> growBuffer(null, 0, 2)
477 else -> if (curSize >= curBuffer.size) growBuffer(curBuffer, curSize, curBuffer.size * 2) else curBuffer
478 }
479 buffer.setBufferAt(head + curSize, item)
480 }
481
482 private fun growBuffer(curBuffer: Array<Any?>?, curSize: Int, newSize: Int): Array<Any?> {
483 check(newSize > 0) { "Buffer size overflow" }
484 val newBuffer = arrayOfNulls<Any?>(newSize).also { buffer = it }
485 if (curBuffer == null) return newBuffer
486 val head = head
487 for (i in 0 until curSize) {
488 newBuffer.setBufferAt(head + i, curBuffer.getBufferAt(head + i))
489 }
490 return newBuffer
491 }
492
493 private suspend fun emitSuspend(value: T) = suspendCancellableCoroutine<Unit> sc@{ cont ->
494 var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
495 val emitter = synchronized(this) lock@{
496 // recheck buffer under lock again (make sure it is really full)
497 if (tryEmitLocked(value)) {
498 cont.resume(Unit)
499 resumes = findSlotsToResumeLocked(resumes)
500 return@lock null
501 }
502 // add suspended emitter to the buffer
503 Emitter(this, head + totalSize, value, cont).also {
504 enqueueLocked(it)
505 queueSize++ // added to queue of waiting emitters
506 // synchronous shared flow might rendezvous with waiting emitter
507 if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
508 }
509 }
510 // outside of the lock: register dispose on cancellation
511 emitter?.let { cont.disposeOnCancellation(it) }
512 // outside of the lock: resume slots if needed
513 for (r in resumes) r?.resume(Unit)
514 }
515
516 private fun cancelEmitter(emitter: Emitter) = synchronized(this) {
517 if (emitter.index < head) return // already skipped past this index
518 val buffer = buffer!!
519 if (buffer.getBufferAt(emitter.index) !== emitter) return // already resumed
520 buffer.setBufferAt(emitter.index, NO_VALUE)
521 cleanupTailLocked()
522 }
523
524 internal fun updateNewCollectorIndexLocked(): Long {
525 val index = replayIndex
526 if (index < minCollectorIndex) minCollectorIndex = index
527 return index
528 }
529
530 // Is called when a collector disappears or changes index, returns a list of continuations to resume after lock
531 internal fun updateCollectorIndexLocked(oldIndex: Long): Array<Continuation<Unit>?> {
532 assert { oldIndex >= minCollectorIndex }
533 if (oldIndex > minCollectorIndex) return EMPTY_RESUMES // nothing changes, it was not min
534 // start computing new minimal index of active collectors
535 val head = head
536 var newMinCollectorIndex = head + bufferSize
537 // take into account a special case of sync shared flow that can go past 1st queued emitter
538 if (bufferCapacity == 0 && queueSize > 0) newMinCollectorIndex++
539 forEachSlotLocked { slot ->
540 @Suppress("ConvertTwoComparisonsToRangeCheck") // Bug in JS backend
541 if (slot.index >= 0 && slot.index < newMinCollectorIndex) newMinCollectorIndex = slot.index
542 }
543 assert { newMinCollectorIndex >= minCollectorIndex } // can only grow
544 if (newMinCollectorIndex <= minCollectorIndex) return EMPTY_RESUMES // nothing changes
545 // Compute new buffer size if we drop items we no longer need and no emitter is resumed:
546 // We must keep all the items from newMinIndex to the end of buffer
547 var newBufferEndIndex = bufferEndIndex // var to grow when waiters are resumed
548 val maxResumeCount = if (nCollectors > 0) {
549 // If we have collectors we can resume up to maxResumeCount waiting emitters
550 // a) queueSize -> that's how many waiting emitters we have
551 // b) bufferCapacity - newBufferSize0 -> that's how many we can afford to resume to add w/o exceeding bufferCapacity
552 val newBufferSize0 = (newBufferEndIndex - newMinCollectorIndex).toInt()
553 minOf(queueSize, bufferCapacity - newBufferSize0)
554 } else {
555 // If we don't have collectors anymore we must resume all waiting emitters
556 queueSize // that's how many waiting emitters we have (at most)
557 }
558 var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
559 val newQueueEndIndex = newBufferEndIndex + queueSize
560 if (maxResumeCount > 0) { // collect emitters to resume if we have them
561 resumes = arrayOfNulls(maxResumeCount)
562 var resumeCount = 0
563 val buffer = buffer!!
564 for (curEmitterIndex in newBufferEndIndex until newQueueEndIndex) {
565 val emitter = buffer.getBufferAt(curEmitterIndex)
566 if (emitter !== NO_VALUE) {
567 emitter as Emitter // must have Emitter class
568 resumes[resumeCount++] = emitter.cont
569 buffer.setBufferAt(curEmitterIndex, NO_VALUE) // mark as canceled if we moved ahead
570 buffer.setBufferAt(newBufferEndIndex, emitter.value)
571 newBufferEndIndex++
572 if (resumeCount >= maxResumeCount) break // enough resumed, done
573 }
574 }
575 }
576 // Compute new buffer size -> how many values we now actually have after resume
577 val newBufferSize1 = (newBufferEndIndex - head).toInt()
578 // Note: When nCollectors == 0 we resume ALL queued emitters and we might have resumed more than bufferCapacity,
579 // and newMinCollectorIndex might be pointing to the wrong place because of that. The easiest way to fix it is by
580 // forcing newMinCollectorIndex = newBufferEndIndex. We do not need to update newBufferSize1 (which could be
581 // too big), because the only use of newBufferSize1 in the below code is in the minOf(replay, newBufferSize1)
582 // expression, which coerces values that are too big anyway.
583 if (nCollectors == 0) newMinCollectorIndex = newBufferEndIndex
584 // Compute new replay size -> limit to replay the number of items we need, take into account that it can only grow
585 var newReplayIndex = maxOf(replayIndex, newBufferEndIndex - minOf(replay, newBufferSize1))
586 // adjustment for synchronous case with cancelled emitter (NO_VALUE)
587 if (bufferCapacity == 0 && newReplayIndex < newQueueEndIndex && buffer!!.getBufferAt(newReplayIndex) == NO_VALUE) {
588 newBufferEndIndex++
589 newReplayIndex++
590 }
591 // Update buffer state
592 updateBufferLocked(newReplayIndex, newMinCollectorIndex, newBufferEndIndex, newQueueEndIndex)
593 // just in case we've moved all buffered emitters and have NO_VALUE's at the tail now
594 cleanupTailLocked()
595 // We need to wake up suspended collectors if any emitters were resumed here
596 if (resumes.isNotEmpty()) resumes = findSlotsToResumeLocked(resumes)
597 return resumes
598 }
599
600 private fun updateBufferLocked(
601 newReplayIndex: Long,
602 newMinCollectorIndex: Long,
603 newBufferEndIndex: Long,
604 newQueueEndIndex: Long
605 ) {
606 // Compute new head value
607 val newHead = minOf(newMinCollectorIndex, newReplayIndex)
608 assert { newHead >= head }
609 // cleanup items we don't have to buffer anymore (because head is about to move)
610 for (index in head until newHead) buffer!!.setBufferAt(index, null)
611 // update all state variables to newly computed values
612 replayIndex = newReplayIndex
613 minCollectorIndex = newMinCollectorIndex
614 bufferSize = (newBufferEndIndex - newHead).toInt()
615 queueSize = (newQueueEndIndex - newBufferEndIndex).toInt()
616 // check our key invariants (just in case)
617 assert { bufferSize >= 0 }
618 assert { queueSize >= 0 }
619 assert { replayIndex <= this.head + bufferSize }
620 }
621
622 // Removes all the NO_VALUE items from the end of the queue and reduces its size
623 private fun cleanupTailLocked() {
624 // If we have synchronous case, then keep one emitter queued
625 if (bufferCapacity == 0 && queueSize <= 1) return // return, don't clear it
626 val buffer = buffer!!
627 while (queueSize > 0 && buffer.getBufferAt(head + totalSize - 1) === NO_VALUE) {
628 queueSize--
629 buffer.setBufferAt(head + totalSize, null)
630 }
631 }
632
633 // returns NO_VALUE if cannot take value without suspension
634 private fun tryTakeValue(slot: SharedFlowSlot): Any? {
635 var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
636 val value = synchronized(this) {
637 val index = tryPeekLocked(slot)
638 if (index < 0) {
639 NO_VALUE
640 } else {
641 val oldIndex = slot.index
642 val newValue = getPeekedValueLockedAt(index)
643 slot.index = index + 1 // points to the next index after peeked one
644 resumes = updateCollectorIndexLocked(oldIndex)
645 newValue
646 }
647 }
648 for (resume in resumes) resume?.resume(Unit)
649 return value
650 }
651
652 // returns -1 if cannot peek value without suspension
653 private fun tryPeekLocked(slot: SharedFlowSlot): Long {
654 // return buffered value if possible
655 val index = slot.index
656 if (index < bufferEndIndex) return index
657 if (bufferCapacity > 0) return -1L // if there's a buffer, never try to rendezvous with emitters
658 // Synchronous shared flow (bufferCapacity == 0) tries to rendezvous
659 if (index > head) return -1L // ... but only with the first emitter (never look forward)
660 if (queueSize == 0) return -1L // nothing there to rendezvous with
661 return index // rendezvous with the first emitter
662 }
663
664 private fun getPeekedValueLockedAt(index: Long): Any? =
665 when (val item = buffer!!.getBufferAt(index)) {
666 is Emitter -> item.value
667 else -> item
668 }
669
670 private suspend fun awaitValue(slot: SharedFlowSlot): Unit = suspendCancellableCoroutine { cont ->
671 synchronized(this) lock@{
672 val index = tryPeekLocked(slot) // recheck under this lock
673 if (index < 0) {
674 slot.cont = cont // Ok -- suspending
675 } else {
676 cont.resume(Unit) // has value, no need to suspend
677 return@lock
678 }
679 slot.cont = cont // suspend, waiting
680 }
681 }
682
683 private fun findSlotsToResumeLocked(resumesIn: Array<Continuation<Unit>?>): Array<Continuation<Unit>?> {
684 var resumes: Array<Continuation<Unit>?> = resumesIn
685 var resumeCount = resumesIn.size
686 forEachSlotLocked loop@{ slot ->
687 val cont = slot.cont ?: return@loop // only waiting slots
688 if (tryPeekLocked(slot) < 0) return@loop // only slots that can peek a value
689 if (resumeCount >= resumes.size) resumes = resumes.copyOf(maxOf(2, 2 * resumes.size))
690 resumes[resumeCount++] = cont
691 slot.cont = null // not waiting anymore
692 }
693 return resumes
694 }
695
696 override fun createSlot() = SharedFlowSlot()
697 override fun createSlotArray(size: Int): Array<SharedFlowSlot?> = arrayOfNulls(size)
698
699 override fun resetReplayCache() = synchronized(this) {
700 // Update buffer state
701 updateBufferLocked(
702 newReplayIndex = bufferEndIndex,
703 newMinCollectorIndex = minCollectorIndex,
704 newBufferEndIndex = bufferEndIndex,
705 newQueueEndIndex = queueEndIndex
706 )
707 }
708
709 override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
710 fuseSharedFlow(context, capacity, onBufferOverflow)
711
712 private class Emitter(
713 @JvmField val flow: SharedFlowImpl<*>,
714 @JvmField var index: Long,
715 @JvmField val value: Any?,
716 @JvmField val cont: Continuation<Unit>
717 ) : DisposableHandle {
718 override fun dispose() = flow.cancelEmitter(this)
719 }
720 }
721
722 @JvmField
723 internal val NO_VALUE = Symbol("NO_VALUE")
724
725 private fun Array<Any?>.getBufferAt(index: Long) = get(index.toInt() and (size - 1))
726 private fun Array<Any?>.setBufferAt(index: Long, item: Any?) = set(index.toInt() and (size - 1), item)
727
728 internal fun <T> SharedFlow<T>.fuseSharedFlow(
729 context: CoroutineContext,
730 capacity: Int,
731 onBufferOverflow: BufferOverflow
732 ): Flow<T> {
733 // context is irrelevant for shared flow and making additional rendezvous is meaningless
734 // however, additional non-trivial buffering after shared flow could make sense for very slow subscribers
735 if ((capacity == Channel.RENDEZVOUS || capacity == Channel.OPTIONAL_CHANNEL) && onBufferOverflow == BufferOverflow.SUSPEND) {
736 return this
737 }
738 // Apply channel flow operator as usual
739 return ChannelFlowOperatorImpl(this, context, capacity, onBufferOverflow)
740 }
741