<!--- TEST_NAME ChannelsGuideTest -->

[//]: # (title: Channels)

Deferred values provide a convenient way to transfer a single value between coroutines.
Channels provide a way to transfer a stream of values.

## Channel basics

A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that
instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of
a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive].

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<Int>()
    launch {
        // this might be heavy CPU-consuming computation or async logic,
        // we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-01.kt).
>
{type="note"}

The output of this code is:

```text
1
4
9
16
25
Done!
```

<!--- TEST -->

## Closing and iteration over channels

Unlike a queue, a channel can be closed to indicate that no more elements are coming.
On the receiver side it is convenient to use a regular `for` loop to receive elements
from the channel.

Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel.
The iteration stops as soon as this close token is received, so there is a guarantee
that all previously sent elements before the close are received:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using a `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-02.kt).
>
{type="note"}

<!--- TEST
1
4
9
16
25
Done!
-->

## Building channel producers

The pattern where a coroutine is producing a sequence of elements is quite common.
This is part of the _producer-consumer_ pattern that is often found in concurrent code.
You could abstract such a producer into a function that takes a channel as its parameter, but this goes contrary
to the common-sense expectation that results are returned from functions.

There is a convenient coroutine builder named [produce] that makes it easy to do it right on the producer side,
and an extension function [consumeEach] that replaces a `for` loop on the consumer side:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

//sampleStart
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) send(x * x)
}

fun main() = runBlocking {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-03.kt).
>
{type="note"}

<!--- TEST
1
4
9
16
25
Done!
-->

## Pipelines

A pipeline is a pattern where one coroutine produces a possibly infinite stream of values:

```kotlin
fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}
```

Another coroutine or coroutines consume that stream, do some processing, and produce some other results.
In the example below, the numbers are just squared:

```kotlin
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}
```

The main code starts and connects the whole pipeline:

<!--- CLEAR -->

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    repeat(5) {
        println(squares.receive()) // print first five
    }
    println("Done!") // we are done
    coroutineContext.cancelChildren() // cancel children coroutines
//sampleEnd
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-04.kt).
>
{type="note"}

<!--- TEST
1
4
9
16
25
Done!
-->

> All functions that create coroutines are defined as extensions on [CoroutineScope],
> so that we can rely on [structured concurrency](composing-suspending-functions.md#structured-concurrency-with-async) to make
> sure that we don't have lingering global coroutines in our application.
>
{type="note"}

## Prime numbers with pipeline

Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline
of coroutines. We start with an infinite sequence of numbers.

```kotlin
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}
```

The following pipeline stage filters an incoming stream of numbers, removing all the numbers
that are divisible by the given prime number:

```kotlin
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}
```

Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel,
and launching a new pipeline stage for each prime number found:

```Plain Text
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
```

The following example prints the first ten prime numbers,
running the whole pipeline in the context of the main thread. Since all the coroutines are launched in
the scope of the main [runBlocking] coroutine,
we don't have to keep an explicit list of all the coroutines we have started.
We use the [cancelChildren][kotlin.coroutines.CoroutineContext.cancelChildren]
extension function to cancel all the children coroutines after we have printed
the first ten prime numbers.

<!--- CLEAR -->

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    var cur = numbersFrom(2)
    repeat(10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
//sampleEnd
}

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-05.kt).
>
{type="note"}

The output of this code is:

```text
2
3
5
7
11
13
17
19
23
29
```

<!--- TEST -->

Note that you can build the same pipeline using the
[`iterator`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/iterator.html)
coroutine builder from the standard library.
Replace `produce` with `iterator`, `send` with `yield`, `receive` with `next`,
`ReceiveChannel` with `Iterator`, and get rid of the coroutine scope. You will not need `runBlocking` either.
However, the benefit of a pipeline that uses channels as shown above is that it can actually use
multiple CPU cores if you run it in the [Dispatchers.Default] context.

Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
other suspending invocations (like asynchronous calls to remote services), and these pipelines cannot be
built using `sequence`/`iterator`, because they do not allow arbitrary suspension, unlike
`produce`, which is fully asynchronous.

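As a rough sketch of the `iterator`-based variant described above, the two pipeline stages can be rewritten using only the standard library. The helper `firstPrimes` is a hypothetical name introduced here to collect the results; it is not part of the original example.

```kotlin
// Sketch only: same pipeline shape as the channel version, but synchronous.
fun numbersFrom(start: Int): Iterator<Int> = iterator {
    var x = start
    while (true) yield(x++) // infinite stream of integers from start
}

fun filter(numbers: Iterator<Int>, prime: Int): Iterator<Int> = iterator {
    for (x in numbers) if (x % prime != 0) yield(x)
}

// Hypothetical helper: pulls `count` primes by stacking one filter stage per prime found.
fun firstPrimes(count: Int): List<Int> {
    var cur = numbersFrom(2)
    val primes = mutableListOf<Int>()
    repeat(count) {
        val prime = cur.next()
        primes += prime
        cur = filter(cur, prime)
    }
    return primes
}

fun main() {
    println(firstPrimes(10)) // [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
}
```

Everything here runs lazily on the calling thread, so there are no coroutines to cancel at the end and no `runBlocking` is required.
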
## Fan-out

Multiple coroutines may receive from the same channel, distributing work between themselves.
Let us start with a producer coroutine that is periodically producing integers
(ten numbers per second):

```kotlin
fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}
```

Then we can have several processor coroutines. In this example, they just print their id and
received number:

```kotlin
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}
```

Now let us launch five processors and let them work for almost a second. See what happens:

<!--- CLEAR -->

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
//sampleStart
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // cancel producer coroutine and thus kill them all
//sampleEnd
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-06.kt).
>
{type="note"}

The output will be similar to the following one, although the processor ids that receive
each specific integer may be different:

```text
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
```

<!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } -->

Note that cancelling a producer coroutine closes its channel, thus eventually terminating the iteration
over the channel that the processor coroutines are performing.

Also, pay attention to how we explicitly iterate over the channel with a `for` loop to perform fan-out in the `launchProcessor` code.
Unlike `consumeEach`, this `for` loop pattern is perfectly safe to use from multiple coroutines. If one of the processor
coroutines fails, then the others would still be processing the channel, while a processor that is written via `consumeEach`
always consumes (cancels) the underlying channel on its normal or abnormal completion.

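The difference between the two consumption styles can be illustrated with a small sketch. The function `closedAfterFailure` is a hypothetical helper written for this illustration: it simulates a processor that fails on the first element and then checks whether the channel can still deliver the remaining buffered elements.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: returns true if the channel is closed for receiving
// after a simulated processor failure, false if elements are still available.
suspend fun closedAfterFailure(useConsumeEach: Boolean): Boolean {
    val channel = Channel<Int>(Channel.UNLIMITED)
    repeat(3) { channel.send(it) } // buffer 0, 1, 2 without suspending
    try {
        if (useConsumeEach) {
            channel.consumeEach { if (it == 0) error("processor failed") }
        } else {
            for (x in channel) if (x == 0) error("processor failed")
        }
    } catch (e: IllegalStateException) {
        // the simulated processor failure; ignored for the purpose of the sketch
    }
    val closed = channel.receiveCatching().isClosed
    channel.cancel() // clean up whatever is left
    return closed
}

fun main() = runBlocking<Unit> {
    println("consumeEach: closed = ${closedAfterFailure(useConsumeEach = true)}")
    println("for loop:    closed = ${closedAfterFailure(useConsumeEach = false)}")
}
```

With `consumeEach` the failure cancels the channel, so other processors sharing it would stop as well; with the plain `for` loop the remaining elements stay available to them.
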
## Fan-in

Multiple coroutines may send to the same channel.
For example, let us have a channel of strings, and a suspending function that
repeatedly sends a specified string to this channel with a specified delay:

```kotlin
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
```

Now, let us see what happens if we launch a couple of coroutines sending strings
(in this example we launch them in the context of the main thread as the main coroutine's children):

<!--- CLEAR -->

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
//sampleEnd
}

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-07.kt).
>
{type="note"}

The output is:

```text
foo
foo
BAR!
foo
foo
BAR!
```

<!--- TEST -->

## Buffered channels

The channels shown so far had no buffer. Unbuffered channels transfer elements when the sender and receiver
meet each other (aka rendezvous). If send is invoked first, then it is suspended until receive is invoked;
if receive is invoked first, it is suspended until send is invoked.

Both the [Channel()] factory function and the [produce] builder take an optional `capacity` parameter to
specify the _buffer size_. A buffer allows senders to send multiple elements before suspending,
similar to a `BlockingQueue` with a specified capacity, which blocks when the buffer is full.

Take a look at the behavior of the following code:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
//sampleStart
    val channel = Channel<Int>(4) // create buffered channel
    val sender = launch { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
    sender.cancel() // cancel sender coroutine
//sampleEnd
}
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-08.kt).
>
{type="note"}

It prints "Sending" _five_ times using a buffered channel with a capacity of _four_:

```text
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
```

<!--- TEST -->

The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.

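The same capacity limit can be observed without suspending, using the non-suspending `trySend`. The sketch below is only an illustration; `acceptedWithoutReceiver` is a hypothetical helper that counts how many elements a channel accepts while nobody is receiving.

```kotlin
import kotlinx.coroutines.channels.*

// Hypothetical helper: counts elements accepted by `trySend` with no receiver present.
// Do not call it with Channel.UNLIMITED or Channel.CONFLATED: `trySend` never fails there
// and the loop would not terminate.
fun acceptedWithoutReceiver(capacity: Int): Int {
    val channel = Channel<Int>(capacity)
    var accepted = 0
    while (channel.trySend(accepted).isSuccess) accepted++
    channel.close()
    return accepted
}

fun main() {
    println(acceptedWithoutReceiver(4))                  // 4: the buffer is full after four elements
    println(acceptedWithoutReceiver(Channel.RENDEZVOUS)) // 0: nothing is accepted without a receiver
}
```

A suspending `send` would suspend at exactly the point where `trySend` starts to fail, which is why the example above prints "Sending 4" but never gets past that call.
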
## Channels are fair

Send and receive operations to channels are _fair_ with respect to the order of their invocation from
multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke `receive`
gets the element. In the following example two coroutines "ping" and "pong" are
receiving the "ball" object from the shared "table" channel.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

//sampleStart
data class Ball(var hits: Int)

fun main() = runBlocking {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-09.kt).
>
{type="note"}

The "ping" coroutine is started first, so it is the first one to receive the ball. Even though the "ping"
coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets
received by the "pong" coroutine, because it was already waiting for it:

```text
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
```

<!--- TEST -->

Note that sometimes channels may produce executions that look unfair due to the nature of the executor
that is being used. See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/111) for details.

## Ticker channels

A ticker channel is a special rendezvous channel that produces `Unit` every time the given delay passes since the last consumption from this channel.
Though it may seem useless on its own, it is a useful building block for creating complex time-based [produce]
pipelines and operators that do windowing and other time-dependent processing.
A ticker channel can be used in [select] to perform an "on tick" action.

To create such a channel, use the factory method [ticker].
To indicate that no further elements are needed, use the [ReceiveChannel.cancel] method on it.

Now let's see how it works in practice:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

//sampleStart
fun main() = runBlocking<Unit> {
    val tickerChannel = ticker(delayMillis = 200, initialDelayMillis = 0) // create a ticker channel
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // no initial delay

    nextElement = withTimeoutOrNull(100) { tickerChannel.receive() } // all subsequent elements have 200ms delay
    println("Next element is not ready in 100 ms: $nextElement")

    nextElement = withTimeoutOrNull(120) { tickerChannel.receive() }
    println("Next element is ready in 200 ms: $nextElement")

    // Emulate large consumption delays
    println("Consumer pauses for 300ms")
    delay(300)
    // Next element is available immediately
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    nextElement = withTimeoutOrNull(120) { tickerChannel.receive() }
    println("Next element is ready in 100ms after consumer pause in 300ms: $nextElement")

    tickerChannel.cancel() // indicate that no more elements are needed
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-10.kt).
>
{type="note"}

It prints the following lines:

```text
Initial element is available immediately: kotlin.Unit
Next element is not ready in 100 ms: null
Next element is ready in 200 ms: kotlin.Unit
Consumer pauses for 300ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 100ms after consumer pause in 300ms: kotlin.Unit
```

<!--- TEST -->

Note that [ticker] is aware of possible consumer pauses and, by default, adjusts the delay of the next produced element
if a pause occurs, trying to maintain a fixed rate of produced elements.

Optionally, a `mode` parameter equal to [TickerMode.FIXED_DELAY] can be specified to maintain a fixed
delay between elements.

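To make the difference between the two modes more tangible, here is a minimal sketch. The helper `tickArrivesEarly` is a hypothetical name used only for this illustration, and because the result depends on timing, the behavior described is what one would typically expect rather than a guarantee.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: after a 300 ms consumer pause, reports whether the
// following tick arrives within 120 ms (i.e. sooner than the 200 ms delay).
@OptIn(ObsoleteCoroutinesApi::class)
suspend fun tickArrivesEarly(mode: TickerMode): Boolean {
    val tickerChannel = ticker(delayMillis = 200, initialDelayMillis = 0, mode = mode)
    try {
        tickerChannel.receive() // initial element, no delay
        delay(300)              // consumer pause longer than the ticker delay
        tickerChannel.receive() // the element that became ready during the pause
        return withTimeoutOrNull(120) { tickerChannel.receive() } != null
    } finally {
        tickerChannel.cancel()  // indicate that no more elements are needed
    }
}

fun main() = runBlocking<Unit> {
    println("FIXED_PERIOD: ${tickArrivesEarly(TickerMode.FIXED_PERIOD)}")
    println("FIXED_DELAY:  ${tickArrivesEarly(TickerMode.FIXED_DELAY)}")
}
```

With the default `FIXED_PERIOD` mode the ticker compensates for the pause, so the next tick is expected roughly 100 ms later; with `FIXED_DELAY` a full 200 ms passes after each consumed element, so the 120 ms timeout would normally expire first.
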
<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines -->

[CoroutineScope]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
[runBlocking]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
[kotlin.coroutines.CoroutineContext.cancelChildren]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/cancel-children.html
[Dispatchers.Default]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html

<!--- INDEX kotlinx.coroutines.channels -->

[Channel]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html
[SendChannel.send]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
[ReceiveChannel.receive]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
[SendChannel.close]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/close.html
[produce]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html
[consumeEach]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html
[Channel()]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel.html
[ticker]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/ticker.html
[ReceiveChannel.cancel]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html
[TickerMode.FIXED_DELAY]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-ticker-mode/-f-i-x-e-d_-d-e-l-a-y/index.html

<!--- INDEX kotlinx.coroutines.selects -->

[select]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html

<!--- END -->
