<!--- TEST_NAME ChannelsGuideTest -->

[//]: # (title: Channels)

Deferred values provide a convenient way to transfer a single value between coroutines.
Channels provide a way to transfer a stream of values.

## Channel basics

A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<Int>()
    launch {
        // this might be heavy CPU-consuming computation or async logic,
        // we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-01.kt).
>
{type="note"}

The output of this code is:

```text
1
4
9
16
25
Done!
```

<!--- TEST -->

## Closing and iteration over channels

Unlike a queue, a channel can be closed to indicate that no more elements are coming.
On the receiver side it is convenient to use a regular `for` loop to receive elements
from the channel.

Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
The iteration stops as soon as this close token is received, so there is a guarantee
that all elements sent before the close are received:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-02.kt).
>
{type="note"}

<!--- TEST
1
4
9
16
25
Done!
-->

## Building channel producers

The pattern where a coroutine is producing a sequence of elements is quite common.
This is part of the _producer-consumer_ pattern that is often found in concurrent code.
You could abstract such a producer into a function that takes a channel as its parameter, but this goes contrary
to the common-sense expectation that results are returned from functions.

There is a convenient coroutine builder named [produce] that makes it easy to do it right on the producer side,
and an extension function [consumeEach] that replaces a `for` loop on the consumer side:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

//sampleStart
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) send(x * x)
}

fun main() = runBlocking {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-03.kt).
>
{type="note"}

<!--- TEST
1
4
9
16
25
Done!
-->

## Pipelines

A pipeline is a pattern where one coroutine produces a possibly infinite stream of values:

```kotlin
fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}
```

And another coroutine or coroutines are consuming that stream, doing some processing, and producing some other results.
In the example below, the numbers are just squared:

```kotlin
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}
```

The main code starts and connects the whole pipeline:

<!--- CLEAR -->

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    repeat(5) {
        println(squares.receive()) // print first five
    }
    println("Done!") // we are done
    coroutineContext.cancelChildren() // cancel children coroutines
//sampleEnd
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-04.kt).
>
{type="note"}

<!--- TEST
1
4
9
16
25
Done!
-->

> All functions that create coroutines are defined as extensions on [CoroutineScope],
> so that we can rely on [structured concurrency](composing-suspending-functions.md#structured-concurrency-with-async) to make
> sure that we don't have lingering global coroutines in our application.
>
{type="note"}

## Prime numbers with pipeline

Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
of coroutines. We start with an infinite sequence of numbers.

```kotlin
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}
```

The following pipeline stage filters an incoming stream of numbers, removing all the numbers
that are divisible by the given prime number:

```kotlin
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}
```

Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
and launching a new pipeline stage for each prime number found:

```Plain Text
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
```

The following example prints the first ten prime numbers,
running the whole pipeline in the context of the main thread. Since all the coroutines are launched in
the scope of the main [runBlocking] coroutine,
we don't have to keep an explicit list of all the coroutines we have started.
We use the [cancelChildren][kotlin.coroutines.CoroutineContext.cancelChildren]
extension function to cancel all the children coroutines after we have printed
the first ten prime numbers.

<!--- CLEAR -->

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    var cur = numbersFrom(2)
    repeat(10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
//sampleEnd
}

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-05.kt).
>
{type="note"}

The output of this code is:

```text
2
3
5
7
11
13
17
19
23
29
```

<!--- TEST -->

Note that you can build the same pipeline using the
[`iterator`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/iterator.html)
coroutine builder from the standard library.
Replace `produce` with `iterator`, `send` with `yield`, `receive` with `next`,
`ReceiveChannel` with `Iterator`, and get rid of the coroutine scope. You will not need `runBlocking` either.
However, the benefit of a pipeline that uses channels as shown above is that it can actually use
multiple CPU cores if you run it in the [Dispatchers.Default] context.

Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
built using `sequence`/`iterator`, because they do not allow arbitrary suspension, unlike
`produce`, which is fully asynchronous.
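
The substitution described above can be sketched as follows. This is a minimal, single-threaded illustration that uses only the standard library; `firstPrimes` is a hypothetical helper introduced here to collect the results and is not part of the guide's samples:

```kotlin
// Stdlib-only variant of the sieve: iterator/yield/next replace produce/send/receive.
fun numbersFrom(start: Int): Iterator<Int> = iterator {
    var x = start
    while (true) yield(x++) // infinite stream of integers from start
}

fun filter(numbers: Iterator<Int>, prime: Int): Iterator<Int> = iterator {
    // pass through only the numbers that are not divisible by the given prime
    for (x in numbers) if (x % prime != 0) yield(x)
}

// Hypothetical helper (not in the guide): builds the filter chain and collects n primes.
fun firstPrimes(n: Int): List<Int> {
    var cur = numbersFrom(2)
    val primes = mutableListOf<Int>()
    repeat(n) {
        val prime = cur.next()   // the head of the current stage is always prime
        primes += prime
        cur = filter(cur, prime) // add one more filtering stage
    }
    return primes
}

fun main() {
    println(firstPrimes(10)) // prints [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
}
```

Because `iterator` builds a lazy, synchronous generator, each `next` call pulls one value through the whole filter chain on the calling thread, so there are no child coroutines to cancel at the end.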

## Fan-out

Multiple coroutines may receive from the same channel, distributing work between themselves.
Let us start with a producer coroutine that is periodically producing integers
(ten numbers per second):

```kotlin
fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}
```

Then we can have several processor coroutines. In this example, they just print their id and
received number:

```kotlin
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}
```

Now let us launch five processors and let them work for almost a second. See what happens:

<!--- CLEAR -->

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
//sampleStart
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // cancel producer coroutine and thus kill them all
//sampleEnd
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-06.kt).
>
{type="note"}

The output will be similar to the following one, although the processor ids that receive
each specific integer may be different:

```text
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
```

<!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } -->

Note that cancelling a producer coroutine closes its channel, thus eventually terminating iteration
over the channel that processor coroutines are doing.

Also, pay attention to how we explicitly iterate over the channel with a `for` loop to perform fan-out in the `launchProcessor` code.
Unlike `consumeEach`, this `for` loop pattern is perfectly safe to use from multiple coroutines. If one of the processor
coroutines fails, then others would still be processing the channel, while a processor that is written via `consumeEach`
always consumes (cancels) the underlying channel on its normal or abnormal completion.

## Fan-in

Multiple coroutines may send to the same channel.
For example, let us have a channel of strings, and a suspending function that
repeatedly sends a specified string to this channel with a specified delay:

```kotlin
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
```

Now, let us see what happens if we launch a couple of coroutines sending strings
(in this example we launch them in the context of the main thread as main coroutine's children):

<!--- CLEAR -->

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
//sampleEnd
}

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-07.kt).
>
{type="note"}

The output is:

```text
foo
foo
BAR!
foo
foo
BAR!
```

<!--- TEST -->

## Buffered channels

The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver
meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked;
if receive is invoked first, it is suspended until send is invoked.

Both the [Channel()] factory function and the [produce] builder take an optional `capacity` parameter to
specify _buffer size_. A buffer allows senders to send multiple elements before suspending,
similar to a `BlockingQueue` with a specified capacity, which blocks when the buffer is full.

Take a look at the behavior of the following code:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
//sampleStart
    val channel = Channel<Int>(4) // create buffered channel
    val sender = launch { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
    sender.cancel() // cancel sender coroutine
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-08.kt).
>
{type="note"}

It prints "sending" _five_ times using a buffered channel with capacity of _four_:

```text
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
```

<!--- TEST -->

The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.

## Channels are fair

Send and receive operations to channels are _fair_ with respect to the order of their invocation from
multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive`
gets the element. In the following example two coroutines "ping" and "pong" are
receiving the "ball" object from the shared "table" channel.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

//sampleStart
data class Ball(var hits: Int)

fun main() = runBlocking {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-09.kt).
>
{type="note"}

The "ping" coroutine is started first, so it is the first one to receive the ball. Even though the "ping"
coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets
received by the "pong" coroutine, because it was already waiting for it:

```text
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
```

<!--- TEST -->

Note that sometimes channels may produce executions that look unfair due to the nature of the executor
that is being used. See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/111) for details.

## Ticker channels

A ticker channel is a special rendezvous channel that produces `Unit` every time the given delay passes since the last consumption from this channel.
Though it may seem useless on its own, it is a useful building block to create complex time-based [produce]
pipelines and operators that do windowing and other time-dependent processing.
A ticker channel can be used in [select] to perform an "on tick" action.
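
As a rough sketch of the "on tick" idea, the example below selects between a ticker channel and a channel of messages. The helper name `collectEvents`, the delays, and the message strings are assumptions made for illustration; [ticker] itself is the factory function introduced below. The exact interleaving of ticks and messages depends on timing, so no output is shown:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

// Hypothetical helper (not part of the guide): records ticks and messages in arrival order.
suspend fun collectEvents(): List<String> = coroutineScope {
    val events = mutableListOf<String>()
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0)
    val messages = produce<String> {
        repeat(3) {
            delay(250)
            send("message ${it + 1}")
        }
    } // the channel is closed when the produce block completes

    var open = true
    while (open) {
        select<Unit> {
            // A closed channel yields a result without a value, which ends the loop
            messages.onReceiveCatching { result ->
                val message = result.getOrNull()
                if (message == null) {
                    open = false
                } else {
                    events.add(message)
                }
            }
            // The "on tick" action
            tickerChannel.onReceive { events.add("tick") }
        }
    }
    tickerChannel.cancel() // indicate that no more ticks are needed
    events
}

fun main() = runBlocking {
    collectEvents().forEach(::println)
}
```

`select` is biased towards its first clause, so a message is preferred when both a message and a tick are ready at the same time.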

To create such a channel, use the factory method [ticker].
To indicate that no further elements are needed, use the [ReceiveChannel.cancel] method on it.

Now let's see how it works in practice:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

//sampleStart
fun main() = runBlocking<Unit> {
    val tickerChannel = ticker(delayMillis = 200, initialDelayMillis = 0) // create a ticker channel
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // no initial delay

    nextElement = withTimeoutOrNull(100) { tickerChannel.receive() } // all subsequent elements have 200ms delay
    println("Next element is not ready in 100 ms: $nextElement")

    nextElement = withTimeoutOrNull(120) { tickerChannel.receive() }
    println("Next element is ready in 200 ms: $nextElement")

    // Emulate large consumption delays
    println("Consumer pauses for 300ms")
    delay(300)
    // Next element is available immediately
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    nextElement = withTimeoutOrNull(120) { tickerChannel.receive() }
    println("Next element is ready in 100ms after consumer pause in 300ms: $nextElement")

    tickerChannel.cancel() // indicate that no more elements are needed
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-10.kt).
>
{type="note"}

It prints the following lines:

```text
Initial element is available immediately: kotlin.Unit
Next element is not ready in 100 ms: null
Next element is ready in 200 ms: kotlin.Unit
Consumer pauses for 300ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 100ms after consumer pause in 300ms: kotlin.Unit
```

<!--- TEST -->

Note that [ticker] is aware of possible consumer pauses and, by default, adjusts the delay of the next produced
element if a pause occurs, trying to maintain a fixed rate of produced elements.

Optionally, a `mode` parameter equal to [TickerMode.FIXED_DELAY] can be specified to maintain a fixed
delay between elements.

<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines -->

[CoroutineScope]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
[runBlocking]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
[kotlin.coroutines.CoroutineContext.cancelChildren]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/cancel-children.html
[Dispatchers.Default]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html

<!--- INDEX kotlinx.coroutines.channels -->

[Channel]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html
[SendChannel.send]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
[ReceiveChannel.receive]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
[SendChannel.close]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/close.html
[produce]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html
[consumeEach]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html
[Channel()]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel.html
[ticker]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/ticker.html
[ReceiveChannel.cancel]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html
[TickerMode.FIXED_DELAY]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-ticker-mode/-f-i-x-e-d_-d-e-l-a-y/index.html

<!--- INDEX kotlinx.coroutines.selects -->

[select]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html

<!--- END -->