xref: /aosp_15_r20/external/kotlinx.coroutines/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt (revision 7a7160fed73afa6648ef8aa100d4a336fe921d9a)
1 @file:JvmMultifileClass
2 @file:JvmName("FlowKt")
3 
4 package kotlinx.coroutines.flow
5 
6 import kotlinx.coroutines.*
7 import kotlinx.coroutines.flow.internal.*
8 import kotlin.jvm.*
9 
10 /**
11  * Terminal flow operator that collects the given flow but ignores all emitted values.
12  * If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.
13  *
14  * It is a shorthand for `collect {}`.
15  *
16  * This operator is usually used with [onEach], [onCompletion] and [catch] operators to process all emitted values and
17  * handle an exception that might occur in the upstream flow or during processing, for example:
18  *
19  * ```
20  * flow
21  *     .onEach { value -> process(value) }
22  *     .catch { e -> handleException(e) }
23  *     .collect() // trigger collection of the flow
24  * ```
25  */
collectnull26 public suspend fun Flow<*>.collect(): Unit = collect(NopCollector)
27 
28 /**
29  * Terminal flow operator that [launches][launch] the [collection][collect] of the given flow in the [scope].
30  * It is a shorthand for `scope.launch { flow.collect() }`.
31  *
32  * This operator is usually used with [onEach], [onCompletion] and [catch] operators to process all emitted values
33  * handle an exception that might occur in the upstream flow or during processing, for example:
34  *
35  * ```
36  * flow
37  *     .onEach { value -> updateUi(value) }
38  *     .onCompletion { cause -> updateUi(if (cause == null) "Done" else "Failed") }
39  *     .catch { cause -> LOG.error("Exception: $cause") }
40  *     .launchIn(uiScope)
41  * ```
42  *
43  * Note that the resulting value of [launchIn] is not used and the provided scope takes care of cancellation.
44  */
45 public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
46     collect() // tail-call
47 }
48 
49 /**
50  * Terminal flow operator that collects the given flow with a provided [action] that takes the index of an element (zero-based) and the element.
51  * If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.
52  *
53  * See also [collect] and [withIndex].
54  */
collectIndexednull55 public suspend inline fun <T> Flow<T>.collectIndexed(crossinline action: suspend (index: Int, value: T) -> Unit): Unit =
56     collect(object : FlowCollector<T> {
57         private var index = 0
58         override suspend fun emit(value: T) = action(checkIndexOverflow(index++), value)
59     })
60 
61 /**
62  * Terminal flow operator that collects the given flow with a provided [action].
63  * The crucial difference from [collect] is that when the original flow emits a new value
64  * then the [action] block for the previous value is cancelled.
65  *
66  * It can be demonstrated by the following example:
67  *
68  * ```
69  * flow {
70  *     emit(1)
71  *     delay(50)
72  *     emit(2)
73  * }.collectLatest { value ->
74  *     println("Collecting $value")
75  *     delay(100) // Emulate work
76  *     println("$value collected")
77  * }
78  * ```
79  *
80  * prints "Collecting 1, Collecting 2, 2 collected"
81  */
collectLatestnull82 public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit) {
83     /*
84      * Implementation note:
85      * buffer(0) is inserted here to fulfil user's expectations in sequential usages, e.g.:
86      * ```
87      * flowOf(1, 2, 3).collectLatest {
88      *     delay(1)
89      *     println(it) // Expect only 3 to be printed
90      * }
91      * ```
92      *
93      * It's not the case for intermediate operators which users mostly use for interactive UI,
94      * where performance of dispatch is more important.
95      */
96     mapLatest(action).buffer(0).collect()
97 }
98 
99 /**
100  * Collects all the values from the given [flow] and emits them to the collector.
101  * It is a shorthand for `flow.collect { value -> emit(value) }`.
102  */
emitAllnull103 public suspend fun <T> FlowCollector<T>.emitAll(flow: Flow<T>) {
104     ensureActive()
105     flow.collect(this)
106 }
107 
108 /** @suppress */
109 @Deprecated(level = DeprecationLevel.HIDDEN, message = "Backwards compatibility with JS and K/N")
collectnull110 public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
111     collect(object : FlowCollector<T> {
112         override suspend fun emit(value: T) = action(value)
113     })
114