<!--- TEST_NAME FlowGuideTest -->

[//]: # (title: Asynchronous Flow)

A suspending function asynchronously returns a single value, but how can we return
multiple asynchronously computed values? This is where Kotlin Flows come in.

## Representing multiple values

Multiple values can be represented in Kotlin using [collections].
For example, we can have a `simple` function that returns a [List]
of three numbers and then print them all using [forEach]:

```kotlin
fun simple(): List<Int> = listOf(1, 2, 3)

fun main() {
    simple().forEach { value -> println(value) }
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-01.kt).
>
{type="note"}

This code outputs:

```text
1
2
3
```

<!--- TEST -->

### Sequences

If we are computing the numbers with some CPU-consuming blocking code
(each computation taking 100ms), then we can represent the numbers using a [Sequence]:

```kotlin
fun simple(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    simple().forEach { value -> println(value) }
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-02.kt).
>
{type="note"}

This code outputs the same numbers, but it waits 100ms before printing each one.

<!--- TEST
1
2
3
-->

### Suspending functions

However, this computation blocks the main thread that is running the code.
When these values are computed by asynchronous code, we can mark the `simple` function with a `suspend` modifier,
so that it can perform its work without blocking and return the result as a list:

```kotlin
import kotlinx.coroutines.*

//sampleStart
suspend fun simple(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-03.kt).
>
{type="note"}

This code prints the numbers after waiting for a second.

<!--- TEST
1
2
3
-->

### Flows

Using the `List<Int>` result type means we can only return all the values at once. To represent
the stream of values that are being computed asynchronously, we can use a [`Flow<Int>`][Flow] type,
just like we would use a `Sequence<Int>` type for synchronously computed values:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    simple().collect { value -> println(value) }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-04.kt).
>
{type="note"}

This code waits 100ms before printing each number without blocking the main thread. This is verified
by printing "I'm not blocked" every 100ms from a separate coroutine that is running in the main thread:

```text
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
```

<!--- TEST -->

Notice the following differences in the code with the [Flow] from the earlier examples:

* A builder function of [Flow] type is called [flow][_flow].
* Code inside a `flow { ... }` builder block can suspend.
* The `simple` function is no longer marked with a `suspend` modifier.
* Values are _emitted_ from the flow using an [emit][FlowCollector.emit] function.
* Values are _collected_ from the flow using a [collect][collect] function.

> We can replace [delay] with `Thread.sleep` in the body of `simple`'s `flow { ... }` and see that the main
> thread is blocked in this case.
>
{type="note"}

## Flows are cold

Flows are _cold_ streams similar to sequences &mdash; the code inside a [flow][_flow] builder does not
run until the flow is collected. This becomes clear in the following example:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) }
    println("Calling collect again...")
    flow.collect { value -> println(value) }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-05.kt).
>
{type="note"}

Which prints:

```text
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
```

<!--- TEST -->

This is a key reason the `simple` function (which returns a flow) is not marked with the `suspend` modifier.
The `simple()` call itself returns quickly and does not wait for anything. The flow starts afresh every time it is
collected, and that is why we see "Flow started" every time we call `collect` again.

## Flow cancellation basics

Flows adhere to the general cooperative cancellation of coroutines. As usual, flow collection can be
cancelled when the flow is suspended in a cancellable suspending function (like [delay]).
The following example shows how the flow gets cancelled on a timeout when running in a [withTimeoutOrNull] block
and stops executing its code:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms
        simple().collect { value -> println(value) }
    }
    println("Done")
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-06.kt).
>
{type="note"}

Notice how only two numbers get emitted by the flow in the `simple` function, producing the following output:

```text
Emitting 1
1
Emitting 2
2
Done
```

<!--- TEST -->

See the [Flow cancellation checks](#flow-cancellation-checks) section for more details.

## Flow builders

The `flow { ... }` builder from the previous examples is the most basic one. There are other builders
that allow flows to be declared:

* The [flowOf] builder defines a flow that emits a fixed set of values.
* Various collections and sequences can be converted to flows using the `.asFlow()` extension function.

For example, the snippet that prints the numbers 1 to 3 from a flow can be rewritten as follows:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    // Convert an integer range to a flow
    (1..3).asFlow().collect { value -> println(value) }
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-07.kt).
>
{type="note"}

<!--- TEST
1
2
3
-->
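
The other builders listed in this section can be sketched in the same way. The following minimal example
assumes only `flowOf` and the `asFlow` extension on a list. The helper names `fixedNumbers` and `namesFlow`
are made up for illustration and are not part of the library:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: a flow that emits a fixed set of values
fun fixedNumbers(): Flow<Int> = flowOf(1, 2, 3)

// Hypothetical helper: a flow built from an existing collection
fun namesFlow(): Flow<String> = listOf("one", "two", "three").asFlow()

fun main() = runBlocking<Unit> {
    fixedNumbers().collect { value -> println(value) }
    namesFlow().collect { value -> println(value) }
}
```

Both helpers are ordinary non-suspending functions, because building a flow does not start any computation.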

## Intermediate flow operators

Flows can be transformed using operators, in the same way as you would transform collections and
sequences.
Intermediate operators are applied to an upstream flow and return a downstream flow.
These operators are cold, just like flows are. A call to such an operator is not
a suspending function itself. It works quickly, returning the definition of a new transformed flow.

The basic operators have familiar names like [map] and [filter].
An important difference of these operators from sequences is that blocks of
code inside these operators can call suspending functions.

For example, a flow of incoming requests can be
mapped to its results with a [map] operator, even when performing a request is a long-running
operation that is implemented by a suspending function:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-08.kt).
>
{type="note"}

It produces the following three lines, each appearing one second after the previous:

```text
response 1
response 2
response 3
```

<!--- TEST -->
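
The claim that intermediate operators are cold can be checked with a small sketch. It records the order of events
to show that the `map` block does not run when the operator is applied, only when the flow is collected.
The `recordEvents` helper is hypothetical and exists only for this illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: records what happens and in which order
suspend fun recordEvents(): List<String> {
    val events = mutableListOf<String>()
    // Applying map only builds a new flow definition; the block does not run yet
    val squares = (1..3).asFlow().map { value ->
        events.add("map $value")
        value * value
    }
    events.add("operator applied")
    // The map block runs only once a terminal operator starts the collection
    squares.collect { value -> events.add("collect $value") }
    return events
}

fun main() = runBlocking<Unit> {
    recordEvents().forEach { println(it) }
}
```

The first recorded event is `operator applied`. The `map` and `collect` events follow in pairs, one pair for each number.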

### Transform operator

Among the flow transformation operators, the most general one is called [transform]. It can be used to imitate
simple transformations like [map] and [filter], as well as implement more complex transformations.
Using the `transform` operator, we can [emit][FlowCollector.emit] arbitrary values an arbitrary number of times.

For example, using `transform` we can emit a string before performing a long-running asynchronous request
and follow it with a response:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
//sampleStart
    (1..3).asFlow() // a flow of requests
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
        .collect { response -> println(response) }
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-09.kt).
>
{type="note"}

The output of this code is:

```text
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
```

<!--- TEST -->
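
To illustrate how `transform` can imitate `map` and `filter`, here is a minimal sketch. The extension functions
`mapViaTransform` and `filterViaTransform` are hypothetical names introduced for this example. They are not
the library's own implementations of [map] and [filter]:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: behaves like map by emitting exactly one value per input
fun <T, R> Flow<T>.mapViaTransform(block: suspend (T) -> R): Flow<R> =
    transform { value -> emit(block(value)) }

// Hypothetical helper: behaves like filter by emitting the input only when it matches
fun <T> Flow<T>.filterViaTransform(predicate: suspend (T) -> Boolean): Flow<T> =
    transform { value -> if (predicate(value)) emit(value) }

fun main() = runBlocking<Unit> {
    (1..5).asFlow()
        .filterViaTransform { it % 2 == 1 } // keep odd numbers
        .mapViaTransform { it * 10 } // multiply each remaining number by ten
        .collect { value -> println(value) }
}
```

This prints `10`, `30`, and `50`. Emitting zero times per input gives a filter, emitting once gives a map,
and emitting several times gives the request example above.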

### Size-limiting operators

Size-limiting intermediate operators like [take] cancel the execution of the flow when the corresponding limit
is reached. Cancellation in coroutines is always performed by throwing an exception, so that all the resource-management
functions (like `try { ... } finally { ... }` blocks) operate normally in case of cancellation:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers()
        .take(2) // take only the first two
        .collect { value -> println(value) }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-10.kt).
>
{type="note"}

The output of this code clearly shows that the execution of the `flow { ... }` body in the `numbers()` function
stopped after emitting the second number:

```text
1
2
Finally in numbers
```

<!--- TEST -->

## Terminal flow operators

Terminal operators on flows are _suspending functions_ that start a collection of the flow.
The [collect] operator is the most basic one, but there are other terminal operators that can make collection easier:

* Conversion to various collections like [toList] and [toSet].
* Operators to get the [first] value and to ensure that a flow emits a [single] value.
* Reducing a flow to a value with [reduce] and [fold].

For example:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val sum = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5
        .reduce { a, b -> a + b } // sum them (terminal operator)
    println(sum)
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-11.kt).
>
{type="note"}

Prints a single number:

```text
55
```

<!--- TEST -->
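
The remaining terminal operators from the list can be tried in the same way. The sketch below is only an
illustration with made-up input values. Because flows are cold, each terminal operator call starts its own
collection of `numbers`:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val numbers = (1..5).asFlow()
    val asList = numbers.toList() // collect every value into a List
    val firstValue = numbers.first() // take the first value and stop collecting
    val total = numbers.fold(0) { acc, value -> acc + value } // like reduce, but with an initial value
    val distinct = flowOf(1, 1, 2).toSet() // duplicates collapse in a Set
    val only = flowOf(42).single() // succeeds because exactly one value is emitted
    println("$asList $firstValue $total $distinct $only")
}
```

This prints `[1, 2, 3, 4, 5] 1 15 [1, 2] 42`.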

## Flows are sequential

Each individual collection of a flow is performed sequentially unless special operators that operate
on multiple flows are used. The collection works directly in the coroutine that calls a terminal operator.
No new coroutines are launched by default.
Each emitted value is processed by all the intermediate operators from
upstream to downstream and is then delivered to the terminal operator.

See the following example that filters the even integers and maps them to strings:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0
        }
        .map {
            println("Map $it")
            "string $it"
        }.collect {
            println("Collect $it")
        }
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-12.kt).
>
{type="note"}

Producing:

```text
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
```

<!--- TEST -->

## Flow context

Collection of a flow always happens in the context of the calling coroutine. For example, if there is
a `simple` flow, then the following code runs in the context specified
by the author of this code, regardless of the implementation details of the `simple` flow:

```kotlin
withContext(context) {
    simple().collect { value ->
        println(value) // run in the specified context
    }
}
```

<!--- CLEAR -->

This property of a flow is called _context preservation_.

So, by default, code in the `flow { ... }` builder runs in the context that is provided by a collector
of the corresponding flow. For example, consider the implementation of a `simple` function that prints the thread
it is called on and emits three numbers:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

//sampleStart
fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-13.kt).
>
{type="note"}

Running this code produces:

```text
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
```

<!--- TEST FLEXIBLE_THREAD -->

Since `simple().collect` is called from the main thread, the body of `simple`'s flow is also called in the main thread.
This is the perfect default for fast-running or asynchronous code that does not care about the execution context and
does not block the caller.

### A common pitfall when using withContext

However, long-running CPU-consuming code might need to be executed in the context of [Dispatchers.Default], and UI-updating
code might need to be executed in the context of [Dispatchers.Main]. Usually, [withContext] is used
to change the context in code that uses Kotlin coroutines, but code in the `flow { ... }` builder has to honor the context
preservation property and is not allowed to [emit][FlowCollector.emit] from a different context.

Try running the following code:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-14.kt).
>
{type="note"}

This code produces the following exception:

```text
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
		Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
		but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
		Please refer to 'flow' documentation or use 'flowOn' instead
	at ...
```

<!--- TEST EXCEPTION -->

### flowOn operator

The exception refers to the [flowOn] function that shall be used to change the context of the flow emission.
The correct way to change the context of a flow is shown in the example below, which also prints the
names of the corresponding threads to show how it all works:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value")
    }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-15.kt).
>
{type="note"}

Notice how `flow { ... }` works in the background thread, while collection happens in the main thread:

<!--- TEST FLEXIBLE_THREAD
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
-->

Another thing to observe here is that the [flowOn] operator has changed the default sequential nature of the flow.
Now collection happens in one coroutine ("coroutine#1") and emission happens in another coroutine
("coroutine#2") that is running in another thread concurrently with the collecting coroutine. The [flowOn] operator
creates another coroutine for an upstream flow when it has to change the [CoroutineDispatcher] in its context.

## Buffering

Running different parts of a flow in different coroutines can be helpful from the standpoint of the overall time it takes
to collect the flow, especially when long-running asynchronous operations are involved. For example, consider a case when
the emission by a `simple` flow is slow, taking 100 ms to produce an element, and the collector is also slow,
taking 300 ms to process an element. Let's see how long it takes to collect such a flow with three numbers:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple().collect { value ->
            delay(300) // pretend we are processing it for 300 ms
            println(value)
        }
    }
    println("Collected in $time ms")
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-16.kt).
>
{type="note"}

It produces something like this, with the whole collection taking around 1200 ms (three numbers, 400 ms for each):

```text
1
2
3
Collected in 1220 ms
```

<!--- TEST ARBITRARY_TIME -->

We can use a [buffer] operator on a flow to run emitting code of the `simple` flow concurrently with collecting code,
as opposed to running them sequentially:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val time = measureTimeMillis {
        simple()
            .buffer() // buffer emissions, don't wait
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-17.kt).
>
{type="note"}

It produces the same numbers just faster, as we have effectively created a processing pipeline,
having to only wait 100 ms for the first number and then spending only 300 ms to process
each number. This way it takes around 1000 ms to run:

```text
1
2
3
Collected in 1071 ms
```

<!--- TEST ARBITRARY_TIME -->

> Note that the [flowOn] operator uses the same buffering mechanism when it has to change a [CoroutineDispatcher],
> but here we explicitly request buffering without changing the execution context.
>
{type="note"}

### Conflation

When a flow represents partial results of the operation or operation status updates, it may not be necessary
to process each value, but only the most recent ones. In this case, the [conflate] operator can be used to skip
intermediate values when a collector is too slow to process them. Building on the previous example:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val time = measureTimeMillis {
        simple()
            .conflate() // conflate emissions, don't process each one
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-18.kt).
>
{type="note"}

We see that while the first number was still being processed, the second and third were already produced, so
the second one was _conflated_ and only the most recent (the third one) was delivered to the collector:

```text
1
3
Collected in 758 ms
```

<!--- TEST ARBITRARY_TIME -->

### Processing the latest value

Conflation is one way to speed up processing when both the emitter and collector are slow. It does so by dropping emitted values.
The other way is to cancel a slow collector and restart it every time a new value is emitted. There is
a family of `xxxLatest` operators that perform the same essential logic of a `xxx` operator, but cancel the
code in their block on a new value. Let's try changing [conflate] to [collectLatest] in the previous example:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val time = measureTimeMillis {
        simple()
            .collectLatest { value -> // cancel & restart on the latest value
                println("Collecting $value")
                delay(300) // pretend we are processing it for 300 ms
                println("Done $value")
            }
    }
    println("Collected in $time ms")
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-19.kt).
>
{type="note"}

Since the body of [collectLatest] takes 300 ms, but new values are emitted every 100 ms, we see that the block
is run on every value, but completes only for the last value:

```text
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms
```

<!--- TEST ARBITRARY_TIME -->
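
The same cancel-and-restart behavior applies to the intermediate members of the `xxxLatest` family. As a hedged sketch,
the following uses `mapLatest`, assuming it is available and marked experimental in the library version in use
(hence the opt-in annotation). The `slowlyMapped` helper is a made-up name:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

// Hypothetical helper; the opt-in assumes mapLatest is still experimental in your version
@OptIn(ExperimentalCoroutinesApi::class)
fun slowlyMapped(): Flow<String> = simple().mapLatest { value ->
    delay(300) // this mapping is cancelled if a new value arrives in the meantime
    "Mapped $value"
}

fun main() = runBlocking<Unit> {
    slowlyMapped().collect { value -> println(value) }
}
```

With these timings, the mappings of the first two values are cancelled before they finish, so only `Mapped 3` is printed.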

## Composing multiple flows

There are lots of ways to compose multiple flows.

### Zip

Just like the [Sequence.zip] extension function in the Kotlin standard library,
flows have a [zip] operator that combines the corresponding values of two flows:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow() // numbers 1..3
    val strs = flowOf("one", "two", "three") // strings
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
        .collect { println(it) } // collect and print
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-20.kt).
>
{type="note"}

This example prints:

```text
1 -> one
2 -> two
3 -> three
```

<!--- TEST -->
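
The example above zips two flows of equal length. As far as the library documentation describes it, when the lengths differ,
the zipped flow completes as soon as either of the two flows completes, just like `Sequence.zip` stops at the shorter sequence.
A minimal sketch with made-up inputs:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val nums = (1..5).asFlow() // five numbers
    val strs = flowOf("one", "two") // only two strings
    nums.zip(strs) { a, b -> "$a -> $b" } // pairs stop when the shorter flow ends
        .collect { println(it) }
}
```

This prints `1 -> one` and `2 -> two`. The numbers 3 to 5 have no partner and are never paired.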

### Combine

When a flow represents the most recent value of a variable or operation (see also the related
section on [conflation](#conflation)), it might be needed to perform a computation that depends on
the most recent values of the corresponding flows and to recompute it whenever any of the upstream
flows emit a value. The corresponding family of operators is called [combine].

For example, if the numbers in the previous example update every 300 ms, but the strings update every 400 ms,
then zipping them using the [zip] operator still produces the same result,
although each result is printed every 400 ms:

> We use an [onEach] intermediate operator in this example to delay each element and make the code
> that emits sample flows more declarative and shorter.
>
{type="note"}

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-21.kt).
>
{type="note"}

<!--- TEST ARBITRARY_TIME
1 -> one at 437 ms from start
2 -> two at 837 ms from start
3 -> three at 1243 ms from start
-->

However, when using a [combine] operator here instead of a [zip]:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-22.kt).
>
{type="note"}

We get quite a different output, where a line is printed at each emission from either `nums` or `strs` flows:

```text
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start
```

<!--- TEST ARBITRARY_TIME -->
1013
## Flattening flows

Flows represent asynchronously received sequences of values, and so it is quite easy to get into a situation
where each value triggers a request for another sequence of values. For example, we can have the following
function that returns a flow of two strings 500 ms apart:

```kotlin
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}
```

<!--- CLEAR -->

Now if we have a flow of three integers and call `requestFlow` on each of them like this:

```kotlin
(1..3).asFlow().map { requestFlow(it) }
```

<!--- CLEAR -->

Then we will end up with a flow of flows (`Flow<Flow<String>>`) that needs to be _flattened_ into a single flow for
further processing. Collections and sequences have [flatten][Sequence.flatten] and [flatMap][Sequence.flatMap]
operators for this. However, due to the asynchronous nature of flows they call for different _modes_ of flattening,
and hence, a family of flattening operators on flows exists.

### flatMapConcat

Concatenation of flows of flows is provided by the [flatMapConcat] and [flattenConcat] operators. They are the
most direct analogues of the corresponding sequence operators. They wait for the inner flow to complete before
starting to collect the next one as the following example shows:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms
        .flatMapConcat { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-23.kt).
>
{type="note"}

The sequential nature of [flatMapConcat] is clearly seen in the output:

```text
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
```

<!--- TEST ARBITRARY_TIME -->

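The example above only exercises [flatMapConcat]. As a minimal sketch of the other operator named in this section, the nested `Flow<Flow<String>>` can be kept as a separate `map` step and then flattened with [flattenConcat]. The helper `flattenedValues` is hypothetical and exists only for this illustration, and the inner delay is shortened to 50 ms to keep the run quick:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(50) // shortened from 500 ms for this sketch
    emit("$i: Second")
}

// Hypothetical helper: builds the flow of flows, flattens it, and gathers the values.
fun flattenedValues(): List<String> = runBlocking {
    val nested: Flow<Flow<String>> = (1..3).asFlow().map { requestFlow(it) }
    nested.flattenConcat() // collect each inner flow to completion, one after another
        .toList()
}

fun main() {
    flattenedValues().forEach { println(it) }
}
```

Because each inner flow is collected to completion before the next one starts, the values should come out in the same order as in the [flatMapConcat] output: both strings for 1, then both for 2, then both for 3.
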
### flatMapMerge

Another flattening operation is to concurrently collect all the incoming flows and merge their values into
a single flow so that values are emitted as soon as possible.
It is implemented by the [flatMapMerge] and [flattenMerge] operators. They both accept an optional
`concurrency` parameter that limits the number of concurrent flows that are collected at the same time
(it is equal to [DEFAULT_CONCURRENCY] by default).

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-24.kt).
>
{type="note"}

The concurrent nature of [flatMapMerge] is obvious:

```text
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
```

<!--- TEST ARBITRARY_TIME -->

> Note that [flatMapMerge] calls its block of code (`{ requestFlow(it) }` in this example) sequentially, but
> collects the resulting flows concurrently. It is the equivalent of performing a sequential
> `map { requestFlow(it) }` first and then calling [flattenMerge] on the result.
>
{type="note"}

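The effect of the optional `concurrency` parameter can be sketched as follows. The helper `mergedValues` is hypothetical, and the inner delay is shortened to 50 ms; with a limit of 1, only one inner flow is collected at a time, so the result should match the sequential order seen with [flatMapConcat]:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(50) // shortened from 500 ms for this sketch
    emit("$i: Second")
}

// Hypothetical helper: merges the three inner flows under the given concurrency limit.
fun mergedValues(concurrency: Int): List<String> = runBlocking {
    (1..3).asFlow()
        .flatMapMerge(concurrency) { requestFlow(it) }
        .toList()
}

fun main() {
    println(mergedValues(concurrency = 1)) // at most one inner flow is collected at a time
    println(mergedValues(concurrency = 3)) // all three inner flows may be collected at once
}
```

With a limit of 3 the same six strings are produced, but their interleaving depends on how the inner flows are scheduled, so this sketch makes no claim about their exact order.
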
### flatMapLatest

Similarly to the [collectLatest] operator described in the section
["Processing the latest value"](#processing-the-latest-value), there is a corresponding "Latest"
flattening mode, where the collection of the previous flow is cancelled as soon as a new flow is emitted.
It is implemented by the [flatMapLatest] operator.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapLatest { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-25.kt).
>
{type="note"}

The output of this example is a good demonstration of how [flatMapLatest] works:

```text
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
```

<!--- TEST ARBITRARY_TIME -->

> Note that [flatMapLatest] cancels all the code in its block (`{ requestFlow(it) }` in this example) when a new value
> is received.
> It makes no difference in this particular example, because the call to `requestFlow` itself is fast, non-suspending,
> and cannot be cancelled. However, a difference in output would be visible if we were to use suspending functions
> like `delay` in `requestFlow`.
>
{type="note"}

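One way to make that cancellation visible is to put a suspending call into the block itself. The following is only a sketch under that assumption: the `delay(300)` stands in for some slow preparation step, and `latestValues` is a hypothetical helper introduced for this illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

// Hypothetical helper: the block now suspends before it returns the inner flow.
fun latestValues(): List<String> = runBlocking {
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapLatest { i ->
            delay(300) // assumed slow step; cancelled if a newer number arrives meanwhile
            requestFlow(i)
        }
        .toList()
}

fun main() {
    latestValues().forEach { println(it) }
}
```

Here the blocks for 1 and 2 are cancelled while they are still suspended in `delay`, so their inner flows are never collected. Only the strings for 3 should be printed, whereas the example above still shows `1: First` and `2: First`.
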
## Flow exceptions

Flow collection can complete with an exception when an emitter or code inside the operators throws an exception.
There are several ways to handle these exceptions.

### Collector try and catch

A collector can use Kotlin's [`try/catch`][exceptions] block to handle exceptions:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-26.kt).
>
{type="note"}

This code successfully catches an exception in the [collect] terminal operator and,
as we see, no more values are emitted after that:

```text
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
```

<!--- TEST -->

### Everything is caught

The previous example actually catches any exception happening in the emitter or in any intermediate or terminal operators.
For example, let's change the code so that emitted values are [mapped][map] to strings,
but the corresponding code produces an exception:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-27.kt).
>
{type="note"}

This exception is still caught and collection is stopped:

```text
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
```

<!--- TEST -->

## Exception transparency

But how can the emitter's code encapsulate its exception handling behavior?

Flows must be _transparent to exceptions_, and it is a violation of exception transparency to [emit][FlowCollector.emit] values in the
`flow { ... }` builder from inside a `try/catch` block. This guarantees that a collector throwing an exception
can always catch it using `try/catch` as in the previous example.

The emitter can use a [catch] operator that preserves this exception transparency and allows encapsulation
of its exception handling. The body of the `catch` operator can analyze an exception
and react to it in different ways depending on which exception was caught:

* Exceptions can be rethrown using `throw`.
* Exceptions can be turned into emission of values using [emit][FlowCollector.emit] from the body of [catch].
* Exceptions can be ignored, logged, or processed by some other code.

For example, let us emit the text on catching an exception:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

fun main() = runBlocking<Unit> {
//sampleStart
    simple()
        .catch { e -> emit("Caught $e") } // emit on exception
        .collect { value -> println(value) }
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-28.kt).
>
{type="note"}

The output of the example is the same, even though we do not have `try/catch` around the code anymore.

<!--- TEST
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
-->

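The example above shows only one of the listed reactions. As a rough sketch of how a `catch` body might choose between emitting a value and rethrowing, consider the following, where `failing` and `recovered` are hypothetical helpers and the choice of exception types is an arbitrary assumption:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical flow that emits one value and then fails with the given exception.
fun failing(error: Throwable): Flow<Int> = flow {
    emit(1)
    throw error
}

// Hypothetical helper: recovers from IllegalStateException and rethrows everything else.
fun recovered(error: Throwable): List<Int> = runBlocking {
    failing(error)
        .catch { e ->
            if (e is IllegalStateException) emit(-1) // turn this failure into a fallback value
            else throw e // let any other exception continue downstream
        }
        .toList()
}

fun main() {
    println(recovered(IllegalStateException("Crashed")))
    try {
        recovered(UnsupportedOperationException("Not handled"))
    } catch (e: UnsupportedOperationException) {
        println("Rethrown $e")
    }
}
```

In the first call the flow ends with the fallback value instead of an exception. In the second call the exception passes through `catch` and reaches the caller, which handles it with an ordinary `try/catch`.
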
### Transparent catch

The [catch] intermediate operator, honoring exception transparency, catches only upstream exceptions
(that is, exceptions from all the operators above `catch`, but not below it).
If the block in `collect { ... }` (placed below `catch`) throws an exception, then it escapes:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-29.kt).
>
{type="note"}

A "Caught ..." message is not printed despite there being a `catch` operator:

```text
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
	at ...
```

<!--- TEST EXCEPTION -->

### Catching declaratively

We can combine the declarative nature of the [catch] operator with a desire to handle all the exceptions, by moving the body
of the [collect] operator into [onEach] and putting it before the `catch` operator. Collection of this flow must
be triggered by a call to `collect()` without parameters:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    simple()
        .onEach { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
        .catch { e -> println("Caught $e") }
        .collect()
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-30.kt).
>
{type="note"}

Now we can see that a "Caught ..." message is printed and so we can catch all the exceptions without explicitly
using a `try/catch` block:

```text
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2
```

<!--- TEST EXCEPTION -->

## Flow completion

When flow collection completes (normally or exceptionally) it may need to execute an action.
As you may have already noticed, it can be done in two ways: imperative or declarative.

### Imperative finally block

In addition to `try`/`catch`, a collector can also use a `finally` block to execute an action
upon `collect` completion.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-31.kt).
>
{type="note"}

This code prints three numbers produced by the `simple` flow followed by a "Done" string:

```text
1
2
3
Done
```

<!--- TEST  -->

### Declarative handling

For the declarative approach, flow has the [onCompletion] intermediate operator, which is invoked
when the flow has been completely collected.

The previous example can be rewritten using an [onCompletion] operator and produces the same output:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
//sampleStart
    simple()
        .onCompletion { println("Done") }
        .collect { value -> println(value) }
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-32.kt).
>
{type="note"}

<!--- TEST
1
2
3
Done
-->

The key advantage of [onCompletion] is a nullable `Throwable` parameter of the lambda that can be used
to determine whether the flow collection was completed normally or exceptionally. In the following
example the `simple` flow throws an exception after emitting the number 1:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-33.kt).
>
{type="note"}

As you may expect, it prints:

```text
1
Flow completed exceptionally
Caught exception
```

<!--- TEST -->

The [onCompletion] operator, unlike [catch], does not handle the exception. As we can see from the above
example code, the exception still flows downstream. It will be delivered to further `onCompletion` operators
and can be handled with a `catch` operator.

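How the exception travels through a chain of operators can be sketched by recording what each one observes. In the following illustration, `completionLog` is a hypothetical helper and the operator chain is an assumption chosen to make the propagation visible:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: records what every operator in the chain observes.
fun completionLog(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow<Int> {
        emit(1)
        throw RuntimeException("failed")
    }
        .onCompletion { cause -> log += "first onCompletion: ${cause?.message}" }
        .onCompletion { cause -> log += "second onCompletion: ${cause?.message}" }
        .catch { cause -> log += "catch: ${cause.message}" } // handles the exception
        .onCompletion { cause -> log += "after catch: ${cause?.message}" }
        .collect { value -> log += "value: $value" }
    log
}

fun main() {
    completionLog().forEach { println(it) }
}
```

Both `onCompletion` operators above `catch` should receive the exception. The one below `catch` should receive `null`, because from its point of view the upstream flow completed normally once the exception was handled.
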
### Successful completion

Another difference from the [catch] operator is that [onCompletion] sees all exceptions and receives
a `null` exception only on successful completion of the upstream flow (without cancellation or failure).

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-34.kt).
>
{type="note"}

We can see the completion cause is not null, because the flow was aborted due to a downstream exception:

```text
1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
```

<!--- TEST EXCEPTION -->

## Imperative versus declarative

Now we know how to collect a flow and handle its completion and exceptions in both imperative and declarative ways.
The natural question here is, which approach is preferred and why?
As a library, we do not advocate for any particular approach and believe that both options
are valid and should be selected according to your own preferences and code style.

## Launching flow

It is easy to use flows to represent asynchronous events that are coming from some source.
In this case, we need an analogue of the `addEventListener` function that registers a piece of code with a reaction
for incoming events and continues further work. The [onEach] operator can serve this role.
However, `onEach` is an intermediate operator. We also need a terminal operator to collect the flow.
Otherwise, just calling `onEach` has no effect.

If we use the [collect] terminal operator after `onEach`, then the code after it will wait until the flow is collected:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-35.kt).
>
{type="note"}

As you can see, it prints:

```text
Event: 1
Event: 2
Event: 3
Done
```

<!--- TEST -->

The [launchIn] terminal operator comes in handy here. By replacing `collect` with `launchIn` we can
launch a collection of the flow in a separate coroutine, so that execution of further code
immediately continues:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

//sampleStart
fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-36.kt).
>
{type="note"}

It prints:

```text
Done
Event: 1
Event: 2
Event: 3
```

<!--- TEST -->

The required parameter to `launchIn` must specify a [CoroutineScope] in which the coroutine to collect the flow is
launched. In the above example this scope comes from the [runBlocking]
coroutine builder, so while the flow is running, this [runBlocking] scope waits for completion of its child coroutine
and keeps the main function from returning and terminating this example.

In actual applications a scope will come from an entity with a limited
lifetime. As soon as the lifetime of this entity is terminated the corresponding scope is cancelled, cancelling
the collection of the corresponding flow. This way the pair of `onEach { ... }.launchIn(scope)` works
like the `addEventListener`. However, there is no need for the corresponding `removeEventListener` function,
as cancellation and structured concurrency serve this purpose.

Note that [launchIn] also returns a [Job], which can be used to [cancel][Job.cancel] only the corresponding flow collection
coroutine, without cancelling the whole scope, or to [join][Job.join] it.

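As a minimal sketch of using the returned [Job], the following code stops the collection after roughly 250 ms while the surrounding [runBlocking] scope keeps running. The helper `eventsBeforeCancel` and the 250 ms pause are assumptions made for this illustration only:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

// Hypothetical helper: collects events in the background and stops after about 250 ms.
fun eventsBeforeCancel(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    val job = events()
        .onEach { event -> received += event }
        .launchIn(this) // returns the Job of the collecting coroutine
    delay(250) // let some events arrive; how many is timing-dependent
    job.cancel() // stop only this collection, the runBlocking scope stays active
    job.join() // wait until the collecting coroutine has finished
    received
}

fun main() {
    println("Received before cancel: ${eventsBeforeCancel()}")
}
```

Since the events arrive about 100 ms apart, this would typically record the first two of them, but the exact count depends on timing. The function still returns normally, which shows that cancelling the job did not cancel the scope.
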
### Flow cancellation checks

For convenience, the [flow][_flow] builder performs additional [ensureActive] checks for cancellation on each emitted value.
It means that a busy loop emitting from a `flow { ... }` is cancellable:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..5) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-37.kt).
>
{type="note"}

We get only numbers up to 3 and a [CancellationException] after trying to emit number 4:

```text
Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c
```

<!--- TEST EXCEPTION -->

However, most other flow operators do not do additional cancellation checks on their own for performance reasons.
For example, if you use the [IntRange.asFlow] extension to write the same busy loop and don't suspend anywhere,
then there are no checks for cancellation:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun main() = runBlocking<Unit> {
    (1..5).asFlow().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-38.kt).
>
{type="note"}

All numbers from 1 to 5 are collected and cancellation gets detected only before return from `runBlocking`:

```text
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23
```

<!--- TEST EXCEPTION -->

#### Making busy flow cancellable

If you have a busy loop with coroutines, you must explicitly check for cancellation.
You can add `.onEach { currentCoroutineContext().ensureActive() }`, but there is a ready-to-use
[cancellable] operator provided to do that:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun main() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-39.kt).
>
{type="note"}

With the `cancellable` operator only the numbers from 1 to 3 are collected:

```text
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365
```

<!--- TEST EXCEPTION -->

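The manual alternative mentioned above, `.onEach { currentCoroutineContext().ensureActive() }`, can be sketched in the same way. The helper `collectedWithManualCheck` is hypothetical, and it catches the resulting [CancellationException] only so that the collected values can be returned and inspected:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the values collected before cancellation was detected.
fun collectedWithManualCheck(): List<Int> {
    val collected = mutableListOf<Int>()
    try {
        runBlocking<Unit> {
            (1..5).asFlow()
                .onEach { currentCoroutineContext().ensureActive() } // manual check per element
                .collect { value ->
                    if (value == 3) cancel()
                    collected += value
                }
        }
    } catch (e: CancellationException) {
        // runBlocking rethrows the cancellation of its own coroutine
    }
    return collected
}

fun main() {
    println(collectedWithManualCheck())
}
```

As with `cancellable`, the check before element 4 notices the cancellation, so only the numbers from 1 to 3 should be collected.
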
## Flow and Reactive Streams

For those who are familiar with [Reactive Streams](https://www.reactive-streams.org/) or reactive frameworks such as RxJava and Project Reactor,
the design of Flow may look very familiar.

Indeed, its design was inspired by Reactive Streams and its various implementations. But Flow's main goal is to have as simple a design as possible,
be Kotlin- and suspension-friendly, and respect structured concurrency. Achieving this goal would be impossible without reactive pioneers and their tremendous work. You can read the complete story in the [Reactive Streams and Kotlin Flows](https://medium.com/@elizarov/reactive-streams-and-kotlin-flows-bfd12772cda4) article.

While being different, conceptually, Flow *is* a reactive stream and it is possible to convert it to the reactive (spec and TCK compliant) Publisher and vice versa.
Such converters are provided by `kotlinx.coroutines` out-of-the-box and can be found in the corresponding reactive modules (`kotlinx-coroutines-reactive` for Reactive Streams, `kotlinx-coroutines-reactor` for Project Reactor and `kotlinx-coroutines-rx2`/`kotlinx-coroutines-rx3` for RxJava2/RxJava3).
Integration modules include conversions from and to `Flow`, integration with Reactor's `Context` and suspension-friendly ways to work with various reactive entities.

1835<!-- stdlib references -->
1836
1837[collections]: https://kotlinlang.org/docs/reference/collections-overview.html
1838[List]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/-list/
1839[forEach]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/for-each.html
1840[Sequence]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/
1841[Sequence.zip]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/zip.html
1842[Sequence.flatten]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flatten.html
1843[Sequence.flatMap]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flat-map.html
1844[exceptions]: https://kotlinlang.org/docs/reference/exceptions.html
1845
1846<!--- MODULE kotlinx-coroutines-core -->
1847<!--- INDEX kotlinx.coroutines -->
1848
1849[delay]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/delay.html
1850[withTimeoutOrNull]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-timeout-or-null.html
1851[Dispatchers.Default]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
1852[Dispatchers.Main]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-main.html
1853[withContext]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
1854[CoroutineDispatcher]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-dispatcher/index.html
1855[CoroutineScope]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
1856[runBlocking]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
1857[Job]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html
1858[Job.cancel]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/cancel.html
1859[Job.join]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
1860[ensureActive]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/ensure-active.html
1861[CancellationException]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-cancellation-exception/index.html
1862
1863<!--- INDEX kotlinx.coroutines.flow -->
1864
1865[Flow]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
[_flow]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html
[FlowCollector.emit]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow-collector/emit.html
[collect]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect.html
[flowOf]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-of.html
[map]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/map.html
[filter]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/filter.html
[transform]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/transform.html
[take]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/take.html
[toList]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-list.html
[toSet]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-set.html
[first]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/first.html
[single]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/single.html
[reduce]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/reduce.html
[fold]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/fold.html
[flowOn]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-on.html
[buffer]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/buffer.html
[conflate]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/conflate.html
[collectLatest]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect-latest.html
[zip]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/zip.html
[combine]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/combine.html
[onEach]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-each.html
[flatMapConcat]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-concat.html
[flattenConcat]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-concat.html
[flatMapMerge]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-merge.html
[flattenMerge]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-merge.html
[DEFAULT_CONCURRENCY]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-d-e-f-a-u-l-t_-c-o-n-c-u-r-r-e-n-c-y.html
[flatMapLatest]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-latest.html
[catch]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html
[onCompletion]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-completion.html
[launchIn]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/launch-in.html
[IntRange.asFlow]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/as-flow.html
[cancellable]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/cancellable.html

<!--- END -->