xref: /aosp_15_r20/external/kotlinx.coroutines/kotlinx-coroutines-core/common/src/flow/operators/Share.kt (revision 7a7160fed73afa6648ef8aa100d4a336fe921d9a)

<lambda>null1 @file:JvmMultifileClass
2 @file:JvmName("FlowKt")
3 
4 package kotlinx.coroutines.flow
5 
6 import kotlinx.coroutines.*
7 import kotlinx.coroutines.channels.*
8 import kotlinx.coroutines.flow.internal.*
9 import kotlin.coroutines.*
10 import kotlin.jvm.*
11 
12 // -------------------------------- shareIn --------------------------------
13 
14 /**
15  * Converts a _cold_ [Flow] into a _hot_ [SharedFlow] that is started in the given coroutine [scope],
16  * sharing emissions from a single running instance of the upstream flow with multiple downstream subscribers,
17  * and replaying a specified number of [replay] values to new subscribers. See the [SharedFlow] documentation
18  * for the general concepts of shared flows.
19  *
20  * The starting of the sharing coroutine is controlled by the [started] parameter. The following options
21  * are supported.
22  *
23  * - [Eagerly][SharingStarted.Eagerly] &mdash; the upstream flow is started even before the first subscriber appears. Note
24  *   that in this case all values emitted by the upstream beyond the most recent values as specified by
25  *   [replay] parameter **will be immediately discarded**.
26  * - [Lazily][SharingStarted.Lazily] &mdash; starts the upstream flow after the first subscriber appears, which guarantees
27  *   that this first subscriber gets all the emitted values, while subsequent subscribers are only guaranteed to
28  *   get the most recent [replay] values. The upstream flow continues to be active even when all subscribers
29  *   disappear, but only the most recent [replay] values are cached without subscribers.
30  * - [WhileSubscribed()][SharingStarted.WhileSubscribed] &mdash; starts the upstream flow when the first subscriber
31  *   appears, immediately stops when the last subscriber disappears, keeping the replay cache forever.
32  *   It has additional optional configuration parameters as explained in its documentation.
33  * - A custom strategy can be supplied by implementing the [SharingStarted] interface.
34  *
35  * The `shareIn` operator is useful in situations when there is a _cold_ flow that is expensive to create and/or
36  * to maintain, but there are multiple subscribers that need to collect its values. For example, consider a
37  * flow of messages coming from a backend over the expensive network connection, taking a lot of
38  * time to establish. Conceptually, it might be implemented like this:
39  *
40  * ```
41  * val backendMessages: Flow<Message> = flow {
42  *     connectToBackend() // takes a lot of time
43  *     try {
44  *       while (true) {
45  *           emit(receiveMessageFromBackend())
46  *       }
47  *     } finally {
48  *         disconnectFromBackend()
49  *     }
50  * }
51  * ```
52  *
53  * If this flow is directly used in the application, then every time it is collected a fresh connection is
54  * established, and it will take a while before messages start flowing. However, we can share a single connection
55  * and establish it eagerly like this:
56  *
57  * ```
58  * val messages: SharedFlow<Message> = backendMessages.shareIn(scope, SharingStarted.Eagerly)
59  * ```
60  *
61  * Now a single connection is shared between all collectors from `messages`, and there is a chance that the connection
62  * is already established by the time it is needed.
63  *
64  * ### Upstream completion and error handling
65  *
66  * **Normal completion of the upstream flow has no effect on subscribers**, and the sharing coroutine continues to run. If a
67  * strategy like [SharingStarted.WhileSubscribed] is used, then the upstream can get restarted again. If a special
68  * action on upstream completion is needed, then an [onCompletion] operator can be used before the
69  * `shareIn` operator to emit a special value in this case, like this:
70  *
71  * ```
72  * backendMessages
73  *     .onCompletion { cause -> if (cause == null) emit(UpstreamHasCompletedMessage) }
74  *     .shareIn(scope, SharingStarted.Eagerly)
75  * ```
76  *
77  * Any exception in the upstream flow terminates the sharing coroutine without affecting any of the subscribers,
78  * and will be handled by the [scope] in which the sharing coroutine is launched. Custom exception handling
79  * can be configured by using the [catch] or [retry] operators before the `shareIn` operator.
80  * For example, to retry connection on any `IOException` with 1 second delay between attempts, use:
81  *
82  * ```
83  * val messages = backendMessages
84  *     .retry { e ->
 *         val shallRetry = e is IOException // other exceptions are bugs - handle them
86  *         if (shallRetry) delay(1000)
87  *         shallRetry
88  *     }
89  *     .shareIn(scope, SharingStarted.Eagerly)
90  * ```
91  *
92  * ### Initial value
93  *
94  * When a special initial value is needed to signal to subscribers that the upstream is still loading the data,
95  * use the [onStart] operator on the upstream flow. For example:
96  *
97  * ```
98  * backendMessages
99  *     .onStart { emit(UpstreamIsStartingMessage) }
100  *     .shareIn(scope, SharingStarted.Eagerly, 1) // replay one most recent message
101  * ```
102  *
103  * ### Buffering and conflation
104  *
105  * The `shareIn` operator runs the upstream flow in a separate coroutine, and buffers emissions from upstream as explained
106  * in the [buffer] operator's description, using a buffer of [replay] size or the default (whichever is larger).
107  * This default buffering can be overridden with an explicit buffer configuration by preceding the `shareIn` call
108  * with [buffer] or [conflate], for example:
109  *
110  * - `buffer(0).shareIn(scope, started, 0)` &mdash; overrides the default buffer size and creates a [SharedFlow] without a buffer.
111  *   Effectively, it configures sequential processing between the upstream emitter and subscribers,
112  *   as the emitter is suspended until all subscribers process the value. Note, that the value is still immediately
113  *   discarded when there are no subscribers.
114  * - `buffer(b).shareIn(scope, started, r)` &mdash; creates a [SharedFlow] with `replay = r` and `extraBufferCapacity = b`.
115  * - `conflate().shareIn(scope, started, r)` &mdash; creates a [SharedFlow] with `replay = r`, `onBufferOverflow = DROP_OLDEST`,
116  *   and `extraBufferCapacity = 1` when `replay == 0` to support this strategy.
117  *
118  * ### Operator fusion
119  *
120  * Application of [flowOn][Flow.flowOn], [buffer] with [RENDEZVOUS][Channel.RENDEZVOUS] capacity,
121  * or [cancellable] operators to the resulting shared flow has no effect.
122  *
123  * ### Exceptions
124  *
125  * This function throws [IllegalArgumentException] on unsupported values of parameters or combinations thereof.
126  *
127  * @param scope the coroutine scope in which sharing is started.
128  * @param started the strategy that controls when sharing is started and stopped.
129  * @param replay the number of values replayed to new subscribers (cannot be negative, defaults to zero).
130  */
public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T> {
    // Work out the effective upstream, buffer settings and launch context for the given replay size
    val sharing = configureSharing(replay)
    val hotFlow = MutableSharedFlow<T>(
        replay = replay,
        extraBufferCapacity = sharing.extraBufferCapacity,
        onBufferOverflow = sharing.onBufferOverflow
    )
    // A plain shared flow has no initial value: passing NO_VALUE makes the sharing coroutine
    // reset the replay cache (instead of re-emitting a value) on STOP_AND_RESET_REPLAY_CACHE
    @Suppress("UNCHECKED_CAST")
    val noInitialValue = NO_VALUE as T
    val sharingJob = scope.launchSharing(sharing.context, sharing.upstream, hotFlow, started, noInitialValue)
    // The read-only wrapper also holds a reference to the sharing job
    return ReadonlySharedFlow(hotFlow, sharingJob)
}
146 
/**
 * Settings computed by `configureSharing` for launching a sharing coroutine.
 *
 * @property upstream the flow that the sharing coroutine actually collects.
 * @property extraBufferCapacity buffer capacity to configure in addition to the replay size.
 * @property onBufferOverflow the overflow strategy for the shared flow's buffer.
 * @property context the coroutine context that the sharing coroutine is launched with.
 */
