1 @file:JvmMultifileClass
2 @file:JvmName("FlowKt")
3
4 package kotlinx.coroutines.flow
5
6 import kotlinx.coroutines.*
7 import kotlinx.coroutines.flow.internal.*
8 import kotlin.jvm.*
9
10 /**
11 * Terminal flow operator that collects the given flow but ignores all emitted values.
12 * If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.
13 *
14 * It is a shorthand for `collect {}`.
15 *
16 * This operator is usually used with [onEach], [onCompletion] and [catch] operators to process all emitted values and
17 * handle an exception that might occur in the upstream flow or during processing, for example:
18 *
19 * ```
20 * flow
21 * .onEach { value -> process(value) }
22 * .catch { e -> handleException(e) }
23 * .collect() // trigger collection of the flow
24 * ```
25 */
collectnull26 public suspend fun Flow<*>.collect(): Unit = collect(NopCollector)
27
28 /**
29 * Terminal flow operator that [launches][launch] the [collection][collect] of the given flow in the [scope].
30 * It is a shorthand for `scope.launch { flow.collect() }`.
31 *
32 * This operator is usually used with [onEach], [onCompletion] and [catch] operators to process all emitted values
33 * handle an exception that might occur in the upstream flow or during processing, for example:
34 *
35 * ```
36 * flow
37 * .onEach { value -> updateUi(value) }
38 * .onCompletion { cause -> updateUi(if (cause == null) "Done" else "Failed") }
39 * .catch { cause -> LOG.error("Exception: $cause") }
40 * .launchIn(uiScope)
41 * ```
42 *
43 * Note that the resulting value of [launchIn] is not used and the provided scope takes care of cancellation.
44 */
45 public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
46 collect() // tail-call
47 }
48
49 /**
50 * Terminal flow operator that collects the given flow with a provided [action] that takes the index of an element (zero-based) and the element.
51 * If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.
52 *
53 * See also [collect] and [withIndex].
54 */
collectIndexednull55 public suspend inline fun <T> Flow<T>.collectIndexed(crossinline action: suspend (index: Int, value: T) -> Unit): Unit =
56 collect(object : FlowCollector<T> {
57 private var index = 0
58 override suspend fun emit(value: T) = action(checkIndexOverflow(index++), value)
59 })
60
61 /**
62 * Terminal flow operator that collects the given flow with a provided [action].
63 * The crucial difference from [collect] is that when the original flow emits a new value
64 * then the [action] block for the previous value is cancelled.
65 *
66 * It can be demonstrated by the following example:
67 *
68 * ```
69 * flow {
70 * emit(1)
71 * delay(50)
72 * emit(2)
73 * }.collectLatest { value ->
74 * println("Collecting $value")
75 * delay(100) // Emulate work
76 * println("$value collected")
77 * }
78 * ```
79 *
80 * prints "Collecting 1, Collecting 2, 2 collected"
81 */
collectLatestnull82 public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit) {
83 /*
84 * Implementation note:
85 * buffer(0) is inserted here to fulfil user's expectations in sequential usages, e.g.:
86 * ```
87 * flowOf(1, 2, 3).collectLatest {
88 * delay(1)
89 * println(it) // Expect only 3 to be printed
90 * }
91 * ```
92 *
93 * It's not the case for intermediate operators which users mostly use for interactive UI,
94 * where performance of dispatch is more important.
95 */
96 mapLatest(action).buffer(0).collect()
97 }
98
99 /**
100 * Collects all the values from the given [flow] and emits them to the collector.
101 * It is a shorthand for `flow.collect { value -> emit(value) }`.
102 */
emitAllnull103 public suspend fun <T> FlowCollector<T>.emitAll(flow: Flow<T>) {
104 ensureActive()
105 flow.collect(this)
106 }
107
108 /** @suppress */
109 @Deprecated(level = DeprecationLevel.HIDDEN, message = "Backwards compatibility with JS and K/N")
collectnull110 public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
111 collect(object : FlowCollector<T> {
112 override suspend fun emit(value: T) = action(value)
113 })
114