xref: /aosp_15_r20/external/kotlinx.coroutines/docs/topics/select-expression.md (revision 7a7160fed73afa6648ef8aa100d4a336fe921d9a)
1<!--- TEST_NAME SelectGuideTest -->
2
3[//]: # (title: Select expression \(experimental\))
4
5Select expression makes it possible to await multiple suspending functions simultaneously and _select_
6the first one that becomes available.
7
8> Select expressions are an experimental feature of `kotlinx.coroutines`. Their API is expected to
9> evolve in the upcoming updates of the `kotlinx.coroutines` library with potentially
10> breaking changes.
11>
12{type="note"}
13
14## Selecting from channels
15
16Let us have two producers of strings: `fizz` and `buzz`. The `fizz` produces "Fizz" string every 500 ms:
17
18```kotlin
19fun CoroutineScope.fizz() = produce<String> {
20    while (true) { // sends "Fizz" every 500 ms
21        delay(500)
22        send("Fizz")
23    }
24}
25```
26
27And the `buzz` produces "Buzz!" string every 1000 ms:
28
29```kotlin
30fun CoroutineScope.buzz() = produce<String> {
31    while (true) { // sends "Buzz!" every 1000 ms
32        delay(1000)
33        send("Buzz!")
34    }
35}
36```
37
38Using [receive][ReceiveChannel.receive] suspending function we can receive _either_ from one channel or the
39other. But [select] expression allows us to receive from _both_ simultaneously using its
40[onReceive][ReceiveChannel.onReceive] clauses:
41
42```kotlin
43suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
44    select<Unit> { // <Unit> means that this select expression does not produce any result
45        fizz.onReceive { value ->  // this is the first select clause
46            println("fizz -> '$value'")
47        }
48        buzz.onReceive { value ->  // this is the second select clause
49            println("buzz -> '$value'")
50        }
51    }
52}
53```
54
55Let us run it all seven times:
56
57<!--- CLEAR -->
58
59```kotlin
60import kotlinx.coroutines.*
61import kotlinx.coroutines.channels.*
62import kotlinx.coroutines.selects.*
63
64fun CoroutineScope.fizz() = produce<String> {
65    while (true) { // sends "Fizz" every 500 ms
66        delay(500)
67        send("Fizz")
68    }
69}
70
71fun CoroutineScope.buzz() = produce<String> {
72    while (true) { // sends "Buzz!" every 1000 ms
73        delay(1000)
74        send("Buzz!")
75    }
76}
77
78suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
79    select<Unit> { // <Unit> means that this select expression does not produce any result
80        fizz.onReceive { value ->  // this is the first select clause
81            println("fizz -> '$value'")
82        }
83        buzz.onReceive { value ->  // this is the second select clause
84            println("buzz -> '$value'")
85        }
86    }
87}
88
89fun main() = runBlocking<Unit> {
90//sampleStart
91    val fizz = fizz()
92    val buzz = buzz()
93    repeat(7) {
94        selectFizzBuzz(fizz, buzz)
95    }
96    coroutineContext.cancelChildren() // cancel fizz & buzz coroutines
97//sampleEnd
98}
99```
100{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}
101
102> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-select-01.kt).
103>
104{type="note"}
105
106The result of this code is:
107
108```text
109fizz -> 'Fizz'
110buzz -> 'Buzz!'
111fizz -> 'Fizz'
112fizz -> 'Fizz'
113buzz -> 'Buzz!'
114fizz -> 'Fizz'
115fizz -> 'Fizz'
116```
117
118<!--- TEST -->
119
120## Selecting on close
121
122The [onReceive][ReceiveChannel.onReceive] clause in `select` fails when the channel is closed causing the corresponding
123`select` to throw an exception. We can use [onReceiveCatching][ReceiveChannel.onReceiveCatching] clause to perform a
124specific action when the channel is closed. The following example also shows that `select` is an expression that returns
125the result of its selected clause:
126
127```kotlin
128suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
129    select<String> {
130        a.onReceiveCatching { it ->
131            val value = it.getOrNull()
132            if (value != null) {
133                "a -> '$value'"
134            } else {
135                "Channel 'a' is closed"
136            }
137        }
138        b.onReceiveCatching { it ->
139            val value = it.getOrNull()
140            if (value != null) {
141                "b -> '$value'"
142            } else {
143                "Channel 'b' is closed"
144            }
145        }
146    }
147```
148
149
150Let's use it with channel `a` that produces "Hello" string four times and
151channel `b` that produces "World" four times:
152
153<!--- CLEAR -->
154
155```kotlin
156import kotlinx.coroutines.*
157import kotlinx.coroutines.channels.*
158import kotlinx.coroutines.selects.*
159
160suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
161    select<String> {
162        a.onReceiveCatching { it ->
163            val value = it.getOrNull()
164            if (value != null) {
165                "a -> '$value'"
166            } else {
167                "Channel 'a' is closed"
168            }
169        }
170        b.onReceiveCatching { it ->
171            val value = it.getOrNull()
172            if (value != null) {
173                "b -> '$value'"
174            } else {
175                "Channel 'b' is closed"
176            }
177        }
178    }
179
180fun main() = runBlocking<Unit> {
181//sampleStart
182    val a = produce<String> {
183        repeat(4) { send("Hello $it") }
184    }
185    val b = produce<String> {
186        repeat(4) { send("World $it") }
187    }
188    repeat(8) { // print first eight results
189        println(selectAorB(a, b))
190    }
191    coroutineContext.cancelChildren()
192//sampleEnd
193}
194```
195{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}
196
197> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt).
198>
199{type="note"}
200
201The result of this code is quite interesting, so we'll analyze it in more detail:
202
203```text
204a -> 'Hello 0'
205a -> 'Hello 1'
206b -> 'World 0'
207a -> 'Hello 2'
208a -> 'Hello 3'
209b -> 'World 1'
210Channel 'a' is closed
211Channel 'a' is closed
212```
213
214<!--- TEST -->
215
216There are a couple of observations to make out of it.
217
218First of all, `select` is _biased_ to the first clause. When several clauses are selectable at the same time,
219the first one among them gets selected. Here, both channels are constantly producing strings, so `a` channel,
220being the first clause in select, wins. However, because we are using unbuffered channel, the `a` gets suspended from
221time to time on its [send][SendChannel.send] invocation and gives a chance for `b` to send, too.
222
223The second observation, is that [onReceiveCatching][ReceiveChannel.onReceiveCatching] gets immediately selected when the
224channel is already closed.
225
226## Selecting to send
227
228Select expression has [onSend][SendChannel.onSend] clause that can be used for a great good in combination
229with a biased nature of selection.
230
231Let us write an example of a producer of integers that sends its values to a `side` channel when
232the consumers on its primary channel cannot keep up with it:
233
234```kotlin
235fun CoroutineScope.produceNumbers(side: SendChannel<Int>) = produce<Int> {
236    for (num in 1..10) { // produce 10 numbers from 1 to 10
237        delay(100) // every 100 ms
238        select<Unit> {
239            onSend(num) {} // Send to the primary channel
240            side.onSend(num) {} // or to the side channel
241        }
242    }
243}
244```
245
246Consumer is going to be quite slow, taking 250 ms to process each number:
247
248<!--- CLEAR -->
249
250```kotlin
251import kotlinx.coroutines.*
252import kotlinx.coroutines.channels.*
253import kotlinx.coroutines.selects.*
254
255fun CoroutineScope.produceNumbers(side: SendChannel<Int>) = produce<Int> {
256    for (num in 1..10) { // produce 10 numbers from 1 to 10
257        delay(100) // every 100 ms
258        select<Unit> {
259            onSend(num) {} // Send to the primary channel
260            side.onSend(num) {} // or to the side channel
261        }
262    }
263}
264
265fun main() = runBlocking<Unit> {
266//sampleStart
267    val side = Channel<Int>() // allocate side channel
268    launch { // this is a very fast consumer for the side channel
269        side.consumeEach { println("Side channel has $it") }
270    }
271    produceNumbers(side).consumeEach {
272        println("Consuming $it")
273        delay(250) // let us digest the consumed number properly, do not hurry
274    }
275    println("Done consuming")
276    coroutineContext.cancelChildren()
277//sampleEnd
278}
279```
280{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}
281
282> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-select-03.kt).
283>
284{type="note"}
285
286So let us see what happens:
287
288```text
289Consuming 1
290Side channel has 2
291Side channel has 3
292Consuming 4
293Side channel has 5
294Side channel has 6
295Consuming 7
296Side channel has 8
297Side channel has 9
298Consuming 10
299Done consuming
300```
301
302<!--- TEST -->
303
304## Selecting deferred values
305
306Deferred values can be selected using [onAwait][Deferred.onAwait] clause.
307Let us start with an async function that returns a deferred string value after
308a random delay:
309
310```kotlin
311fun CoroutineScope.asyncString(time: Int) = async {
312    delay(time.toLong())
313    "Waited for $time ms"
314}
315```
316
317Let us start a dozen of them with a random delay.
318
319```kotlin
320fun CoroutineScope.asyncStringsList(): List<Deferred<String>> {
321    val random = Random(3)
322    return List(12) { asyncString(random.nextInt(1000)) }
323}
324```
325
326Now the main function awaits for the first of them to complete and counts the number of deferred values
327that are still active. Note that we've used here the fact that `select` expression is a Kotlin DSL,
328so we can provide clauses for it using an arbitrary code. In this case we iterate over a list
329of deferred values to provide `onAwait` clause for each deferred value.
330
331<!--- CLEAR -->
332
333```kotlin
334import kotlinx.coroutines.*
335import kotlinx.coroutines.selects.*
336import java.util.*
337
338fun CoroutineScope.asyncString(time: Int) = async {
339    delay(time.toLong())
340    "Waited for $time ms"
341}
342
343fun CoroutineScope.asyncStringsList(): List<Deferred<String>> {
344    val random = Random(3)
345    return List(12) { asyncString(random.nextInt(1000)) }
346}
347
348fun main() = runBlocking<Unit> {
349//sampleStart
350    val list = asyncStringsList()
351    val result = select<String> {
352        list.withIndex().forEach { (index, deferred) ->
353            deferred.onAwait { answer ->
354                "Deferred $index produced answer '$answer'"
355            }
356        }
357    }
358    println(result)
359    val countActive = list.count { it.isActive }
360    println("$countActive coroutines are still active")
361//sampleEnd
362}
363```
364{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}
365
366> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-select-04.kt).
367>
368{type="note"}
369
370The output is:
371
372```text
373Deferred 4 produced answer 'Waited for 128 ms'
37411 coroutines are still active
375```
376
377<!--- TEST -->
378
379## Switch over a channel of deferred values
380
381Let us write a channel producer function that consumes a channel of deferred string values, waits for each received
382deferred value, but only until the next deferred value comes over or the channel is closed. This example puts together
383[onReceiveCatching][ReceiveChannel.onReceiveCatching] and [onAwait][Deferred.onAwait] clauses in the same `select`:
384
385```kotlin
386fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
387    var current = input.receive() // start with first received deferred value
388    while (isActive) { // loop while not cancelled/closed
389        val next = select<Deferred<String>?> { // return next deferred value from this select or null
390            input.onReceiveCatching { update ->
391                update.getOrNull()
392            }
393            current.onAwait { value ->
394                send(value) // send value that current deferred has produced
395                input.receiveCatching().getOrNull() // and use the next deferred from the input channel
396            }
397        }
398        if (next == null) {
399            println("Channel was closed")
400            break // out of loop
401        } else {
402            current = next
403        }
404    }
405}
406```
407
408To test it, we'll use a simple async function that resolves to a specified string after a specified time:
409
410```kotlin
411fun CoroutineScope.asyncString(str: String, time: Long) = async {
412    delay(time)
413    str
414}
415```
416
417The main function just launches a coroutine to print results of `switchMapDeferreds` and sends some test
418data to it:
419
420<!--- CLEAR -->
421
422```kotlin
423import kotlinx.coroutines.*
424import kotlinx.coroutines.channels.*
425import kotlinx.coroutines.selects.*
426
427fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
428    var current = input.receive() // start with first received deferred value
429    while (isActive) { // loop while not cancelled/closed
430        val next = select<Deferred<String>?> { // return next deferred value from this select or null
431            input.onReceiveCatching { update ->
432                update.getOrNull()
433            }
434            current.onAwait { value ->
435                send(value) // send value that current deferred has produced
436                input.receiveCatching().getOrNull() // and use the next deferred from the input channel
437            }
438        }
439        if (next == null) {
440            println("Channel was closed")
441            break // out of loop
442        } else {
443            current = next
444        }
445    }
446}
447
448fun CoroutineScope.asyncString(str: String, time: Long) = async {
449    delay(time)
450    str
451}
452
453fun main() = runBlocking<Unit> {
454//sampleStart
455    val chan = Channel<Deferred<String>>() // the channel for test
456    launch { // launch printing coroutine
457        for (s in switchMapDeferreds(chan))
458            println(s) // print each received string
459    }
460    chan.send(asyncString("BEGIN", 100))
461    delay(200) // enough time for "BEGIN" to be produced
462    chan.send(asyncString("Slow", 500))
463    delay(100) // not enough time to produce slow
464    chan.send(asyncString("Replace", 100))
465    delay(500) // give it time before the last one
466    chan.send(asyncString("END", 500))
467    delay(1000) // give it time to process
468    chan.close() // close the channel ...
469    delay(500) // and wait some time to let it finish
470//sampleEnd
471}
472```
473{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}
474
475> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt).
476>
477{type="note"}
478
479The result of this code:
480
481```text
482BEGIN
483Replace
484END
485Channel was closed
486```
487
488<!--- TEST -->
489
490<!--- MODULE kotlinx-coroutines-core -->
491<!--- INDEX kotlinx.coroutines -->
492
493[Deferred.onAwait]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-deferred/on-await.html
494
495<!--- INDEX kotlinx.coroutines.channels -->
496
497[ReceiveChannel.receive]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
498[ReceiveChannel.onReceive]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html
499[ReceiveChannel.onReceiveCatching]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive-catching.html
500[SendChannel.send]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
501[SendChannel.onSend]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/on-send.html
502
503<!--- INDEX kotlinx.coroutines.selects -->
504
505[select]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html
506
507<!--- END -->
508