private class SharingConfig<T>(
    @JvmField
    val upstream: Flow<T>,
    @JvmField
    val extraBufferCapacity: Int,
    @JvmField
    val onBufferOverflow: BufferOverflow,
    @JvmField
    val context: CoroutineContext
)
153 
// Builds the sharing configuration for this flow, fusing with a preceding ChannelFlow when that is possible
private fun <T> Flow<T>.configureSharing(replay: Int): SharingConfig<T> {
    assert { replay >= 0 }
    // Extra capacity that tops the replay size up to the default channel capacity (zero if replay is already larger)
    val fallbackExtraCapacity = replay.coerceAtLeast(Channel.CHANNEL_DEFAULT_CAPACITY) - replay
    // Try to combine with preceding buffer/flowOn and other channel-using operators
    if (this is ChannelFlow) {
        // A non-null result means this ChannelFlow can work without its own channel, so we can drop it
        val unwrapped = dropChannelOperators()
        if (unwrapped != null) {
            val fusedExtraCapacity = when (capacity) {
                // Special capacity values need to be translated
                Channel.OPTIONAL_CHANNEL, Channel.BUFFERED, 0 -> when {
                    // Suspending buffer: honor an explicit zero, otherwise fall back to the default
                    onBufferOverflow == BufferOverflow.SUSPEND -> if (capacity == 0) 0 else fallbackExtraCapacity
                    // Non-suspending buffer with no replay: at least one slot is required
                    replay == 0 -> 1
                    // Non-suspending buffer with replay > 0: the replay buffer alone is enough
                    else -> 0
                }
                // Any other value is taken as the extra capacity as is
                else -> capacity
            }
            return SharingConfig(unwrapped, fusedExtraCapacity, onBufferOverflow, context)
        }
    }
    // No fusion: share this flow itself, with the default buffer, suspension on overflow, and no extra context
    return SharingConfig(this, fallbackExtraCapacity, BufferOverflow.SUSPEND, EmptyCoroutineContext)
}
188 
// Starts the single coroutine that feeds `upstream` into `shared` as directed by the `started` strategy
private fun <T> CoroutineScope.launchSharing(
    context: CoroutineContext,
    upstream: Flow<T>,
    shared: MutableSharedFlow<T>,
    started: SharingStarted,
    initialValue: T
): Job {
    /*
     * The start mode is conditional. When sharing and subscribing happen in the same dispatcher,
     * the following is intended:
     * - Delayed strategies run undispatched, so they get a chance to observe subscriptions that immediately
     *   follow the sharing call (e.g. `flow.shareIn(...); flow.take(1)`).
     * - Eager sharing is dispatched normally, so subscribers actually have a chance to subscribe
     *   _before_ sharing begins.
     *
     * NOTE(review): this check compares with `==`, while the dispatch below uses `===`; kept as is.
     */
    val start = when (started) {
        SharingStarted.Eagerly -> CoroutineStart.DEFAULT
        else -> CoroutineStart.UNDISPATCHED
    }
    return launch(context, start = start) {
        // Built-in strategies get dedicated fast paths
        when {
            // Eagerly: collect right away, forever
            started === SharingStarted.Eagerly -> upstream.collect(shared)
            started === SharingStarted.Lazily -> {
                // Lazily: suspend until the first subscriber shows up, then collect forever
                shared.subscriptionCount.first { it > 0 }
                upstream.collect(shared)
            }
            // Everything else (including custom strategies) is driven by the command flow
            else -> started.command(shared.subscriptionCount)
                .distinctUntilChanged() // react only when the command actually changes
                .collectLatest { command -> // a new command cancels the handling of the previous one
                    when (command) {
                        SharingCommand.START -> upstream.collect(shared) // cancellable by the next command
                        SharingCommand.STOP -> Unit // cancelling the previous block is all that is needed
                        SharingCommand.STOP_AND_RESET_REPLAY_CACHE ->
                            if (initialValue === NO_VALUE) {
                                shared.resetReplayCache() // regular shared flow: drop the cached values
                            } else {
                                shared.tryEmit(initialValue) // state flow: go back to the initial value
                            }
                    }
                }
        }
    }
}
238 
239 // -------------------------------- stateIn --------------------------------
240 
241 /**
242  * Converts a _cold_ [Flow] into a _hot_ [StateFlow] that is started in the given coroutine [scope],
243  * sharing the most recently emitted value from a single running instance of the upstream flow with multiple
244  * downstream subscribers. See the [StateFlow] documentation for the general concepts of state flows.
245  *
246  * The starting of the sharing coroutine is controlled by the [started] parameter, as explained in the
247  * documentation for [shareIn] operator.
248  *
249  * The `stateIn` operator is useful in situations when there is a _cold_ flow that provides updates to the
250  * value of some state and is expensive to create and/or to maintain, but there are multiple subscribers
251  * that need to collect the most recent state value. For example, consider a
252  * flow of state updates coming from a backend over the expensive network connection, taking a lot of
253  * time to establish. Conceptually it might be implemented like this:
254  *
255  * ```
256  * val backendState: Flow<State> = flow {
257  *     connectToBackend() // takes a lot of time
258  *     try {
259  *       while (true) {
260  *           emit(receiveStateUpdateFromBackend())
261  *       }
262  *     } finally {
263  *         disconnectFromBackend()
264  *     }
265  * }
266  * ```
267  *
268  * If this flow is directly used in the application, then every time it is collected a fresh connection is
269  * established, and it will take a while before state updates start flowing. However, we can share a single connection
270  * and establish it eagerly like this:
271  *
272  * ```
 * val state: StateFlow<State> = backendState.stateIn(scope, SharingStarted.Eagerly, State.LOADING)
274  * ```
275  *
276  * Now, a single connection is shared between all collectors from `state`, and there is a chance that the connection
277  * is already established by the time it is needed.
278  *
279  * ### Upstream completion and error handling
280  *
281  * **Normal completion of the upstream flow has no effect on subscribers**, and the sharing coroutine continues to run. If a
 * strategy like [SharingStarted.WhileSubscribed] is used, then the upstream can get restarted again. If a special
283  * action on upstream completion is needed, then an [onCompletion] operator can be used before
284  * the `stateIn` operator to emit a special value in this case. See the [shareIn] operator's documentation for an example.
285  *
286  * Any exception in the upstream flow terminates the sharing coroutine without affecting any of the subscribers,
287  * and will be handled by the [scope] in which the sharing coroutine is launched. Custom exception handling
288  * can be configured by using the [catch] or [retry] operators before the `stateIn` operator, similarly to
289  * the [shareIn] operator.
290  *
291  * ### Operator fusion
292  *
293  * Application of [flowOn][Flow.flowOn], [conflate][Flow.conflate],
294  * [buffer] with [CONFLATED][Channel.CONFLATED] or [RENDEZVOUS][Channel.RENDEZVOUS] capacity,
295  * [distinctUntilChanged][Flow.distinctUntilChanged], or [cancellable] operators to a state flow has no effect.
296  *
297  * @param scope the coroutine scope in which sharing is started.
298  * @param started the strategy that controls when sharing is started and stopped.
299  * @param initialValue the initial value of the state flow.
300  *   This value is also used when the state flow is reset using the [SharingStarted.WhileSubscribed] strategy
301  *   with the `replayExpirationMillis` parameter.
302  */
public fun <T> Flow<T>.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
): StateFlow<T> {
    // Configure sharing as for a replay size of one
    val sharing = configureSharing(1)
    val mutableState = MutableStateFlow(initialValue)
    // The initial value is also handed to the sharing coroutine, which re-emits it when the state has to be reset
    val sharingJob = scope.launchSharing(
        context = sharing.context,
        upstream = sharing.upstream,
        shared = mutableState,
        started = started,
        initialValue = initialValue
    )
    // The read-only wrapper also holds a reference to the sharing job
    return ReadonlyStateFlow(mutableState, sharingJob)
}
313 
/**
 * Starts the upstream flow in a given [scope], suspends until the first value is emitted, and returns a _hot_
 * [StateFlow] of future emissions, sharing the most recently emitted value from this running instance of the upstream flow
 * with multiple downstream subscribers. See the [StateFlow] documentation for the general concepts of state flows.
 *
 * If the sharing coroutine fails or is cancelled before the first value is emitted (including the case when
 * [scope] is already cancelled), the corresponding exception is thrown from this call instead of suspending forever.
 *
 * @param scope the coroutine scope in which sharing is started.
 * @throws NoSuchElementException if the upstream flow completes without emitting any value.
 */
