1<!--- TEST_NAME SelectGuideTest --> 2 3[//]: # (title: Select expression \(experimental\)) 4 5Select expression makes it possible to await multiple suspending functions simultaneously and _select_ 6the first one that becomes available. 7 8> Select expressions are an experimental feature of `kotlinx.coroutines`. Their API is expected to 9> evolve in the upcoming updates of the `kotlinx.coroutines` library with potentially 10> breaking changes. 11> 12{type="note"} 13 14## Selecting from channels 15 16Let us have two producers of strings: `fizz` and `buzz`. The `fizz` produces "Fizz" string every 500 ms: 17 18```kotlin 19fun CoroutineScope.fizz() = produce<String> { 20 while (true) { // sends "Fizz" every 500 ms 21 delay(500) 22 send("Fizz") 23 } 24} 25``` 26 27And the `buzz` produces "Buzz!" string every 1000 ms: 28 29```kotlin 30fun CoroutineScope.buzz() = produce<String> { 31 while (true) { // sends "Buzz!" every 1000 ms 32 delay(1000) 33 send("Buzz!") 34 } 35} 36``` 37 38Using [receive][ReceiveChannel.receive] suspending function we can receive _either_ from one channel or the 39other. But [select] expression allows us to receive from _both_ simultaneously using its 40[onReceive][ReceiveChannel.onReceive] clauses: 41 42```kotlin 43suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) { 44 select<Unit> { // <Unit> means that this select expression does not produce any result 45 fizz.onReceive { value -> // this is the first select clause 46 println("fizz -> '$value'") 47 } 48 buzz.onReceive { value -> // this is the second select clause 49 println("buzz -> '$value'") 50 } 51 } 52} 53``` 54 55Let us run it all seven times: 56 57<!--- CLEAR --> 58 59```kotlin 60import kotlinx.coroutines.* 61import kotlinx.coroutines.channels.* 62import kotlinx.coroutines.selects.* 63 64fun CoroutineScope.fizz() = produce<String> { 65 while (true) { // sends "Fizz" every 500 ms 66 delay(500) 67 send("Fizz") 68 } 69} 70 71fun CoroutineScope.buzz() = produce<String> { 72 while (true) { // sends "Buzz!" every 1000 ms 73 delay(1000) 74 send("Buzz!") 75 } 76} 77 78suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) { 79 select<Unit> { // <Unit> means that this select expression does not produce any result 80 fizz.onReceive { value -> // this is the first select clause 81 println("fizz -> '$value'") 82 } 83 buzz.onReceive { value -> // this is the second select clause 84 println("buzz -> '$value'") 85 } 86 } 87} 88 89fun main() = runBlocking<Unit> { 90//sampleStart 91 val fizz = fizz() 92 val buzz = buzz() 93 repeat(7) { 94 selectFizzBuzz(fizz, buzz) 95 } 96 coroutineContext.cancelChildren() // cancel fizz & buzz coroutines 97//sampleEnd 98} 99``` 100{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 101 102> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-select-01.kt). 103> 104{type="note"} 105 106The result of this code is: 107 108```text 109fizz -> 'Fizz' 110buzz -> 'Buzz!' 111fizz -> 'Fizz' 112fizz -> 'Fizz' 113buzz -> 'Buzz!' 114fizz -> 'Fizz' 115fizz -> 'Fizz' 116``` 117 118<!--- TEST --> 119 120## Selecting on close 121 122The [onReceive][ReceiveChannel.onReceive] clause in `select` fails when the channel is closed causing the corresponding 123`select` to throw an exception. We can use [onReceiveCatching][ReceiveChannel.onReceiveCatching] clause to perform a 124specific action when the channel is closed. The following example also shows that `select` is an expression that returns 125the result of its selected clause: 126 127```kotlin 128suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String = 129 select<String> { 130 a.onReceiveCatching { it -> 131 val value = it.getOrNull() 132 if (value != null) { 133 "a -> '$value'" 134 } else { 135 "Channel 'a' is closed" 136 } 137 } 138 b.onReceiveCatching { it -> 139 val value = it.getOrNull() 140 if (value != null) { 141 "b -> '$value'" 142 } else { 143 "Channel 'b' is closed" 144 } 145 } 146 } 147``` 148 149 150Let's use it with channel `a` that produces "Hello" string four times and 151channel `b` that produces "World" four times: 152 153<!--- CLEAR --> 154 155```kotlin 156import kotlinx.coroutines.* 157import kotlinx.coroutines.channels.* 158import kotlinx.coroutines.selects.* 159 160suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String = 161 select<String> { 162 a.onReceiveCatching { it -> 163 val value = it.getOrNull() 164 if (value != null) { 165 "a -> '$value'" 166 } else { 167 "Channel 'a' is closed" 168 } 169 } 170 b.onReceiveCatching { it -> 171 val value = it.getOrNull() 172 if (value != null) { 173 "b -> '$value'" 174 } else { 175 "Channel 'b' is closed" 176 } 177 } 178 } 179 180fun main() = runBlocking<Unit> { 181//sampleStart 182 val a = produce<String> { 183 repeat(4) { send("Hello $it") } 184 } 185 val b = produce<String> { 186 repeat(4) { send("World $it") } 187 } 188 repeat(8) { // print first eight results 189 println(selectAorB(a, b)) 190 } 191 coroutineContext.cancelChildren() 192//sampleEnd 193} 194``` 195{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 196 197> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt). 198> 199{type="note"} 200 201The result of this code is quite interesting, so we'll analyze it in more detail: 202 203```text 204a -> 'Hello 0' 205a -> 'Hello 1' 206b -> 'World 0' 207a -> 'Hello 2' 208a -> 'Hello 3' 209b -> 'World 1' 210Channel 'a' is closed 211Channel 'a' is closed 212``` 213 214<!--- TEST --> 215 216There are a couple of observations to make out of it. 217 218First of all, `select` is _biased_ to the first clause. When several clauses are selectable at the same time, 219the first one among them gets selected. Here, both channels are constantly producing strings, so `a` channel, 220being the first clause in select, wins. However, because we are using unbuffered channel, the `a` gets suspended from 221time to time on its [send][SendChannel.send] invocation and gives a chance for `b` to send, too. 222 223The second observation, is that [onReceiveCatching][ReceiveChannel.onReceiveCatching] gets immediately selected when the 224channel is already closed. 225 226## Selecting to send 227 228Select expression has [onSend][SendChannel.onSend] clause that can be used for a great good in combination 229with a biased nature of selection. 230 231Let us write an example of a producer of integers that sends its values to a `side` channel when 232the consumers on its primary channel cannot keep up with it: 233 234```kotlin 235fun CoroutineScope.produceNumbers(side: SendChannel<Int>) = produce<Int> { 236 for (num in 1..10) { // produce 10 numbers from 1 to 10 237 delay(100) // every 100 ms 238 select<Unit> { 239 onSend(num) {} // Send to the primary channel 240 side.onSend(num) {} // or to the side channel 241 } 242 } 243} 244``` 245 246Consumer is going to be quite slow, taking 250 ms to process each number: 247 248<!--- CLEAR --> 249 250```kotlin 251import kotlinx.coroutines.* 252import kotlinx.coroutines.channels.* 253import kotlinx.coroutines.selects.* 254 255fun CoroutineScope.produceNumbers(side: SendChannel<Int>) = produce<Int> { 256 for (num in 1..10) { // produce 10 numbers from 1 to 10 257 delay(100) // every 100 ms 258 select<Unit> { 259 onSend(num) {} // Send to the primary channel 260 side.onSend(num) {} // or to the side channel 261 } 262 } 263} 264 265fun main() = runBlocking<Unit> { 266//sampleStart 267 val side = Channel<Int>() // allocate side channel 268 launch { // this is a very fast consumer for the side channel 269 side.consumeEach { println("Side channel has $it") } 270 } 271 produceNumbers(side).consumeEach { 272 println("Consuming $it") 273 delay(250) // let us digest the consumed number properly, do not hurry 274 } 275 println("Done consuming") 276 coroutineContext.cancelChildren() 277//sampleEnd 278} 279``` 280{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 281 282> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-select-03.kt). 283> 284{type="note"} 285 286So let us see what happens: 287 288```text 289Consuming 1 290Side channel has 2 291Side channel has 3 292Consuming 4 293Side channel has 5 294Side channel has 6 295Consuming 7 296Side channel has 8 297Side channel has 9 298Consuming 10 299Done consuming 300``` 301 302<!--- TEST --> 303 304## Selecting deferred values 305 306Deferred values can be selected using [onAwait][Deferred.onAwait] clause. 307Let us start with an async function that returns a deferred string value after 308a random delay: 309 310```kotlin 311fun CoroutineScope.asyncString(time: Int) = async { 312 delay(time.toLong()) 313 "Waited for $time ms" 314} 315``` 316 317Let us start a dozen of them with a random delay. 318 319```kotlin 320fun CoroutineScope.asyncStringsList(): List<Deferred<String>> { 321 val random = Random(3) 322 return List(12) { asyncString(random.nextInt(1000)) } 323} 324``` 325 326Now the main function awaits for the first of them to complete and counts the number of deferred values 327that are still active. Note that we've used here the fact that `select` expression is a Kotlin DSL, 328so we can provide clauses for it using an arbitrary code. In this case we iterate over a list 329of deferred values to provide `onAwait` clause for each deferred value. 330 331<!--- CLEAR --> 332 333```kotlin 334import kotlinx.coroutines.* 335import kotlinx.coroutines.selects.* 336import java.util.* 337 338fun CoroutineScope.asyncString(time: Int) = async { 339 delay(time.toLong()) 340 "Waited for $time ms" 341} 342 343fun CoroutineScope.asyncStringsList(): List<Deferred<String>> { 344 val random = Random(3) 345 return List(12) { asyncString(random.nextInt(1000)) } 346} 347 348fun main() = runBlocking<Unit> { 349//sampleStart 350 val list = asyncStringsList() 351 val result = select<String> { 352 list.withIndex().forEach { (index, deferred) -> 353 deferred.onAwait { answer -> 354 "Deferred $index produced answer '$answer'" 355 } 356 } 357 } 358 println(result) 359 val countActive = list.count { it.isActive } 360 println("$countActive coroutines are still active") 361//sampleEnd 362} 363``` 364{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 365 366> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-select-04.kt). 367> 368{type="note"} 369 370The output is: 371 372```text 373Deferred 4 produced answer 'Waited for 128 ms' 37411 coroutines are still active 375``` 376 377<!--- TEST --> 378 379## Switch over a channel of deferred values 380 381Let us write a channel producer function that consumes a channel of deferred string values, waits for each received 382deferred value, but only until the next deferred value comes over or the channel is closed. This example puts together 383[onReceiveCatching][ReceiveChannel.onReceiveCatching] and [onAwait][Deferred.onAwait] clauses in the same `select`: 384 385```kotlin 386fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> { 387 var current = input.receive() // start with first received deferred value 388 while (isActive) { // loop while not cancelled/closed 389 val next = select<Deferred<String>?> { // return next deferred value from this select or null 390 input.onReceiveCatching { update -> 391 update.getOrNull() 392 } 393 current.onAwait { value -> 394 send(value) // send value that current deferred has produced 395 input.receiveCatching().getOrNull() // and use the next deferred from the input channel 396 } 397 } 398 if (next == null) { 399 println("Channel was closed") 400 break // out of loop 401 } else { 402 current = next 403 } 404 } 405} 406``` 407 408To test it, we'll use a simple async function that resolves to a specified string after a specified time: 409 410```kotlin 411fun CoroutineScope.asyncString(str: String, time: Long) = async { 412 delay(time) 413 str 414} 415``` 416 417The main function just launches a coroutine to print results of `switchMapDeferreds` and sends some test 418data to it: 419 420<!--- CLEAR --> 421 422```kotlin 423import kotlinx.coroutines.* 424import kotlinx.coroutines.channels.* 425import kotlinx.coroutines.selects.* 426 427fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> { 428 var current = input.receive() // start with first received deferred value 429 while (isActive) { // loop while not cancelled/closed 430 val next = select<Deferred<String>?> { // return next deferred value from this select or null 431 input.onReceiveCatching { update -> 432 update.getOrNull() 433 } 434 current.onAwait { value -> 435 send(value) // send value that current deferred has produced 436 input.receiveCatching().getOrNull() // and use the next deferred from the input channel 437 } 438 } 439 if (next == null) { 440 println("Channel was closed") 441 break // out of loop 442 } else { 443 current = next 444 } 445 } 446} 447 448fun CoroutineScope.asyncString(str: String, time: Long) = async { 449 delay(time) 450 str 451} 452 453fun main() = runBlocking<Unit> { 454//sampleStart 455 val chan = Channel<Deferred<String>>() // the channel for test 456 launch { // launch printing coroutine 457 for (s in switchMapDeferreds(chan)) 458 println(s) // print each received string 459 } 460 chan.send(asyncString("BEGIN", 100)) 461 delay(200) // enough time for "BEGIN" to be produced 462 chan.send(asyncString("Slow", 500)) 463 delay(100) // not enough time to produce slow 464 chan.send(asyncString("Replace", 100)) 465 delay(500) // give it time before the last one 466 chan.send(asyncString("END", 500)) 467 delay(1000) // give it time to process 468 chan.close() // close the channel ... 469 delay(500) // and wait some time to let it finish 470//sampleEnd 471} 472``` 473{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 474 475> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt). 476> 477{type="note"} 478 479The result of this code: 480 481```text 482BEGIN 483Replace 484END 485Channel was closed 486``` 487 488<!--- TEST --> 489 490<!--- MODULE kotlinx-coroutines-core --> 491<!--- INDEX kotlinx.coroutines --> 492 493[Deferred.onAwait]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-deferred/on-await.html 494 495<!--- INDEX kotlinx.coroutines.channels --> 496 497[ReceiveChannel.receive]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html 498[ReceiveChannel.onReceive]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html 499[ReceiveChannel.onReceiveCatching]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive-catching.html 500[SendChannel.send]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html 501[SendChannel.onSend]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/on-send.html 502 503<!--- INDEX kotlinx.coroutines.selects --> 504 505[select]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html 506 507<!--- END --> 508