<!--- TEST_NAME FlowGuideTest -->

[//]: # (title: Asynchronous Flow)

A suspending function asynchronously returns a single value, but how can we return
multiple asynchronously computed values? This is where Kotlin Flows come in.

## Representing multiple values

Multiple values can be represented in Kotlin using [collections].
For example, we can have a `simple` function that returns a [List]
of three numbers and then print them all using [forEach]:

```kotlin
fun simple(): List<Int> = listOf(1, 2, 3)

fun main() {
    simple().forEach { value -> println(value) }
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-01.kt).
>
{type="note"}

This code outputs:

```text
1
2
3
```

<!--- TEST -->

### Sequences

If we are computing the numbers with some CPU-consuming blocking code
(each computation taking 100ms), then we can represent the numbers using a [Sequence]:

```kotlin
fun simple(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    simple().forEach { value -> println(value) }
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-02.kt).
>
{type="note"}

This code outputs the same numbers, but it waits 100ms before printing each one.

<!--- TEST
1
2
3
-->

### Suspending functions

However, this computation blocks the main thread that is running the code.
When these values are computed by asynchronous code, we can mark the `simple` function with a `suspend` modifier,
so that it can perform its work without blocking and return the result as a list:

```kotlin
import kotlinx.coroutines.*

//sampleStart
suspend fun simple(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-03.kt).
>
{type="note"}

This code prints the numbers after waiting for a second.

<!--- TEST
1
2
3
-->

### Flows

Using the `List<Int>` result type means we can only return all the values at once. To represent
the stream of values that are being computed asynchronously, we can use a [`Flow<Int>`][Flow] type, just like we would use a `Sequence<Int>` type for synchronously computed values:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    simple().collect { value -> println(value) }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-04.kt).
>
{type="note"}

This code waits 100ms before printing each number without blocking the main thread.
This is verified
by printing "I'm not blocked" every 100ms from a separate coroutine that is running in the main thread:

```text
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
```

<!--- TEST -->

Notice the following differences between the code with the [Flow] and the earlier examples:

* The builder function for the [Flow] type is called [flow][_flow].
* Code inside a `flow { ... }` builder block can suspend.
* The `simple` function is no longer marked with a `suspend` modifier.
* Values are _emitted_ from the flow using an [emit][FlowCollector.emit] function.
* Values are _collected_ from the flow using a [collect][collect] function.

> We can replace [delay] with `Thread.sleep` in the body of `simple`'s `flow { ... }` and see that the main
> thread is blocked in this case.
>
{type="note"}

## Flows are cold

Flows are _cold_ streams similar to sequences: the code inside a [flow][_flow] builder does not
run until the flow is collected. This becomes clear in the following example:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) }
    println("Calling collect again...")
    flow.collect { value -> println(value) }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-05.kt).
>
{type="note"}

Which prints:

```text
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
```

<!--- TEST -->

This is a key reason the `simple` function (which returns a flow) is not marked with a `suspend` modifier.
The `simple()` call itself returns quickly and does not wait for anything. The flow starts afresh every time it is
collected, and that is why we see "Flow started" every time we call `collect` again.

## Flow cancellation basics

Flows adhere to the general cooperative cancellation of coroutines. As usual, flow collection can be
cancelled when the flow is suspended in a cancellable suspending function (like [delay]).
The following example shows how the flow gets cancelled on a timeout when running in a [withTimeoutOrNull] block
and stops executing its code:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms
        simple().collect { value -> println(value) }
    }
    println("Done")
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-06.kt).
>
{type="note"}

Notice how only two numbers get emitted by the flow in the `simple` function, producing the following output:

```text
Emitting 1
1
Emitting 2
2
Done
```

<!--- TEST -->

See the [Flow cancellation checks](#flow-cancellation-checks) section for more details.

## Flow builders

The `flow { ... }` builder from the previous examples is the most basic one. There are other builders
that allow flows to be declared:

* The [flowOf] builder defines a flow that emits a fixed set of values.
* Various collections and sequences can be converted to flows using the `.asFlow()` extension function.

For example, the snippet that prints the numbers 1 to 3 from a flow can be rewritten as follows:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    // Convert an integer range to a flow
    (1..3).asFlow().collect { value -> println(value) }
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-07.kt).
>
{type="note"}

<!--- TEST
1
2
3
-->

## Intermediate flow operators

Flows can be transformed using operators, in the same way as you would transform collections and
sequences.
Intermediate operators are applied to an upstream flow and return a downstream flow.
These operators are cold, just like flows are. A call to such an operator is not
a suspending function itself. It works quickly, returning the definition of a new transformed flow.

The basic operators have familiar names like [map] and [filter].
An important difference between these operators and their sequence counterparts is that blocks of
code inside these operators can call suspending functions.
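
As a small illustration of the last point, the sketch below passes a suspending predicate to [filter]. The `isAllowed` and `allowedIds` functions are hypothetical names invented for this sketch, and the `delay` only stands in for real asynchronous work:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical suspending check, standing in for something like a remote lookup
suspend fun isAllowed(id: Int): Boolean {
    delay(50) // imitate asynchronous work
    return id % 2 == 0
}

// Keeps only the ids for which the suspending predicate returns true
fun allowedIds(ids: Flow<Int>): Flow<Int> =
    ids.filter { id -> isAllowed(id) } // the predicate is allowed to suspend

fun main() = runBlocking<Unit> {
    allowedIds((1..4).asFlow()).collect { value -> println(value) } // prints 2, then 4
}
```

The same predicate could not be passed to `Sequence.filter`, because the block of a sequence operator is not a suspending lambda.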

For example, a flow of incoming requests can be
mapped to its results with a [map] operator, even when performing a request is a long-running
operation that is implemented by a suspending function:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-08.kt).
>
{type="note"}

It produces the following three lines, each appearing one second after the previous:

```text
response 1
response 2
response 3
```

<!--- TEST -->

### Transform operator

Among the flow transformation operators, the most general one is called [transform]. It can be used to imitate
simple transformations like [map] and [filter], as well as implement more complex transformations.
Using the `transform` operator, we can [emit][FlowCollector.emit] arbitrary values an arbitrary number of times.
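
To make the "imitate [map] and [filter]" claim concrete, here is a minimal sketch of both operators written in terms of `transform`. The names `mapViaTransform` and `filterViaTransform` are hypothetical helpers made up for this illustration; they are not part of the library:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: `map` expressed through `transform` (exactly one emit per input value)
fun <T, R> Flow<T>.mapViaTransform(block: suspend (T) -> R): Flow<R> =
    transform { value -> emit(block(value)) }

// Hypothetical helper: `filter` expressed through `transform` (zero or one emit per input value)
fun <T> Flow<T>.filterViaTransform(predicate: suspend (T) -> Boolean): Flow<T> =
    transform { value -> if (predicate(value)) emit(value) }

fun main() = runBlocking<Unit> {
    (1..5).asFlow()
        .filterViaTransform { it % 2 == 1 } // keeps 1, 3, 5
        .mapViaTransform { it * 10 }
        .collect { value -> println(value) } // prints 10, 30, 50 on separate lines
}
```

The only difference between the two helpers is how many times `emit` is called for each upstream value, which is the degree of freedom that `transform` exposes.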

For example, using `transform` we can emit a string before performing a long-running asynchronous request
and follow it with a response:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
//sampleStart
    (1..3).asFlow() // a flow of requests
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
        .collect { response -> println(response) }
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-09.kt).
>
{type="note"}

The output of this code is:

```text
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
```

<!--- TEST -->

### Size-limiting operators

Size-limiting intermediate operators like [take] cancel the execution of the flow when the corresponding limit
is reached. Cancellation in coroutines is always performed by throwing an exception, so that all the resource-management
functions (like `try { ... } finally { ...
}` blocks) operate normally in case of cancellation:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers()
        .take(2) // take only the first two
        .collect { value -> println(value) }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-10.kt).
>
{type="note"}

The output of this code clearly shows that the execution of the `flow { ... }` body in the `numbers()` function
stopped after emitting the second number:

```text
1
2
Finally in numbers
```

<!--- TEST -->

## Terminal flow operators

Terminal operators on flows are _suspending functions_ that start a collection of the flow.
The [collect] operator is the most basic one, but there are other terminal operators, which can make it easier:

* Conversion to various collections like [toList] and [toSet].
* Operators to get the [first] value and to ensure that a flow emits a [single] value.
* Reducing a flow to a value with [reduce] and [fold].

For example:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val sum = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5
        .reduce { a, b -> a + b } // sum them (terminal operator)
    println(sum)
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-11.kt).
>
{type="note"}

Prints a single number:

```text
55
```

<!--- TEST -->

## Flows are sequential

Each individual collection of a flow is performed sequentially unless special operators that operate
on multiple flows are used. The collection works directly in the coroutine that calls a terminal operator.
No new coroutines are launched by default.
Each emitted value is processed by all the intermediate operators from
upstream to downstream and is then delivered to the terminal operator.

See the following example that filters the even integers and maps them to strings:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0
        }
        .map {
            println("Map $it")
            "string $it"
        }.collect {
            println("Collect $it")
        }
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-12.kt).
>
{type="note"}

Producing:

```text
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
```

<!--- TEST -->

## Flow context

Collection of a flow always happens in the context of the calling coroutine. For example, if there is
a `simple` flow, then the following code runs in the context specified
by the author of this code, regardless of the implementation details of the `simple` flow:

```kotlin
withContext(context) {
    simple().collect { value ->
        println(value) // run in the specified context
    }
}
```

<!--- CLEAR -->

This property of a flow is called _context preservation_.

So, by default, code in the `flow { ...
}` builder runs in the context that is provided by a collector
of the corresponding flow. For example, consider the implementation of a `simple` function that prints the thread
it is called on and emits three numbers:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

//sampleStart
fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-13.kt).
>
{type="note"}

Running this code produces:

```text
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
```

<!--- TEST FLEXIBLE_THREAD -->

Since `simple().collect` is called from the main thread, the body of `simple`'s flow is also called in the main thread.
This is the perfect default for fast-running or asynchronous code that does not care about the execution context and
does not block the caller.

### A common pitfall when using withContext

However, long-running CPU-consuming code might need to be executed in the context of [Dispatchers.Default], and UI-updating
code might need to be executed in the context of [Dispatchers.Main]. Usually, [withContext] is used
to change the context in code that uses Kotlin coroutines, but code in the `flow { ... }` builder has to honor the context
preservation property and is not allowed to [emit][FlowCollector.emit] from a different context.

Try running the following code:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-14.kt).
>
{type="note"}

This code produces the following exception:

```text
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
        Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
        but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
        Please refer to 'flow' documentation or use 'flowOn' instead
    at ...
```

<!--- TEST EXCEPTION -->

### flowOn operator

The exception refers to the [flowOn] function, which should be used to change the context of the flow emission.
The correct way to change the context of a flow is shown in the example below, which also prints the
names of the corresponding threads to show how it all works:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value")
    }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-15.kt).
>
{type="note"}

Notice how `flow { ... }` works in the background thread, while collection happens in the main thread:

<!--- TEST FLEXIBLE_THREAD
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
-->

Another thing to observe here is that the [flowOn] operator has changed the default sequential nature of the flow.
Now collection happens in one coroutine ("coroutine#1") and emission happens in another coroutine
("coroutine#2") that is running in another thread concurrently with the collecting coroutine. The [flowOn] operator
creates another coroutine for an upstream flow when it has to change the [CoroutineDispatcher] in its context.
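
Because [flowOn] applies to the upstream flow, operators placed after it still run in the collector's context. The following sketch illustrates this; `threadNames` is a hypothetical function written for this illustration, and the exact thread names depend on the environment:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Emits the name of the thread that runs the flow body and pairs it with
// the name of the thread that runs the downstream `map` block.
fun threadNames(): Flow<Pair<String, String>> =
    flow<String> { emit(Thread.currentThread().name) }
        .flowOn(Dispatchers.Default) // affects only the operators above this line
        .map { upstream -> upstream to Thread.currentThread().name }

fun main() = runBlocking<Unit> {
    val (upstream, downstream) = threadNames().single()
    println("flow body ran on: $upstream")   // expected to be a DefaultDispatcher worker thread
    println("map block ran on: $downstream") // expected to be the thread of the collecting coroutine
}
```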

## Buffering

Running different parts of a flow in different coroutines can be helpful from the standpoint of the overall time it takes
to collect the flow, especially when long-running asynchronous operations are involved. For example, consider a case when
the emission by a `simple` flow is slow, taking 100 ms to produce an element, and the collector is also slow,
taking 300 ms to process an element. Let's see how long it takes to collect such a flow with three numbers:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple().collect { value ->
            delay(300) // pretend we are processing it for 300 ms
            println(value)
        }
    }
    println("Collected in $time ms")
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-16.kt).
>
{type="note"}

It produces something like this, with the whole collection taking around 1200 ms (three numbers, 400 ms for each):

```text
1
2
3
Collected in 1220 ms
```

<!--- TEST ARBITRARY_TIME -->

We can use a [buffer] operator on a flow to run emitting code of the `simple` flow concurrently with collecting code,
as opposed to running them sequentially:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val time = measureTimeMillis {
        simple()
            .buffer() // buffer emissions, don't wait
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-17.kt).
>
{type="note"}

It produces the same numbers, just faster, as we have effectively created a processing pipeline,
having to wait only 100 ms for the first number and then spending only 300 ms to process
each number. This way it takes around 1000 ms to run:

```text
1
2
3
Collected in 1071 ms
```

<!--- TEST ARBITRARY_TIME -->

> Note that the [flowOn] operator uses the same buffering mechanism when it has to change a [CoroutineDispatcher],
> but here we explicitly request buffering without changing the execution context.
>
{type="note"}

### Conflation

When a flow represents partial results of the operation or operation status updates, it may not be necessary
to process each value, but only the most recent ones.
In this case, the [conflate] operator can be used to skip
intermediate values when a collector is too slow to process them. Building on the previous example:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val time = measureTimeMillis {
        simple()
            .conflate() // conflate emissions, don't process each one
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-18.kt).
>
{type="note"}

We see that while the first number was still being processed, the second and third were already produced, so
the second one was _conflated_ and only the most recent (the third one) was delivered to the collector:

```text
1
3
Collected in 758 ms
```

<!--- TEST ARBITRARY_TIME -->

### Processing the latest value

Conflation is one way to speed up processing when both the emitter and collector are slow. It does so by dropping emitted values.
The other way is to cancel a slow collector and restart it every time a new value is emitted. There is
a family of `xxxLatest` operators that perform the same essential logic of a `xxx` operator, but cancel the
code in their block on a new value.
Let's try changing [conflate] to [collectLatest] in the previous example:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val time = measureTimeMillis {
        simple()
            .collectLatest { value -> // cancel & restart on the latest value
                println("Collecting $value")
                delay(300) // pretend we are processing it for 300 ms
                println("Done $value")
            }
    }
    println("Collected in $time ms")
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-19.kt).
>
{type="note"}

Since the body of [collectLatest] takes 300 ms, but new values are emitted every 100 ms, we see that the block
is run on every value, but completes only for the last value:

```text
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms
```

<!--- TEST ARBITRARY_TIME -->

## Composing multiple flows

There are lots of ways to compose multiple flows.

### Zip

Just like the [Sequence.zip] extension function in the Kotlin standard library,
flows have a [zip] operator that combines the corresponding values of two flows:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow() // numbers 1..3
    val strs = flowOf("one", "two", "three") // strings
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
        .collect { println(it) } // collect and print
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-20.kt).
>
{type="note"}

This example prints:

```text
1 -> one
2 -> two
3 -> three
```

<!--- TEST -->

### Combine

When a flow represents the most recent value of a variable or operation (see also the related
section on [conflation](#conflation)), it may be necessary to perform a computation that depends on
the most recent values of the corresponding flows and to recompute it whenever any of the upstream
flows emits a value. The corresponding family of operators is called [combine].

For example, if the numbers in the previous example update every 300 ms, but strings update every 400 ms,
then zipping them using the [zip] operator will still produce the same result,
albeit with results printed every 400 ms:

> We use an [onEach] intermediate operator in this example to delay each element and make the code
> that emits sample flows more declarative and shorter.
>
{type="note"}

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-21.kt).
>
{type="note"}

<!--- TEST ARBITRARY_TIME
1 -> one at 437 ms from start
2 -> two at 837 ms from start
3 -> three at 1243 ms from start
-->

However, when using a [combine] operator here instead of a [zip]:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-22.kt).
>
{type="note"}

We get quite a different output, where a line is printed at each emission from either the `nums` or `strs` flow:

```text
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start
```

<!--- TEST ARBITRARY_TIME -->

## Flattening flows

Flows represent asynchronously received sequences of values, and so it is quite easy to get into a situation
where each value triggers a request for another sequence of values. For example, we can have the following
function that returns a flow of two strings 500 ms apart:

```kotlin
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}
```

<!--- CLEAR -->

Now if we have a flow of three integers and call `requestFlow` on each of them like this:

```kotlin
(1..3).asFlow().map { requestFlow(it) }
```

<!--- CLEAR -->

Then we will end up with a flow of flows (`Flow<Flow<String>>`) that needs to be _flattened_ into a single flow for
further processing. Collections and sequences have [flatten][Sequence.flatten] and [flatMap][Sequence.flatMap]
operators for this. However, due to the asynchronous nature of flows, they call for different _modes_ of flattening,
and hence a family of flattening operators on flows exists.

### flatMapConcat

Concatenation of flows of flows is provided by the [flatMapConcat] and [flattenConcat] operators. They are the
most direct analogues of the corresponding sequence operators.
They wait for the inner flow to complete before
starting to collect the next one as the following example shows:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms
        .flatMapConcat { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-23.kt).
>
{type="note"}

The sequential nature of [flatMapConcat] is clearly seen in the output:

```text
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
```

<!--- TEST ARBITRARY_TIME -->

### flatMapMerge

Another flattening operation is to concurrently collect all the incoming flows and merge their values into
a single flow so that values are emitted as soon as possible.
It is implemented by the [flatMapMerge] and [flattenMerge] operators. They both accept an optional
`concurrency` parameter that limits the number of concurrent flows that are collected at the same time
(it is equal to [DEFAULT_CONCURRENCY] by default).
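
As a rough sketch of what the `concurrency` parameter does, the snippet below passes an explicit limit to `flatMapMerge`. The helper name `mergedWith` and the shortened 50 ms delay are assumptions made for this illustration and are not part of the guide's examples. With a limit of 1 the inner flows are collected one at a time:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(50) // shorter delay than in the guide, just to keep the sketch quick
    emit("$i: Second")
}

// Hypothetical helper: flattens three request flows with the given concurrency limit
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun mergedWith(concurrency: Int): List<String> = runBlocking {
    (1..3).asFlow()
        .flatMapMerge(concurrency) { requestFlow(it) }
        .toList()
}

fun main() {
    // With a limit of 1 each inner flow completes before the next one starts
    println(mergedWith(1)) // [1: First, 1: Second, 2: First, 2: Second, 3: First, 3: Second]
    // With a higher limit the order depends on timing, so only the count is shown
    println(mergedWith(2).size) // 6
}
```

With larger limits the interleaving depends on timing, which is why the sketch prints only the number of values in that case. The example with the default limit looks like this: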

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-24.kt).
>
{type="note"}

The concurrent nature of [flatMapMerge] is obvious:

```text
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
```

<!--- TEST ARBITRARY_TIME -->

> Note that [flatMapMerge] calls its block of code (`{ requestFlow(it) }` in this example) sequentially, but
> collects the resulting flows concurrently. It is the equivalent of performing a sequential
> `map { requestFlow(it) }` first and then calling [flattenMerge] on the result.
>
{type="note"}

### flatMapLatest

In a similar way to the [collectLatest] operator, which was described in the section
["Processing the latest value"](#processing-the-latest-value), there is the corresponding "Latest"
flattening mode where the collection of the previous flow is cancelled as soon as a new flow is emitted.
It is implemented by the [flatMapLatest] operator.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapLatest { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-25.kt).
>
{type="note"}

The output in this example is a good demonstration of how [flatMapLatest] works:

```text
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
```

<!--- TEST ARBITRARY_TIME -->

> Note that [flatMapLatest] cancels all the code in its block (`{ requestFlow(it) }` in this example) when a new value
> is received.
> It makes no difference in this particular example, because the call to `requestFlow` itself is fast, non-suspending,
> and cannot be cancelled. However, a difference in output would be visible if we were to use suspending functions
> like `delay` in `requestFlow`.
>
{type="note"}

## Flow exceptions

Flow collection can complete with an exception when an emitter or code inside the operators throws an exception.
There are several ways to handle these exceptions.

### Collector try and catch

A collector can use Kotlin's [`try/catch`][exceptions] block to handle exceptions:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-26.kt).
>
{type="note"}

This code successfully catches an exception in the [collect] terminal operator and,
as we see, no more values are emitted after that:

```text
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
```

<!--- TEST -->

### Everything is caught

The previous example actually catches any exception happening in the emitter or in any intermediate or terminal operators.
For example, let's change the code so that emitted values are [mapped][map] to strings,
but the corresponding code produces an exception:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-27.kt).
>
{type="note"}

This exception is still caught and collection is stopped:

```text
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
```

<!--- TEST -->

## Exception transparency

But how can the code of the emitter encapsulate its exception handling behavior?

Flows must be _transparent to exceptions_ and it is a violation of the exception transparency to [emit][FlowCollector.emit] values in the
`flow { ... }` builder from inside of a `try/catch` block. This guarantees that a collector throwing an exception
can always catch it using `try/catch` as in the previous example.

The emitter can use a [catch] operator that preserves this exception transparency and allows encapsulation
of its exception handling. The body of the `catch` operator can analyze an exception
and react to it in different ways depending on which exception was caught:

* Exceptions can be rethrown using `throw`.
* Exceptions can be turned into emission of values using [emit][FlowCollector.emit] from the body of [catch].
* Exceptions can be ignored, logged, or processed by some other code.

For example, let us emit the text on catching an exception:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

fun main() = runBlocking<Unit> {
//sampleStart
    simple()
        .catch { e -> emit("Caught $e") } // emit on exception
        .collect { value -> println(value) }
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-28.kt).
>
{type="note"}

The output of the example is the same, even though we do not have `try/catch` around the code anymore.

<!--- TEST
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
-->

### Transparent catch

The [catch] intermediate operator, honoring exception transparency, catches only upstream exceptions
(that is, exceptions from all the operators above `catch`, but not below it).
If the block in `collect { ... }` (placed below `catch`) throws an exception then it escapes:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-29.kt).
>
{type="note"}

A "Caught ..." message is not printed despite there being a `catch` operator:

```text
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
	at ...
```

<!--- TEST EXCEPTION -->

### Catching declaratively

We can combine the declarative nature of the [catch] operator with a desire to handle all the exceptions, by moving the body
of the [collect] operator into [onEach] and putting it before the `catch` operator.
Collection of this flow must
be triggered by a call to `collect()` without parameters:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    simple()
        .onEach { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
        .catch { e -> println("Caught $e") }
        .collect()
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-30.kt).
>
{type="note"}

Now we can see that a "Caught ..." message is printed and so we can catch all the exceptions without explicitly
using a `try/catch` block:

```text
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2
```

<!--- TEST EXCEPTION -->

## Flow completion

When flow collection completes (normally or exceptionally) it may need to execute an action.
As you may have already noticed, it can be done in two ways: imperative or declarative.

### Imperative finally block

In addition to `try`/`catch`, a collector can also use a `finally` block to execute an action
upon `collect` completion.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-31.kt).
>
{type="note"}

This code prints three numbers produced by the `simple` flow followed by a "Done" string:

```text
1
2
3
Done
```

<!--- TEST -->

### Declarative handling

For the declarative approach, flow has the [onCompletion] intermediate operator, which is invoked
when the flow has been completely collected.

The previous example can be rewritten using an [onCompletion] operator and produces the same output:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
//sampleStart
    simple()
        .onCompletion { println("Done") }
        .collect { value -> println(value) }
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-32.kt).
>
{type="note"}

<!--- TEST
1
2
3
Done
-->

The key advantage of [onCompletion] is a nullable `Throwable` parameter of the lambda that can be used
to determine whether the flow collection was completed normally or exceptionally.
In the following
example the `simple` flow throws an exception after emitting the number 1:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-33.kt).
>
{type="note"}

As you may expect, it prints:

```text
1
Flow completed exceptionally
Caught exception
```

<!--- TEST -->

The [onCompletion] operator, unlike [catch], does not handle the exception. As we can see from the above
example code, the exception still flows downstream. It will be delivered to further `onCompletion` operators
and can be handled with a `catch` operator.

### Successful completion

Another difference from the [catch] operator is that [onCompletion] sees all exceptions and receives
a `null` exception only on successful completion of the upstream flow (without cancellation or failure).
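
A small sketch contrasting the two outcomes for an upstream flow may help here. The helper `completionCause` and the `"Boom"` message are made up for this illustration; it records the cause seen by `onCompletion` and then swallows any failure with `catch` so that collection returns normally:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the cause that onCompletion observed for the given flow
fun completionCause(upstream: Flow<Int>): Throwable? = runBlocking {
    var observed: Throwable? = null
    upstream
        .onCompletion { cause -> observed = cause } // record the cause without handling it
        .catch { }                                  // swallow the failure so that collect() returns
        .collect()
    observed
}

fun main() {
    val ok = (1..3).asFlow()
    val failing = flow<Int> {
        emit(1)
        throw IllegalStateException("Boom")
    }
    println(completionCause(ok))      // null
    println(completionCause(failing)) // java.lang.IllegalStateException: Boom
}
```

The following example covers the remaining case, where the exception comes from downstream: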

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-34.kt).
>
{type="note"}

We can see the completion cause is not null, because the flow was aborted due to a downstream exception:

```text
1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
```

<!--- TEST EXCEPTION -->

## Imperative versus declarative

Now we know how to collect a flow, and handle its completion and exceptions in both imperative and declarative ways.
The natural question here is, which approach is preferred and why?
As a library, we do not advocate for any particular approach and believe that both options
are valid and should be selected according to your own preferences and code style.

## Launching flow

It is easy to use flows to represent asynchronous events that are coming from some source.
In this case, we need an analogue of the `addEventListener` function that registers a piece of code with a reaction
for incoming events and continues further work. The [onEach] operator can serve this role.
However, `onEach` is an intermediate operator. We also need a terminal operator to collect the flow.
Otherwise, just calling `onEach` has no effect.
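
To make the last point concrete, here is a minimal sketch showing that the `onEach` action does not run until a terminal operator is called. The helper `runOnEachDemo` and its log list are hypothetical, introduced only for this illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: records the order in which things happen
fun runOnEachDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    // Only describes the reaction; nothing is executed at this point
    val events = (1..3).asFlow().onEach { log.add("Event: $it") }
    log.add("Before collect: ${log.size} events")
    events.collect() // the terminal operator triggers the onEach action
    log
}

fun main() {
    runOnEachDemo().forEach(::println)
    // Before collect: 0 events
    // Event: 1
    // Event: 2
    // Event: 3
}
```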

If we use the [collect] terminal operator after `onEach`, then the code after it will wait until the flow is collected:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-35.kt).
>
{type="note"}

As you can see, it prints:

```text
Event: 1
Event: 2
Event: 3
Done
```

<!--- TEST -->

The [launchIn] terminal operator comes in handy here. By replacing `collect` with `launchIn` we can
launch a collection of the flow in a separate coroutine, so that execution of further code
immediately continues:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

//sampleStart
fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-36.kt).
>
{type="note"}

It prints:

```text
Done
Event: 1
Event: 2
Event: 3
```

<!--- TEST -->

The required parameter to `launchIn` must specify a [CoroutineScope] in which the coroutine to collect the flow is
launched. In the above example this scope comes from the [runBlocking]
coroutine builder, so while the flow is running, this [runBlocking] scope waits for completion of its child coroutine
and keeps the main function from returning and terminating this example.

In actual applications a scope will come from an entity with a limited
lifetime. As soon as the lifetime of this entity is terminated the corresponding scope is cancelled, cancelling
the collection of the corresponding flow. This way the pair of `onEach { ... }.launchIn(scope)` works
like the `addEventListener`. However, there is no need for the corresponding `removeEventListener` function,
as cancellation and structured concurrency serve this purpose.

Note that [launchIn] also returns a [Job], which can be used to [cancel][Job.cancel] only the corresponding flow collection
coroutine, without cancelling the whole scope, or to [join][Job.join] it.

### Flow cancellation checks

For convenience, the [flow][_flow] builder performs additional [ensureActive] checks for cancellation on each emitted value.
It means that a busy loop emitting from a `flow { ... }` is cancellable:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..5) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-37.kt).
>
{type="note"}

We get only numbers up to 3 and a [CancellationException] after trying to emit number 4:

```text
Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c
```

<!--- TEST EXCEPTION -->

However, most other flow operators do not do additional cancellation checks on their own for performance reasons.
For example, if you use [IntRange.asFlow] extension to write the same busy loop and don't suspend anywhere,
then there are no checks for cancellation:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun main() = runBlocking<Unit> {
    (1..5).asFlow().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-38.kt).
>
{type="note"}

All numbers from 1 to 5 are collected and cancellation gets detected only before return from `runBlocking`:

```text
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23
```

<!--- TEST EXCEPTION -->

#### Making busy flow cancellable

In the case where you have a busy loop with coroutines you must explicitly check for cancellation.
You can add `.onEach { currentCoroutineContext().ensureActive() }`, but there is a ready-to-use
[cancellable] operator provided to do that:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun main() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-39.kt).
>
{type="note"}

With the `cancellable` operator only the numbers from 1 to 3 are collected:

```text
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365
```

<!--- TEST EXCEPTION -->

## Flow and Reactive Streams

For those who are familiar with [Reactive Streams](https://www.reactive-streams.org/) or reactive frameworks such as RxJava and project Reactor,
the design of Flow may look very familiar.

Indeed, its design was inspired by Reactive Streams and its various implementations. But Flow's main goal is to have as simple a design as possible,
be Kotlin and suspension friendly, and respect structured concurrency.
Achieving this goal would be impossible without reactive pioneers and their tremendous work. You can read the complete story in the [Reactive Streams and Kotlin Flows](https://medium.com/@elizarov/reactive-streams-and-kotlin-flows-bfd12772cda4) article.

While being different, conceptually, Flow *is* a reactive stream and it is possible to convert it to the reactive (spec and TCK compliant) Publisher and vice versa.
Such converters are provided by `kotlinx.coroutines` out-of-the-box and can be found in corresponding reactive modules (`kotlinx-coroutines-reactive` for Reactive Streams, `kotlinx-coroutines-reactor` for Project Reactor and `kotlinx-coroutines-rx2`/`kotlinx-coroutines-rx3` for RxJava2/RxJava3).
Integration modules include conversions from and to `Flow`, integration with Reactor's `Context` and suspension-friendly ways to work with various reactive entities.

<!-- stdlib references -->

[collections]: https://kotlinlang.org/docs/reference/collections-overview.html
[List]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/-list/
[forEach]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/for-each.html
[Sequence]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/
[Sequence.zip]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/zip.html
[Sequence.flatten]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flatten.html
[Sequence.flatMap]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flat-map.html
[exceptions]: https://kotlinlang.org/docs/reference/exceptions.html

<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines -->

[delay]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/delay.html
[withTimeoutOrNull]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-timeout-or-null.html
[Dispatchers.Default]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
[Dispatchers.Main]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-main.html
[withContext]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
[CoroutineDispatcher]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-dispatcher/index.html
[CoroutineScope]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
[runBlocking]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
[Job]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html
[Job.cancel]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/cancel.html
[Job.join]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
[ensureActive]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/ensure-active.html
[CancellationException]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-cancellation-exception/index.html

<!--- INDEX kotlinx.coroutines.flow -->

[Flow]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
[_flow]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html
[FlowCollector.emit]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow-collector/emit.html
[collect]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect.html
[flowOf]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-of.html
[map]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/map.html
[filter]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/filter.html
[transform]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/transform.html
[take]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/take.html
[toList]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-list.html
[toSet]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-set.html
[first]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/first.html
[single]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/single.html
[reduce]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/reduce.html
[fold]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/fold.html
[flowOn]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-on.html
[buffer]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/buffer.html
[conflate]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/conflate.html
[collectLatest]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect-latest.html
[zip]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/zip.html
[combine]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/combine.html
[onEach]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-each.html
[flatMapConcat]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-concat.html
[flattenConcat]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-concat.html
[flatMapMerge]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-merge.html
[flattenMerge]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-merge.html
[DEFAULT_CONCURRENCY]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-d-e-f-a-u-l-t_-c-o-n-c-u-r-r-e-n-c-y.html
[flatMapLatest]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-latest.html
[catch]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html
[onCompletion]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-completion.html
[launchIn]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/launch-in.html
[IntRange.asFlow]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/as-flow.html
[cancellable]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/cancellable.html

<!--- END -->