public suspend fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlow<T> {
    val config = configureSharing(1)
    // Completed by the sharing coroutine: with the state flow on the first emission, or with a failure otherwise
    val result = CompletableDeferred<StateFlow<T>>()
    scope.launchSharingDeferred(config.context, config.upstream, result)
    return result.await()
}

// Launches the sharing coroutine and reports through `result` either the state flow (created on the
// first emission) or the reason why no state flow could be produced
private fun <T> CoroutineScope.launchSharingDeferred(
    context: CoroutineContext,
    upstream: Flow<T>,
    result: CompletableDeferred<StateFlow<T>>
) {
    launch(context) {
        try {
            var state: MutableStateFlow<T>? = null
            upstream.collect { value ->
                // Subsequent emissions just update the state; the first one creates it and releases the waiter
                state?.let { it.value = value } ?: run {
                    state = MutableStateFlow(value).also {
                        result.complete(ReadonlyStateFlow(it, coroutineContext.job))
                    }
                }
            }
            // The upstream completed without a single emission: there is no value to build a state flow from,
            // so fail the waiter rather than leave it suspended forever
            if (state == null) {
                result.completeExceptionally(NoSuchElementException("Flow is empty"))
            }
        } catch (e: Throwable) {
            // Notify the waiter that the flow has failed
            result.completeExceptionally(e)
            // But still cancel the scope where state was (not) produced
            throw e
        }
    }.invokeOnCompletion { cause ->
        // Covers the case when the coroutine is cancelled before its body gets to run, so that the
        // try/catch above is never entered. This is a no-op if `result` is already completed.
        if (cause != null) result.completeExceptionally(cause)
    }
}
351 
352 // -------------------------------- asSharedFlow/asStateFlow --------------------------------
353 
/**
 * Returns a read-only [SharedFlow] view of this mutable shared flow.
 */
public fun <T> MutableSharedFlow<T>.asSharedFlow(): SharedFlow<T> {
    // There is no sharing job to keep a reference to, hence `null`
    return ReadonlySharedFlow(flow = this, job = null)
}
359 
/**
 * Returns a read-only [StateFlow] view of this mutable state flow.
 */
public fun <T> MutableStateFlow<T>.asStateFlow(): StateFlow<T> {
    // There is no sharing job to keep a reference to, hence `null`
    return ReadonlyStateFlow(flow = this, job = null)
}
365 
/**
 * Read-only [SharedFlow] wrapper: all [SharedFlow] members are delegated to the wrapped flow, and
 * fusion requests are handled by `fuseSharedFlow`.
 */
private class ReadonlySharedFlow<T>(
    flow: SharedFlow<T>,
    // Held only to keep a strong reference to the job (when there is one)
    @Suppress("unused")
    private val job: Job?
) : SharedFlow<T> by flow, CancellableFlow<T>, FusibleFlow<T> {
    override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {
        return fuseSharedFlow(context, capacity, onBufferOverflow)
    }
}
374 
/**
 * Read-only [StateFlow] wrapper: all [StateFlow] members are delegated to the wrapped flow, and
 * fusion requests are handled by `fuseStateFlow`.
 */
private class ReadonlyStateFlow<T>(
    flow: StateFlow<T>,
    // Held only to keep a strong reference to the job (when there is one)
    @Suppress("unused")
    private val job: Job?
) : StateFlow<T> by flow, CancellableFlow<T>, FusibleFlow<T> {
    override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {
        return fuseStateFlow(context, capacity, onBufferOverflow)
    }
}
383 
384 // -------------------------------- onSubscription --------------------------------
385 
/**
 * Returns a flow that invokes the given [action] **after** this shared flow starts to be collected,
 * that is, once the subscription has been registered.
 *
 * The [action] runs after the subscription is established but before any value from the upstream flow
 * is emitted to this subscription. It is guaranteed that all emissions to the upstream flow that happen
 * inside or immediately after this `onSubscription` action will be collected by this subscription.
 *
 * Since the receiver of the [action] is a [FlowCollector], `onSubscription` can emit additional elements.
 */
public fun <T> SharedFlow<T>.onSubscription(action: suspend FlowCollector<T>.() -> Unit): SharedFlow<T> {
    return SubscribedSharedFlow(sharedFlow = this, action = action)
}
399 
/**
 * A [SharedFlow] that delegates everything to [sharedFlow], except that each collector is wrapped
 * into a [SubscribedFlowCollector] carrying the [action] before it is passed on.
 */
private class SubscribedSharedFlow<T>(
    private val sharedFlow: SharedFlow<T>,
    private val action: suspend FlowCollector<T>.() -> Unit
) : SharedFlow<T> by sharedFlow {
    override suspend fun collect(collector: FlowCollector<T>) =
        SubscribedFlowCollector(collector, action).let { subscribed -> sharedFlow.collect(subscribed) }
}
407 
/**
 * A [FlowCollector] that delegates emissions to [collector] and additionally carries an [action]
 * that is run by [onSubscription].
 */
internal class SubscribedFlowCollector<T>(
    private val collector: FlowCollector<T>,
    private val action: suspend FlowCollector<T>.() -> Unit
) : FlowCollector<T> by collector {
    /**
     * Runs [action] against [collector] wrapped into a [SafeCollector], then forwards the call to
     * [collector] if it is itself a [SubscribedFlowCollector], so that nested actions run as well.
     */
    suspend fun onSubscription() {
        val guarded = SafeCollector(collector, currentCoroutineContext())
        try {
            action(guarded)
        } finally {
            // Always release the safe collector, even if the action fails
            guarded.releaseIntercepted()
        }
        (collector as? SubscribedFlowCollector<*>)?.onSubscription()
    }
}
422