1[//]: # (title: Coroutines and channels − tutorial) 2 3In this tutorial, you'll learn how to use coroutines in IntelliJ IDEA to perform network requests without blocking the 4underlying thread or callbacks. 5 6> No prior knowledge of coroutines is required, but you're expected to be familiar with basic Kotlin syntax. 7> 8{type="tip"} 9 10You'll learn: 11 12* Why and how to use suspending functions to perform network requests. 13* How to send requests concurrently using coroutines. 14* How to share information between different coroutines using channels. 15 16For network requests, you'll need the [Retrofit](https://square.github.io/retrofit/) library, but the approach shown in 17this tutorial works similarly for any other libraries that support coroutines. 18 19> You can find solutions for all of the tasks on the `solutions` branch of the [project's repository](http://github.com/kotlin-hands-on/intro-coroutines). 20> 21{type="tip"} 22 23## Before you start 24 251. Download and install the latest version of [IntelliJ IDEA](https://www.jetbrains.com/idea/download/index.html). 262. Clone the [project template](http://github.com/kotlin-hands-on/intro-coroutines) by choosing **Get from VCS** on the 27 Welcome screen or selecting **File | New | Project from Version Control**. 28 29 You can also clone it from the command line: 30 31 ```Bash 32 git clone https://github.com/kotlin-hands-on/intro-coroutines 33 ``` 34 35### Generate a GitHub developer token 36 37You'll be using the GitHub API in your project. To get access, provide your GitHub account name and either a password or a 38token. If you have two-factor authentication enabled, a token will be enough. 39 40Generate a new GitHub token to use the GitHub API with [your account](https://github.com/settings/tokens/new): 41 421. Specify the name of your token, for example, `coroutines-tutorial`: 43 44 {width=700} 45 462. Do not select any scopes. Click **Generate token** at the bottom of the page. 473. Copy the generated token. 48 49### Run the code 50 51The program loads the contributors for all of the repositories under the given organization (named “kotlin” by default). 52Later you'll add logic to sort the users by the number of their contributions. 53 541. Open the `src/contributors/main.kt` file and run the `main()` function. You'll see the following window: 55 56 {width=500} 57 58 If the font is too small, adjust it by changing the value of `setDefaultFontSize(18f)` in the `main()` function. 59 602. Provide your GitHub username and token (or password) in the corresponding fields. 613. Make sure that the _BLOCKING_ option is selected in the _Variant_ dropdown menu. 624. Click _Load contributors_. The UI should freeze for some time and then show the list of contributors. 635. Open the program output to ensure the data has been loaded. The list of contributors is logged after each successful request. 64 65There are different ways of implementing this logic: by using [blocking requests](#blocking-requests) 66or [callbacks](#callbacks). You'll compare these solutions with one that uses [coroutines](#coroutines) and see how 67[channels](#channels) can be used to share information between different coroutines. 68 69## Blocking requests 70 71You will use the [Retrofit](https://square.github.io/retrofit/) library to perform HTTP requests to GitHub. 
It allows 72requesting the list of repositories under the given organization and the list of contributors for each repository: 73 74```kotlin 75interface GitHubService { 76 @GET("orgs/{org}/repos?per_page=100") 77 fun getOrgReposCall( 78 @Path("org") org: String 79 ): Call<List<Repo>> 80 81 @GET("repos/{owner}/{repo}/contributors?per_page=100") 82 fun getRepoContributorsCall( 83 @Path("owner") owner: String, 84 @Path("repo") repo: String 85 ): Call<List<User>> 86} 87``` 88 89This API is used by the `loadContributorsBlocking()` function to fetch the list of contributors for the given organization. 90 911. Open `src/tasks/Request1Blocking.kt` to see its implementation: 92 93 ```kotlin 94 fun loadContributorsBlocking( 95 service: GitHubService, 96 req: RequestData 97 ): List<User> { 98 val repos = service 99 .getOrgReposCall(req.org) // #1 100 .execute() // #2 101 .also { logRepos(req, it) } // #3 102 .body() ?: emptyList() // #4 103 104 return repos.flatMap { repo -> 105 service 106 .getRepoContributorsCall(req.org, repo.name) // #1 107 .execute() // #2 108 .also { logUsers(repo, it) } // #3 109 .bodyList() // #4 110 }.aggregate() 111 } 112 ``` 113 114 * At first, you get a list of the repositories under the given organization and store it in the `repos` list. Then for 115 each repository, the list of contributors is requested, and all of the lists are merged into one final list of 116 contributors. 117 * `getOrgReposCall()` and `getRepoContributorsCall()` both return an instance of the `*Call` class (`#1`). At this point, 118 no request is sent. 119 * `*Call.execute()` is then invoked to perform the request (`#2`). `execute()` is a synchronous call that blocks the 120 underlying thread. 121 * When you get the response, the result is logged by calling the specific `logRepos()` and `logUsers()` functions (`#3`). 122 If the HTTP response contains an error, this error will be logged here. 123 * Finally, get the response's body, which contains the data you need. For this tutorial, you'll use an empty list as a 124 result in case there is an error, and you'll log the corresponding error (`#4`). 125 1262. To avoid repeating `.body() ?: emptyList()`, an extension function `bodyList()` is declared: 127 128 ```kotlin 129 fun <T> Response<List<T>>.bodyList(): List<T> { 130 return body() ?: emptyList() 131 } 132 ``` 133 1343. Run the program again and take a look at the system output in IntelliJ IDEA. It should have something like this: 135 136 ```text 137 1770 [AWT-EventQueue-0] INFO Contributors - kotlin: loaded 40 repos 138 2025 [AWT-EventQueue-0] INFO Contributors - kotlin-examples: loaded 23 contributors 139 2229 [AWT-EventQueue-0] INFO Contributors - kotlin-koans: loaded 45 contributors 140 ... 141 ``` 142 143 * The first item on each line is the number of milliseconds that have passed since the program started, then the thread 144 name in square brackets. You can see from which thread the loading request is called. 145 * The final item on each line is the actual message: how many repositories or contributors were loaded. 146 147 This log output demonstrates that all of the results were logged from the main thread. When you run the code with a _BLOCKING_ 148 option, the window freezes and doesn't react to input until the loading is finished. All of the requests are executed from 149 the same thread as the one called `loadContributorsBlocking()` is from, which is the main UI thread (in Swing, it's an AWT 150 event dispatching thread). 
This main thread becomes blocked, and that's why the UI is frozen: 151 152 {width=700} 153 154 After the list of contributors has loaded, the result is updated. 155 1564. In `src/contributors/Contributors.kt`, find the `loadContributors()` function responsible for choosing how 157 the contributors are loaded and look at how `loadContributorsBlocking()` is called: 158 159 ```kotlin 160 when (getSelectedVariant()) { 161 BLOCKING -> { // Blocking UI thread 162 val users = loadContributorsBlocking(service, req) 163 updateResults(users, startTime) 164 } 165 } 166 ``` 167 168 * The `updateResults()` call goes right after the `loadContributorsBlocking()` call. 169 * `updateResults()` updates the UI, so it must always be called from the UI thread. 170 * Since `loadContributorsBlocking()` is also called from the UI thread, the UI thread becomes blocked and the UI is 171 frozen. 172 173### Task 1 174 175The first task helps you familiarize yourself with the task domain. Currently, each contributor's name is repeated 176several times, once for every project they have taken part in. Implement the `aggregate()` function combining the users 177so that each contributor is added only once. The `User.contributions` property should contain the total number of 178contributions of the given user to _all_ the projects. The resulting list should be sorted in descending order according 179to the number of contributions. 180 181Open `src/tasks/Aggregation.kt` and implement the `List<User>.aggregate()` function. Users should be sorted by the total 182number of their contributions. 183 184The corresponding test file `test/tasks/AggregationKtTest.kt` shows an example of the expected result. 185 186> You can jump between the source code and the test class automatically by using the [IntelliJ IDEA shortcut](https://www.jetbrains.com/help/idea/create-tests.html#test-code-navigation) 187> `Ctrl+Shift+T` / `⇧ ⌘ T`. 188> 189{type="tip"} 190 191After implementing this task, the resulting list for the "kotlin" organization should be similar to the following: 192 193{width=500} 194 195#### Solution for task 1 {initial-collapse-state="collapsed"} 196 1971. To group users by login, use [`groupBy()`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/group-by.html), 198 which returns a map from a login to all occurrences of the user with this login in different repositories. 1992. For each map entry, count the total number of contributions for each user and create a new instance of the `User` class 200 by the given name and total of contributions. 2013. Sort the resulting list in descending order: 202 203 ```kotlin 204 fun List<User>.aggregate(): List<User> = 205 groupBy { it.login } 206 .map { (login, group) -> User(login, group.sumOf { it.contributions }) } 207 .sortedByDescending { it.contributions } 208 ``` 209 210An alternative solution is to use the [`groupingBy()`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/grouping-by.html) 211function instead of `groupBy()`. 212 213## Callbacks 214 215The previous solution works, but it blocks the thread and therefore freezes the UI. A traditional approach that avoids this 216is to use _callbacks_. 217 218Instead of calling the code that should be invoked right after the operation is completed, you can extract it 219into a separate callback, often a lambda, and pass that lambda to the caller in order for it to be called later. 
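
For illustration, here is a minimal sketch of this pattern outside of the tutorial project (the `loadData()` function and
the `onLoaded` parameter are made-up names): instead of returning a result, the function accepts a lambda and invokes it
once the data is ready:

```kotlin
import kotlin.concurrent.thread

// Instead of returning the result, the function takes a callback
// and invokes it when the (potentially slow) work has completed.
fun loadData(onLoaded: (String) -> Unit) {
    thread {
        Thread.sleep(1000)    // simulates a long-running request
        onLoaded("some data") // the callback is invoked later, from the worker thread
    }
}

fun main() {
    loadData { data ->
        println("Received: $data") // runs only after the data is ready
    }
    println("loadData() has already returned") // printed before the callback runs
}
```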

To make the UI responsive, you can either move the whole computation to a separate thread or switch to the Retrofit API
which uses callbacks instead of blocking calls.

### Use a background thread

1. Open `src/tasks/Request2Background.kt` and see its implementation. First, the whole computation is moved to a different
   thread. The `thread()` function starts a new thread:

    ```kotlin
    thread {
        loadContributorsBlocking(service, req)
    }
    ```

    Now that all of the loading has been moved to a separate thread, the main thread is free and can be occupied by other
    tasks:

    {width=700}

2. The signature of the `loadContributorsBackground()` function changes. It takes an `updateResults()`
   callback as the last argument so that it can be called after all the loading completes:

    ```kotlin
    fun loadContributorsBackground(
        service: GitHubService, req: RequestData,
        updateResults: (List<User>) -> Unit
    )
    ```

3. Now when `loadContributorsBackground()` is called, the `updateResults()` call goes in the callback, not immediately
   afterward as it did before:

    ```kotlin
    loadContributorsBackground(service, req) { users ->
        SwingUtilities.invokeLater {
            updateResults(users, startTime)
        }
    }
    ```

    By calling `SwingUtilities.invokeLater`, you ensure that the `updateResults()` call, which updates the results,
    happens on the main UI thread (AWT event dispatching thread).

However, if you try to load the contributors via the `BACKGROUND` option, you can see that the UI no longer freezes
while the data loads, but the resulting list isn't shown.

### Task 2

Fix the `loadContributorsBackground()` function in `src/tasks/Request2Background.kt` so that the resulting list is shown
in the UI.

#### Solution for task 2 {initial-collapse-state="collapsed"}

If you try to load the contributors, you can see in the log that the contributors are loaded but the result isn't displayed.
To fix this, call `updateResults()` on the resulting list of users:

```kotlin
thread {
    updateResults(loadContributorsBlocking(service, req))
}
```

Make sure to call the logic passed in the callback explicitly. Otherwise, nothing will happen.

### Use the Retrofit callback API

In the previous solution, the whole loading logic is moved to the background thread, but that still isn't the best use of
resources. All of the loading requests go sequentially, and the thread is blocked while waiting for the loading result,
while it could have been occupied by other tasks. Specifically, the thread could start loading another request to
receive the entire result earlier.

Handling the data for each repository should then be divided into two parts: loading and processing the
resulting response. The second _processing_ part should be extracted into a callback.

The loading for each repository can then be started before the result for the previous repository is received (and the
corresponding callback is called):

{width=700}

The Retrofit callback API can help achieve this. The `Call.enqueue()` function starts an HTTP request and takes a
callback as an argument. In this callback, you need to specify what needs to be done after each request.
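
The `onResponse()` extension function used in the following listing is declared in the project sources. As a rough sketch,
assuming only the standard Retrofit API, it could be implemented on top of `Call.enqueue()` like this (the actual
declaration in `Request3Callbacks.kt` may differ, for example in how it handles failures):

```kotlin
import retrofit2.Call
import retrofit2.Callback
import retrofit2.Response

// Wraps Call.enqueue() and passes the response to the given lambda,
// so that callers can pass a lambda instead of an object expression.
fun <T> Call<T>.onResponse(callback: (Response<T>) -> Unit) {
    enqueue(object : Callback<T> {
        override fun onResponse(call: Call<T>, response: Response<T>) {
            callback(response)
        }

        override fun onFailure(call: Call<T>, t: Throwable) {
            t.printStackTrace() // failure handling is omitted in this sketch
        }
    })
}
```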

Open `src/tasks/Request3Callbacks.kt` and see the implementation of `loadContributorsCallbacks()` that uses this API:

```kotlin
fun loadContributorsCallbacks(
    service: GitHubService, req: RequestData,
    updateResults: (List<User>) -> Unit
) {
    service.getOrgReposCall(req.org).onResponse { responseRepos -> // #1
        logRepos(req, responseRepos)
        val repos = responseRepos.bodyList()

        val allUsers = mutableListOf<User>()
        for (repo in repos) {
            service.getRepoContributorsCall(req.org, repo.name)
                .onResponse { responseUsers -> // #2
                    logUsers(repo, responseUsers)
                    val users = responseUsers.bodyList()
                    allUsers += users
                }
        }
        // TODO: Why doesn't this code work? How to fix that?
        updateResults(allUsers.aggregate())
    }
}
```

* For convenience, this code fragment uses the `onResponse()` extension function declared in the same file. It takes a
  lambda as an argument rather than an object expression.
* The logic for handling the responses is extracted into callbacks: the corresponding lambdas start at lines `#1` and `#2`.

However, the provided solution doesn't work. If you run the program and load contributors by choosing the _CALLBACKS_
option, you'll see that nothing is shown. At the same time, the test from `Request3CallbacksKtTest` completes immediately
and reports that it passed successfully.

Think about why the given code doesn't work as expected and try to fix it, or see the solutions below.

### Task 3 (optional)

Rewrite the code in the `src/tasks/Request3Callbacks.kt` file so that the loaded list of contributors is shown.

#### The first attempted solution for task 3 {initial-collapse-state="collapsed"}

In the current solution, many requests are started concurrently, which decreases the total loading time. However,
the resulting list isn't shown. This is because the `updateResults()` callback is called right after all of the loading requests are started,
before the `allUsers` list has been filled with the data.

You could try to fix this with a change like the following:

```kotlin
val allUsers = mutableListOf<User>()
for ((index, repo) in repos.withIndex()) {   // #1
    service.getRepoContributorsCall(req.org, repo.name)
        .onResponse { responseUsers ->
            logUsers(repo, responseUsers)
            val users = responseUsers.bodyList()
            allUsers += users
            if (index == repos.lastIndex) {    // #2
                updateResults(allUsers.aggregate())
            }
        }
}
```

* First, you iterate over the list of repos with an index (`#1`).
* Then, from each callback, you check whether it's the last iteration (`#2`).
* And if that's the case, the result is updated.

However, this code also fails to achieve our objective. Try to find the answer yourself, or see the solution below.

#### The second attempted solution for task 3 {initial-collapse-state="collapsed"}

Since the loading requests are started concurrently, there's no guarantee that the result for the last one comes last. The
results can come in any order.

Thus, if you compare the current index with the `lastIndex` as a condition for completion, you risk losing the results for
some repos.

If the request that processes the last repo returns faster than some prior requests (which is likely to happen), all of the
results for requests that take more time will be lost.

382 383One way to fix this is to introduce an index and check whether all of the repositories have already been processed: 384 385```kotlin 386val allUsers = Collections.synchronizedList(mutableListOf<User>()) 387val numberOfProcessed = AtomicInteger() 388for (repo in repos) { 389 service.getRepoContributorsCall(req.org, repo.name) 390 .onResponse { responseUsers -> 391 logUsers(repo, responseUsers) 392 val users = responseUsers.bodyList() 393 allUsers += users 394 if (numberOfProcessed.incrementAndGet() == repos.size) { 395 updateResults(allUsers.aggregate()) 396 } 397 } 398} 399``` 400 401This code uses a synchronized version of the list and `AtomicInteger()` because, in general, there's no guarantee that 402different callbacks that process `getRepoContributors()` requests will always be called from the same thread. 403 404#### The third attempted solution for task 3 {initial-collapse-state="collapsed"} 405 406An even better solution is to use the `CountDownLatch` class. It stores a counter initialized with the number of 407repositories. This counter is decremented after processing each repository. It then waits until the latch is counted 408down to zero before updating the results: 409 410```kotlin 411val countDownLatch = CountDownLatch(repos.size) 412for (repo in repos) { 413 service.getRepoContributorsCall(req.org, repo.name) 414 .onResponse { responseUsers -> 415 // processing repository 416 countDownLatch.countDown() 417 } 418} 419countDownLatch.await() 420updateResults(allUsers.aggregate()) 421``` 422 423The result is then updated from the main thread. This is more direct than delegating the logic to the child threads. 424 425After reviewing these three attempts at a solution, you can see that writing correct code with callbacks is non-trivial 426and error-prone, especially when several underlying threads and synchronization occur. 427 428> As an additional exercise, you can implement the same logic using a reactive approach with the RxJava library. All of the 429> necessary dependencies and solutions for using RxJava can be found in a separate `rx` branch. It is also possible to 430> complete this tutorial and implement or check the proposed Rx versions for a proper comparison. 431> 432{type="tip"} 433 434## Suspending functions 435 436You can implement the same logic using suspending functions. Instead of returning `Call<List<Repo>>`, define the API 437call as a [suspending function](composing-suspending-functions.md) as follows: 438 439```kotlin 440interface GitHubService { 441 @GET("orgs/{org}/repos?per_page=100") 442 suspend fun getOrgRepos( 443 @Path("org") org: String 444 ): List<Repo> 445} 446``` 447 448* `getOrgRepos()` is defined as a `suspend` function. When you use a suspending function to perform a request, the 449 underlying thread isn't blocked. More details about how this works will come in later sections. 450* `getOrgRepos()` returns the result directly instead of returning a `Call`. If the result is unsuccessful, an 451 exception is thrown. 452 453Alternatively, Retrofit allows returning the result wrapped in `Response`. In this case, the result body is 454provided, and it is possible to check for errors manually. This tutorial uses the versions that return `Response`. 
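
For example, with the `Response`-returning declarations shown below, an unsuccessful result can be handled manually
instead of surfacing as an exception. The following is a minimal sketch; the `loadRepos()` helper is hypothetical and
only illustrates the manual check:

```kotlin
import retrofit2.Response

// Hypothetical helper: checks the Response manually and falls back to an empty list on errors
suspend fun loadRepos(service: GitHubService, org: String): List<Repo> {
    val response: Response<List<Repo>> = service.getOrgRepos(org)
    return if (response.isSuccessful) {
        response.body() ?: emptyList()
    } else {
        // response.code() and response.errorBody() give more details if needed
        emptyList()
    }
}
```
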
455 456In `src/contributors/GitHubService.kt`, add the following declarations to the `GitHubService` interface: 457 458```kotlin 459interface GitHubService { 460 // getOrgReposCall & getRepoContributorsCall declarations 461 462 @GET("orgs/{org}/repos?per_page=100") 463 suspend fun getOrgRepos( 464 @Path("org") org: String 465 ): Response<List<Repo>> 466 467 @GET("repos/{owner}/{repo}/contributors?per_page=100") 468 suspend fun getRepoContributors( 469 @Path("owner") owner: String, 470 @Path("repo") repo: String 471 ): Response<List<User>> 472} 473``` 474 475### Task 4 476 477Your task is to change the code of the function that loads contributors to make use of two new suspending functions, 478`getOrgRepos()` and `getRepoContributors()`. The new `loadContributorsSuspend()` function is marked as `suspend` to use the 479new API. 480 481> Suspending functions can't be called everywhere. Calling a suspending function from `loadContributorsBlocking()` will 482> result in an error with the message "Suspend function 'getOrgRepos' should be called only from a coroutine or another 483> suspend function". 484> 485{type="note"} 486 4871. Copy the implementation of `loadContributorsBlocking()` that is defined in `src/tasks/Request1Blocking.kt` 488 into the `loadContributorsSuspend()` that is defined in `src/tasks/Request4Suspend.kt`. 4892. Modify the code so that the new suspending functions are used instead of the ones that return `Call`s. 4903. Run the program by choosing the _SUSPEND_ option and ensure that the UI is still responsive while the GitHub requests 491 are performed. 492 493#### Solution for task 4 {initial-collapse-state="collapsed"} 494 495Replace `.getOrgReposCall(req.org).execute()` with `.getOrgRepos(req.org)` and repeat the same replacement for the 496second "contributors" request: 497 498```kotlin 499suspend fun loadContributorsSuspend(service: GitHubService, req: RequestData): List<User> { 500 val repos = service 501 .getOrgRepos(req.org) 502 .also { logRepos(req, it) } 503 .bodyList() 504 505 return repos.flatMap { repo -> 506 service.getRepoContributors(req.org, repo.name) 507 .also { logUsers(repo, it) } 508 .bodyList() 509 }.aggregate() 510} 511``` 512 513* `loadContributorsSuspend()` should be defined as a `suspend` function. 514* You no longer need to call `execute`, which returned the `Response` before, because now the API functions return 515 the `Response` directly. Note that this detail is specific to the Retrofit library. With other libraries, the API will be different, 516 but the concept is the same. 517 518## Coroutines 519 520The code with suspending functions looks similar to the "blocking" version. The major difference from the blocking version 521is that instead of blocking the thread, the coroutine is suspended: 522 523```text 524block -> suspend 525thread -> coroutine 526``` 527 528> Coroutines are often called lightweight threads because you can run code on coroutines, similar to how you run code on 529> threads. The operations that were blocking before (and had to be avoided) can now suspend the coroutine instead. 530> 531{type="note"} 532 533### Starting a new coroutine 534 535If you look at how `loadContributorsSuspend()` is used in `src/contributors/Contributors.kt`, you can see that it's 536called inside `launch`. 
`launch` is a library function that takes a lambda as an argument: 537 538```kotlin 539launch { 540 val users = loadContributorsSuspend(req) 541 updateResults(users, startTime) 542} 543``` 544 545Here `launch` starts a new computation that is responsible for loading the data and showing the results. The computation 546is suspendable – when performing network requests, it is suspended and releases the underlying thread. 547When the network request returns the result, the computation is resumed. 548 549Such a suspendable computation is called a _coroutine_. So, in this case, `launch` _starts a new coroutine_ responsible 550for loading data and showing the results. 551 552Coroutines run on top of threads and can be suspended. When a coroutine is suspended, the 553corresponding computation is paused, removed from the thread, and stored in memory. Meanwhile, the thread is free to be 554occupied by other tasks: 555 556{width=700} 557 558When the computation is ready to be continued, it is returned to a thread (not necessarily the same one). 559 560In the `loadContributorsSuspend()` example, each "contributors" request now waits for the result using the suspension 561mechanism. First, the new request is sent. Then, while waiting for the response, the whole "load contributors" coroutine 562that was started by the `launch` function is suspended. 563 564The coroutine resumes only after the corresponding response is received: 565 566{width=700} 567 568While the response is waiting to be received, the thread is free to be occupied by other tasks. The UI stays responsive, 569despite all the requests taking place on the main UI thread: 570 5711. Run the program using the _SUSPEND_ option. The log confirms that all of the requests are sent to the main UI thread: 572 573 ```text 574 2538 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 30 repos 575 2729 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - ts2kt: loaded 11 contributors 576 3029 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin-koans: loaded 45 contributors 577 ... 578 11252 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin-coroutines-workshop: loaded 1 contributors 579 ``` 580 5812. The log can show you which coroutine the corresponding code is running on. To enable it, open **Run | Edit configurations** 582 and add the `-Dkotlinx.coroutines.debug` VM option: 583 584 {width=500} 585 586 The coroutine name will be attached to the thread name while `main()` is run with this option. You can also 587 modify the template for running all of the Kotlin files and enable this option by default. 588 589Now all of the code runs on one coroutine, the "load contributors" coroutine mentioned above, denoted as `@coroutine#1`. 590While waiting for the result, you shouldn't reuse the thread for sending other requests because the code is 591written sequentially. The new request is sent only when the previous result is received. 592 593Suspending functions treat the thread fairly and don't block it for "waiting". However, this doesn't yet bring any concurrency 594into the picture. 595 596## Concurrency 597 598Kotlin coroutines are much less resource-intensive than threads. 599Each time you want to start a new computation asynchronously, you can create a new coroutine instead. 600 601To start a new coroutine, use one of the main _coroutine builders_: `launch`, `async`, or `runBlocking`. Different 602libraries can define additional coroutine builders. 603 604`async` starts a new coroutine and returns a `Deferred` object. 
`Deferred` represents a concept known by other names 605such as `Future` or `Promise`. It stores a computation, but it _defers_ the moment you get the final result; 606it _promises_ the result sometime in the _future_. 607 608The main difference between `async` and `launch` is that `launch` is used to start a computation that isn't expected to 609return a specific result. `launch` returns a `Job` that represents the coroutine. It is possible to wait until it completes 610by calling `Job.join()`. 611 612`Deferred` is a generic type that extends `Job`. An `async` call can return a `Deferred<Int>` or a `Deferred<CustomType>`, 613depending on what the lambda returns (the last expression inside the lambda is the result). 614 615To get the result of a coroutine, you can call `await()` on the `Deferred` instance. While waiting for the result, 616the coroutine that this `await()` is called from is suspended: 617 618```kotlin 619import kotlinx.coroutines.* 620 621fun main() = runBlocking { 622 val deferred: Deferred<Int> = async { 623 loadData() 624 } 625 println("waiting...") 626 println(deferred.await()) 627} 628 629suspend fun loadData(): Int { 630 println("loading...") 631 delay(1000L) 632 println("loaded!") 633 return 42 634} 635``` 636 637`runBlocking` is used as a bridge between regular and suspending functions, or between the blocking and non-blocking worlds. It works 638as an adaptor for starting the top-level main coroutine. It is intended primarily to be used in `main()` functions and 639tests. 640 641> Watch [this video](https://www.youtube.com/watch?v=zEZc5AmHQhk) for a better understanding of coroutines. 642> 643{type="tip"} 644 645If there is a list of deferred objects, you can call `awaitAll()` to await the results of all of them: 646 647```kotlin 648import kotlinx.coroutines.* 649 650fun main() = runBlocking { 651 val deferreds: List<Deferred<Int>> = (1..3).map { 652 async { 653 delay(1000L * it) 654 println("Loading $it") 655 it 656 } 657 } 658 val sum = deferreds.awaitAll().sum() 659 println("$sum") 660} 661``` 662 663When each "contributors" request is started in a new coroutine, all of the requests are started asynchronously. A new request 664can be sent before the result for the previous one is received: 665 666{width=700} 667 668The total loading time is approximately the same as in the _CALLBACKS_ version, but it doesn't need any callbacks. 669What's more, `async` explicitly emphasizes which parts run concurrently in the code. 670 671### Task 5 672 673In the `Request5Concurrent.kt` file, implement a `loadContributorsConcurrent()` function by using the 674previous `loadContributorsSuspend()` function. 675 676#### Tip for task 5 {initial-collapse-state="collapsed"} 677 678You can only start a new coroutine inside a coroutine scope. Copy the content 679from `loadContributorsSuspend()` to the `coroutineScope` call so that you can call `async` functions there: 680 681```kotlin 682suspend fun loadContributorsConcurrent( 683 service: GitHubService, 684 req: RequestData 685): List<User> = coroutineScope { 686 // ... 687} 688``` 689 690Base your solution on the following scheme: 691 692```kotlin 693val deferreds: List<Deferred<List<User>>> = repos.map { repo -> 694 async { 695 // load contributors for each repo 696 } 697} 698deferreds.awaitAll() // List<List<User>> 699``` 700 701#### Solution for task 5 {initial-collapse-state="collapsed"} 702 703Wrap each "contributors" request with `async` to create as many coroutines as there are repositories. 
`async` 704returns `Deferred<List<User>>`. This is not an issue because creating new coroutines is not very resource-intensive, so you can 705create as many as you need. 706 7071. You can no longer use `flatMap` because the `map` result is now a list of `Deferred` objects, not a list of lists. 708 `awaitAll()` returns `List<List<User>>`, so call `flatten().aggregate()` to get the result: 709 710 ```kotlin 711 suspend fun loadContributorsConcurrent( 712 service: GitHubService, 713 req: RequestData 714 ): List<User> = coroutineScope { 715 val repos = service 716 .getOrgRepos(req.org) 717 .also { logRepos(req, it) } 718 .bodyList() 719 720 val deferreds: List<Deferred<List<User>>> = repos.map { repo -> 721 async { 722 service.getRepoContributors(req.org, repo.name) 723 .also { logUsers(repo, it) } 724 .bodyList() 725 } 726 } 727 deferreds.awaitAll().flatten().aggregate() 728 } 729 ``` 730 7312. Run the code and check the log. All of the coroutines still run on the main UI thread because 732 multithreading hasn't been employed yet, but you can already see the benefits of running coroutines concurrently. 7333. To change this code to run "contributors" coroutines on different threads from the common thread pool, 734 specify `Dispatchers.Default` as the context argument for the `async` function: 735 736 ```kotlin 737 async(Dispatchers.Default) { } 738 ``` 739 740 * `CoroutineDispatcher` determines what thread or threads the corresponding coroutine should be run on. If you don't 741 specify one as an argument, `async` will use the dispatcher from the outer scope. 742 * `Dispatchers.Default` represents a shared pool of threads on the JVM. This pool provides a means for parallel execution. 743 It consists of as many threads as there are CPU cores available, but it will still have two threads if there's only one core. 744 7454. Modify the code in the `loadContributorsConcurrent()` function to start new coroutines on different threads from the 746 common thread pool. Also, add additional logging before sending the request: 747 748 ```kotlin 749 async(Dispatchers.Default) { 750 log("starting loading for ${repo.name}") 751 service.getRepoContributors(req.org, repo.name) 752 .also { logUsers(repo, it) } 753 .bodyList() 754 } 755 ``` 756 7575. Run the program once again. In the log, you can see that each coroutine can be started on one thread from the 758 thread pool and resumed on another: 759 760 ```text 761 1946 [DefaultDispatcher-worker-2 @coroutine#4] INFO Contributors - starting loading for kotlin-koans 762 1946 [DefaultDispatcher-worker-3 @coroutine#5] INFO Contributors - starting loading for dokka 763 1946 [DefaultDispatcher-worker-1 @coroutine#3] INFO Contributors - starting loading for ts2kt 764 ... 765 2178 [DefaultDispatcher-worker-1 @coroutine#4] INFO Contributors - kotlin-koans: loaded 45 contributors 766 2569 [DefaultDispatcher-worker-1 @coroutine#5] INFO Contributors - dokka: loaded 36 contributors 767 2821 [DefaultDispatcher-worker-2 @coroutine#3] INFO Contributors - ts2kt: loaded 11 contributors 768 ``` 769 770 For instance, in this log excerpt, `coroutine#4` is started on the `worker-2` thread and continued on the 771 `worker-1` thread. 772 773In `src/contributors/Contributors.kt`, check the implementation of the _CONCURRENT_ option: 774 7751. 
To run the coroutine only on the main UI thread, specify `Dispatchers.Main` as an argument: 776 777 ```kotlin 778 launch(Dispatchers.Main) { 779 updateResults() 780 } 781 ``` 782 783 * If the main thread is busy when you start a new coroutine on it, 784 the coroutine becomes suspended and scheduled for execution on this thread. The coroutine will only resume when the 785 thread becomes free. 786 * It's considered good practice to use the dispatcher from the outer scope rather than explicitly specifying it on each 787 end-point. If you define `loadContributorsConcurrent()` without passing `Dispatchers.Default` as an 788 argument, you can call this function in any context: with a `Default` dispatcher, with 789 the main UI thread, or with a custom dispatcher. 790 * As you'll see later, when calling `loadContributorsConcurrent()` from tests, you can call it in the context 791 with `TestDispatcher`, which simplifies testing. That makes this solution much more flexible. 792 7932. To specify the dispatcher on the caller side, apply the following change to the project while 794 letting `loadContributorsConcurrent` start coroutines in the inherited context: 795 796 ```kotlin 797 launch(Dispatchers.Default) { 798 val users = loadContributorsConcurrent(service, req) 799 withContext(Dispatchers.Main) { 800 updateResults(users, startTime) 801 } 802 } 803 ``` 804 805 * `updateResults()` should be called on the main UI thread, so you call it with the context of `Dispatchers.Main`. 806 * `withContext()` calls the given code with the specified coroutine context, is suspended until it completes, and returns 807 the result. An alternative but more verbose way to express this would be to start a new coroutine and explicitly 808 wait (by suspending) until it completes: `launch(context) { ... }.join()`. 809 8103. Run the code and ensure that the coroutines are executed on the threads from the thread pool. 811 812## Structured concurrency 813 814* The _coroutine scope_ is responsible for the structure and parent-child relationships between different coroutines. New 815 coroutines usually need to be started inside a scope. 816* The _coroutine context_ stores additional technical information used to run a given coroutine, like the coroutine custom 817 name, or the dispatcher specifying the threads the coroutine should be scheduled on. 818 819When `launch`, `async`, or `runBlocking` are used to start a new coroutine, they automatically create the corresponding 820scope. All of these functions take a lambda with a receiver as an argument, and `CoroutineScope` is the implicit receiver type: 821 822```kotlin 823launch { /* this: CoroutineScope */ } 824``` 825 826* New coroutines can only be started inside a scope. 827* `launch` and `async` are declared as extensions to `CoroutineScope`, so an implicit or explicit receiver must always 828 be passed when you call them. 829* The coroutine started by `runBlocking` is the only exception because `runBlocking` is defined as a top-level function. 830 But because it blocks the current thread, it's intended primarily to be used in `main()` functions and tests as a bridge 831 function. 832 833A new coroutine inside `runBlocking`, `launch`, or `async` is started automatically inside the scope: 834 835```kotlin 836import kotlinx.coroutines.* 837 838fun main() = runBlocking { /* this: CoroutineScope */ 839 launch { /* ... */ } 840 // the same as: 841 this.launch { /* ... 
*/ } 842} 843``` 844 845When you call `launch` inside `runBlocking`, it's called as an extension to the implicit receiver of 846the `CoroutineScope` type. Alternatively, you could explicitly write `this.launch`. 847 848The nested coroutine (started by `launch` in this example) can be considered as a child of the outer coroutine (started 849by `runBlocking`). This "parent-child" relationship works through scopes; the child coroutine is started from the scope 850corresponding to the parent coroutine. 851 852It's possible to create a new scope without starting a new coroutine, by using the `coroutineScope` function. 853To start new coroutines in a structured way inside a `suspend` function without access to the outer scope, you can create 854a new coroutine scope that automatically becomes a child of the outer scope that this `suspend` function is called from. 855`loadContributorsConcurrent()`is a good example. 856 857You can also start a new coroutine from the global scope using `GlobalScope.async` or `GlobalScope.launch`. 858This will create a top-level "independent" coroutine. 859 860The mechanism behind the structure of the coroutines is called _structured concurrency_. It provides the following 861benefits over global scopes: 862 863* The scope is generally responsible for child coroutines, whose lifetime is attached to the lifetime of the scope. 864* The scope can automatically cancel child coroutines if something goes wrong or a user changes their mind and decides 865 to revoke the operation. 866* The scope automatically waits for the completion of all child coroutines. 867 Therefore, if the scope corresponds to a coroutine, the parent coroutine does not complete until all the coroutines 868 launched in its scope have completed. 869 870When using `GlobalScope.async`, there is no structure that binds several coroutines to a smaller scope. 871Coroutines started from the global scope are all independent – their lifetime is limited only by the lifetime of the 872whole application. It's possible to store a reference to the coroutine started from the global scope and wait for its 873completion or cancel it explicitly, but that won't happen automatically as it would with structured concurrency. 874 875### Canceling the loading of contributors 876 877Create two versions of the function that loads the list of contributors. Compare how both versions behave when you try to 878cancel the parent coroutine. The first version will use `coroutineScope` to start all of the child coroutines, 879whereas the second will use `GlobalScope`. 880 8811. In `Request5Concurrent.kt`, add a 3-second delay to the `loadContributorsConcurrent()` function: 882 883 ```kotlin 884 suspend fun loadContributorsConcurrent( 885 service: GitHubService, 886 req: RequestData 887 ): List<User> = coroutineScope { 888 // ... 889 async { 890 log("starting loading for ${repo.name}") 891 delay(3000) 892 // load repo contributors 893 } 894 // ... 895 } 896 ``` 897 898 The delay affects all of the coroutines that send requests, so that there's enough time to cancel the loading 899 after the coroutines are started but before the requests are sent. 900 9012. Create the second version of the loading function: copy the implementation of `loadContributorsConcurrent()` to 902 `loadContributorsNotCancellable()` in `Request5NotCancellable.kt` and then remove the creation of a new `coroutineScope`. 9033. 
The `async` calls now fail to resolve, so start them by using `GlobalScope.async`: 904 905 ```kotlin 906 suspend fun loadContributorsNotCancellable( 907 service: GitHubService, 908 req: RequestData 909 ): List<User> { // #1 910 // ... 911 GlobalScope.async { // #2 912 log("starting loading for ${repo.name}") 913 // load repo contributors 914 } 915 // ... 916 return deferreds.awaitAll().flatten().aggregate() // #3 917 } 918 ``` 919 920 * The function now returns the result directly, not as the last expression inside the lambda (lines `#1` and `#3`). 921 * All of the "contributors" coroutines are started inside the `GlobalScope`, not as children of the coroutine scope 922 (line `#2`). 923 9244. Run the program and choose the _CONCURRENT_ option to load the contributors. 9255. Wait until all of the "contributors" coroutines are started, and then click _Cancel_. The log shows no new results, 926 which means that all of the requests were indeed canceled: 927 928 ```text 929 2896 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 40 repos 930 2901 [DefaultDispatcher-worker-2 @coroutine#4] INFO Contributors - starting loading for kotlin-koans 931 ... 932 2909 [DefaultDispatcher-worker-5 @coroutine#36] INFO Contributors - starting loading for mpp-example 933 /* click on 'cancel' */ 934 /* no requests are sent */ 935 ``` 936 9376. Repeat step 5, but this time choose the `NOT_CANCELLABLE` option: 938 939 ```text 940 2570 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 30 repos 941 2579 [DefaultDispatcher-worker-1 @coroutine#4] INFO Contributors - starting loading for kotlin-koans 942 ... 943 2586 [DefaultDispatcher-worker-6 @coroutine#36] INFO Contributors - starting loading for mpp-example 944 /* click on 'cancel' */ 945 /* but all the requests are still sent: */ 946 6402 [DefaultDispatcher-worker-5 @coroutine#4] INFO Contributors - kotlin-koans: loaded 45 contributors 947 ... 948 9555 [DefaultDispatcher-worker-8 @coroutine#36] INFO Contributors - mpp-example: loaded 8 contributors 949 ``` 950 951 In this case, no coroutines are canceled, and all the requests are still sent. 952 9537. Check how the cancellation is triggered in the "contributors" program. When the _Cancel_ button is clicked, 954 the main "loading" coroutine is explicitly canceled and the child coroutines are canceled automatically: 955 956 ```kotlin 957 interface Contributors { 958 959 fun loadContributors() { 960 // ... 961 when (getSelectedVariant()) { 962 CONCURRENT -> { 963 launch { 964 val users = loadContributorsConcurrent(service, req) 965 updateResults(users, startTime) 966 }.setUpCancellation() // #1 967 } 968 } 969 } 970 971 private fun Job.setUpCancellation() { 972 val loadingJob = this // #2 973 974 // cancel the loading job if the 'cancel' button was clicked: 975 val listener = ActionListener { 976 loadingJob.cancel() // #3 977 updateLoadingStatus(CANCELED) 978 } 979 // add a listener to the 'cancel' button: 980 addCancelListener(listener) 981 982 // update the status and remove the listener 983 // after the loading job is completed 984 } 985 } 986 ``` 987 988The `launch` function returns an instance of `Job`. `Job` stores a reference to the "loading coroutine", which loads 989all of the data and updates the results. You can call the `setUpCancellation()` extension function on it (line `#1`), 990passing an instance of `Job` as a receiver. 
991 992Another way you could express this would be to explicitly write: 993 994```kotlin 995val job = launch { } 996job.setUpCancellation() 997``` 998 999* For readability, you could refer to the `setUpCancellation()` function receiver inside the function with the 1000 new `loadingJob` variable (line `#2`). 1001* Then you could add a listener to the _Cancel_ button so that when it's clicked, the `loadingJob` is canceled (line `#3`). 1002 1003With structured concurrency, you only need to cancel the parent coroutine and this automatically propagates cancellation 1004to all of the child coroutines. 1005 1006### Using the outer scope's context 1007 1008When you start new coroutines inside the given scope, it's much easier to ensure that all of them run with the same 1009context. It is also much easier to replace the context if needed. 1010 1011Now it's time to learn how using the dispatcher from the outer scope works. The new scope created by 1012the `coroutineScope` or by the coroutine builders always inherits the context from the outer scope. In this case, the 1013outer scope is the scope the `suspend loadContributorsConcurrent()` function was called from: 1014 1015```kotlin 1016launch(Dispatchers.Default) { // outer scope 1017 val users = loadContributorsConcurrent(service, req) 1018 // ... 1019} 1020``` 1021 1022All of the nested coroutines are automatically started with the inherited context. The dispatcher is a part of this 1023context. That's why all of the coroutines started by `async` are started with the context of the default dispatcher: 1024 1025```kotlin 1026suspend fun loadContributorsConcurrent( 1027 service: GitHubService, req: RequestData 1028): List<User> = coroutineScope { 1029 // this scope inherits the context from the outer scope 1030 // ... 1031 async { // nested coroutine started with the inherited context 1032 // ... 1033 } 1034 // ... 1035} 1036``` 1037 1038With structured concurrency, you can specify the major context elements (like dispatcher) once, when creating the 1039top-level coroutine. All the nested coroutines then inherit the context and modify it only if needed. 1040 1041> When you write code with coroutines for UI applications, for example Android ones, it's a common practice to 1042> use `CoroutineDispatchers.Main` by default for the top coroutine and then to explicitly put a different dispatcher when 1043> you need to run the code on a different thread. 1044> 1045{type="tip"} 1046 1047## Showing progress 1048 1049Despite the information for some repositories being loaded rather quickly, the user only sees the resulting list after all of 1050the data has been loaded. Until then, the loader icon runs showing the progress, but there's no information about the current 1051state or what contributors are already loaded. 
1052 1053You can show the intermediate results earlier and display all of the contributors after loading the data for each of the 1054repositories: 1055 1056{width=500} 1057 1058To implement this functionality, in the `src/tasks/Request6Progress.kt`, you'll need to pass the logic updating the UI 1059as a callback, so that it's called on each intermediate state: 1060 1061```kotlin 1062suspend fun loadContributorsProgress( 1063 service: GitHubService, 1064 req: RequestData, 1065 updateResults: suspend (List<User>, completed: Boolean) -> Unit 1066) { 1067 // loading the data 1068 // calling `updateResults()` on intermediate states 1069} 1070``` 1071 1072On the call site in `Contributors.kt`, the callback is passed to update the results from the `Main` thread for 1073the _PROGRESS_ option: 1074 1075```kotlin 1076launch(Dispatchers.Default) { 1077 loadContributorsProgress(service, req) { users, completed -> 1078 withContext(Dispatchers.Main) { 1079 updateResults(users, startTime, completed) 1080 } 1081 } 1082} 1083``` 1084 1085* The `updateResults()` parameter is declared as `suspend` in `loadContributorsProgress()`. It's necessary to call 1086 `withContext`, which is a `suspend` function inside the corresponding lambda argument. 1087* `updateResults()` callback takes an additional Boolean parameter as an argument specifying whether the loading has 1088 completed and the results are final. 1089 1090### Task 6 1091 1092In the `Request6Progress.kt` file, implement the `loadContributorsProgress()` function that shows the intermediate 1093progress. Base it on the `loadContributorsSuspend()` function from `Request4Suspend.kt`. 1094 1095* Use a simple version without concurrency; you'll add it later in the next section. 1096* The intermediate list of contributors should be shown in an "aggregated" state, not just the list of users loaded for 1097 each repository. 1098* The total number of contributions for each user should be increased when the data for each new 1099 repository is loaded. 1100 1101#### Solution for task 6 {initial-collapse-state="collapsed"} 1102 1103To store the intermediate list of loaded contributors in the "aggregated" state, define an `allUsers` variable which 1104stores the list of users, and then update it after contributors for each new repository are loaded: 1105 1106```kotlin 1107suspend fun loadContributorsProgress( 1108 service: GitHubService, 1109 req: RequestData, 1110 updateResults: suspend (List<User>, completed: Boolean) -> Unit 1111) { 1112 val repos = service 1113 .getOrgRepos(req.org) 1114 .also { logRepos(req, it) } 1115 .bodyList() 1116 1117 var allUsers = emptyList<User>() 1118 for ((index, repo) in repos.withIndex()) { 1119 val users = service.getRepoContributors(req.org, repo.name) 1120 .also { logUsers(repo, it) } 1121 .bodyList() 1122 1123 allUsers = (allUsers + users).aggregate() 1124 updateResults(allUsers, index == repos.lastIndex) 1125 } 1126} 1127``` 1128 1129#### Consecutive vs concurrent 1130 1131An `updateResults()` callback is called after each request is completed: 1132 1133{width=700} 1134 1135This code doesn't include concurrency. It's sequential, so you don't need synchronization. 1136 1137The best option would be to send requests concurrently and update the intermediate results after getting the response 1138for each repository: 1139 1140{width=700} 1141 1142To add concurrency, use _channels_. 1143 1144## Channels 1145 1146Writing code with a shared mutable state is quite difficult and error-prone (like in the solution using callbacks). 
1147A simpler way is to share information by communication rather than by using a common mutable state. 1148Coroutines can communicate with each other through _channels_. 1149 1150Channels are communication primitives that allow data to be passed between coroutines. One coroutine can _send_ 1151some information to a channel, while another can _receive_ that information from it: 1152 1153 1154 1155A coroutine that sends (produces) information is often called a producer, and a coroutine that receives (consumes) 1156information is called a consumer. One or multiple coroutines can send information to the same channel, and one or multiple 1157coroutines can receive data from it: 1158 1159 1160 1161When many coroutines receive information from the same channel, each element is handled only once by one of the 1162consumers. Once an element is handled, it is immediately removed from the channel. 1163 1164You can think of a channel as similar to a collection of elements, or more precisely, a queue, in which elements are added 1165to one end and received from the other. However, there's an important difference: unlike collections, even in their 1166synchronized versions, a channel can _suspend_ `send()`and `receive()` operations. This happens when the channel is empty 1167or full. The channel can be full if the channel size has an upper bound. 1168 1169`Channel` is represented by three different interfaces: `SendChannel`, `ReceiveChannel`, and `Channel`, with the latter 1170extending the first two. You usually create a channel and give it to producers as a `SendChannel` instance so that only 1171they can send information to the channel. 1172You give a channel to consumers as a `ReceiveChannel` instance so that only they can receive from it. Both `send` 1173and `receive` methods are declared as `suspend`: 1174 1175```kotlin 1176interface SendChannel<in E> { 1177 suspend fun send(element: E) 1178 fun close(): Boolean 1179} 1180 1181interface ReceiveChannel<out E> { 1182 suspend fun receive(): E 1183} 1184 1185interface Channel<E> : SendChannel<E>, ReceiveChannel<E> 1186``` 1187 1188The producer can close a channel to indicate that no more elements are coming. 1189 1190Several types of channels are defined in the library. They differ in how many elements they can internally store and 1191whether the `send()` call can be suspended or not. 1192For all of the channel types, the `receive()` call behaves similarly: it receives an element if the channel is not empty; 1193otherwise, it is suspended. 1194 1195<deflist collapsible="true"> 1196 <def title="Unlimited channel"> 1197 <p>An unlimited channel is the closest analog to a queue: producers can send elements to this channel and it will 1198keep growing indefinitely. The <code>send()</code> call will never be suspended. 1199If the program runs out of memory, you'll get an <code>OutOfMemoryException</code>. 1200The difference between an unlimited channel and a queue is that when a consumer tries to receive from an empty channel, 1201it becomes suspended until some new elements are sent.</p> 1202 <img src="unlimited-channel.png" alt="Unlimited channel" width="500"/> 1203 </def> 1204 <def title="Buffered channel"> 1205 <p>The size of a buffered channel is constrained by the specified number. 1206Producers can send elements to this channel until the size limit is reached. All of the elements are internally stored. 
1207When the channel is full, the next `send` call on it is suspended until more free space becomes available.</p> 1208 <img src="buffered-channel.png" alt="Buffered channel" width="500"/> 1209 </def> 1210 <def title="Rendezvous channel"> 1211 <p>The "Rendezvous" channel is a channel without a buffer, the same as a buffered channel with zero size. 1212One of the functions (<code>send()</code> or <code>receive()</code>) is always suspended until the other is called. </p> 1213 <p>If the <code>send()</code> function is called and there's no suspended <code>receive()</code> call ready to process the element, then <code>send()</code> 1214is suspended. Similarly, if the <code>receive()</code> function is called and the channel is empty or, in other words, there's no 1215suspended <code>send()</code> call ready to send the element, the <code>receive()</code> call is suspended. </p> 1216 <p>The "rendezvous" name ("a meeting at an agreed time and place") refers to the fact that <code>send()</code> and <code>receive()</code> 1217should "meet on time".</p> 1218 <img src="rendezvous-channel.png" alt="Rendezvous channel" width="500"/> 1219 </def> 1220 <def title="Conflated channel"> 1221 <p>A new element sent to the conflated channel will overwrite the previously sent element, so the receiver will always 1222get only the latest element. The <code>send()</code> call is never suspended.</p> 1223 <img src="conflated-channel.gif" alt="Conflated channel" width="500"/> 1224 </def> 1225</deflist> 1226 1227When you create a channel, specify its type or the buffer size (if you need a buffered one): 1228 1229```kotlin 1230val rendezvousChannel = Channel<String>() 1231val bufferedChannel = Channel<String>(10) 1232val conflatedChannel = Channel<String>(CONFLATED) 1233val unlimitedChannel = Channel<String>(UNLIMITED) 1234``` 1235 1236By default, a "Rendezvous" channel is created. 1237 1238In the following task, you'll create a "Rendezvous" channel, two producer coroutines, and a consumer coroutine: 1239 1240```kotlin 1241import kotlinx.coroutines.channels.Channel 1242import kotlinx.coroutines.* 1243 1244fun main() = runBlocking<Unit> { 1245 val channel = Channel<String>() 1246 launch { 1247 channel.send("A1") 1248 channel.send("A2") 1249 log("A done") 1250 } 1251 launch { 1252 channel.send("B1") 1253 log("B done") 1254 } 1255 launch { 1256 repeat(3) { 1257 val x = channel.receive() 1258 log(x) 1259 } 1260 } 1261} 1262 1263fun log(message: Any?) { 1264 println("[${Thread.currentThread().name}] $message") 1265} 1266``` 1267 1268> Watch [this video](https://www.youtube.com/watch?v=HpWQUoVURWQ) for a better understanding of channels. 1269> 1270{type="tip"} 1271 1272### Task 7 1273 1274In `src/tasks/Request7Channels.kt`, implement the function `loadContributorsChannels()` that requests all of the GitHub 1275contributors concurrently and shows intermediate progress at the same time. 1276 1277Use the previous functions, `loadContributorsConcurrent()` from `Request5Concurrent.kt` 1278and `loadContributorsProgress()` from `Request6Progress.kt`. 1279 1280#### Tip for task 7 {initial-collapse-state="collapsed"} 1281 1282Different coroutines that concurrently receive contributor lists for different repositories can send all of the received 1283results to the same channel: 1284 1285```kotlin 1286val channel = Channel<List<User>>() 1287for (repo in repos) { 1288 launch { 1289 val users = TODO() 1290 // ... 
        channel.send(users)
    }
}
```

Then the elements from this channel can be received one by one and processed:

```kotlin
repeat(repos.size) {
    val users = channel.receive()
    // ...
}
```

Since the `receive()` calls are sequential, no additional synchronization is needed.

#### Solution for task 7 {initial-collapse-state="collapsed"}

As with the `loadContributorsProgress()` function, you can create an `allUsers` variable to store the intermediate
states of the "all contributors" list.
Each new list received from the channel is added to the list of all users. You aggregate the result and update the state
using the `updateResults` callback:

```kotlin
suspend fun loadContributorsChannels(
    service: GitHubService,
    req: RequestData,
    updateResults: suspend (List<User>, completed: Boolean) -> Unit
) = coroutineScope {

    val repos = service
        .getOrgRepos(req.org)
        .also { logRepos(req, it) }
        .bodyList()

    val channel = Channel<List<User>>()
    for (repo in repos) {
        launch {
            val users = service.getRepoContributors(req.org, repo.name)
                .also { logUsers(repo, it) }
                .bodyList()
            channel.send(users)
        }
    }
    var allUsers = emptyList<User>()
    repeat(repos.size) {
        val users = channel.receive()
        allUsers = (allUsers + users).aggregate()
        updateResults(allUsers, it == repos.lastIndex)
    }
}
```

* Results for different repositories are added to the channel as soon as they are ready. At first, when all of the requests
  are sent, and no data is received, the `receive()` call is suspended. In this case, the whole "load contributors" coroutine
  is suspended.
* Then, when the list of users is sent to the channel, the "load contributors" coroutine resumes, the `receive()` call
  returns this list, and the results are immediately updated.

You can now run the program and choose the _CHANNELS_ option to load the contributors and see the result.

Although neither coroutines nor channels completely remove the complexity that comes with concurrency,
they make life easier when you need to understand what's going on.

## Testing coroutines

Let's now test all of the solutions to check that the solution with concurrent coroutines is faster than the solution with
the `suspend` functions, and that the solution with channels is faster than the simple "progress" one.

In the following task, you'll compare the total running time of the solutions. You'll mock a GitHub service and make
this service return results after the given timeouts:

```text
repos request - returns an answer within 1000 ms delay
repo-1 - 1000 ms delay
repo-2 - 1200 ms delay
repo-3 - 800 ms delay
```

The sequential solution with the `suspend` functions should take around 4000 ms (4000 = 1000 + (1000 + 1200 + 800)).
The concurrent solution should take around 2200 ms (2200 = 1000 + max(1000, 1200, 800)).

For the solutions that show progress, you can also check the intermediate results with timestamps.

The corresponding test data is defined in `test/contributors/testData.kt`, and the files `Request4SuspendKtTest`,
`Request7ChannelsKtTest`, and so on contain the straightforward tests that use mock service calls.
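
To make the timeouts concrete, here is a minimal, self-contained sketch of the idea behind such a mock. The interface and data below are simplified stand-ins invented for illustration; the project's actual `GitHubService`, `MockGithubService`, and `testData.kt` look different:

```kotlin
import kotlinx.coroutines.delay

// Simplified stand-in interface, only to show the mocking idea.
interface ContributorsService {
    suspend fun getOrgRepos(org: String): List<String>
    suspend fun getRepoContributors(org: String, repo: String): List<String>
}

// Each call suspends for its configured delay and then returns canned data,
// so no network requests are made and the timing is fully predictable.
object DelayingMockService : ContributorsService {
    private val contributorDelays = mapOf("repo-1" to 1000L, "repo-2" to 1200L, "repo-3" to 800L)

    override suspend fun getOrgRepos(org: String): List<String> {
        delay(1000) // the "repos request" answers after 1000 ms
        return contributorDelays.keys.toList()
    }

    override suspend fun getRepoContributors(org: String, repo: String): List<String> {
        delay(contributorDelays.getValue(repo)) // 1000, 1200, or 800 ms per repository
        return listOf("contributor of $repo")
    }
}
```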

However, there are two problems here:

* These tests take too long to run. Each test takes around 2 to 4 seconds, and you need to wait for the results each
  time. It's not very efficient.
* You can't rely on the exact time the solution runs because it still takes additional time to prepare and run the code.
  You could add a constant, but then the time would differ from machine to machine. The mock service delays
  should be higher than this constant so you can see a difference. If the constant is 0.5 sec, making the delays
  0.1 sec won't be enough.

A better way would be to use special frameworks to test the timing while running the same code several times (which increases
the total time even more), but that is complicated to learn and set up.

To solve these problems and make sure that the solutions with the provided test delays behave as expected (one faster than
the other), use _virtual_ time with a special test dispatcher. This dispatcher keeps track of the virtual time passed from
the start and runs everything immediately in real time. When you run coroutines on this dispatcher,
`delay` will return immediately and advance the virtual time.

Tests that use this mechanism run fast, but you can still check what happens at different moments in virtual time. The
total running time drastically decreases:

{width=700}

To use virtual time, replace the `runBlocking` invocation with `runTest`. `runTest` takes an
extension lambda on `TestScope` as an argument.
When you call `delay` in a `suspend` function inside this special scope, `delay` will increase the virtual time instead
of delaying in real time:

```kotlin
@Test
fun testDelayInSuspend() = runTest {
    val realStartTime = System.currentTimeMillis()
    val virtualStartTime = currentTime

    foo()
    println("${System.currentTimeMillis() - realStartTime} ms") // ~ 6 ms
    println("${currentTime - virtualStartTime} ms")             // 1000 ms
}

suspend fun foo() {
    delay(1000)    // auto-advances virtual time without a real delay
    println("foo") // executes eagerly when foo() is called
}
```

You can check the current virtual time using the `currentTime` property of `TestScope`.

The actual running time in this example is several milliseconds, whereas the virtual time equals the delay argument, which
is 1000 milliseconds.

To get the full effect of "virtual" `delay` in child coroutines,
start all of the child coroutines with the `TestDispatcher`. Otherwise, it won't work. This dispatcher is
automatically inherited from the outer `TestScope`, unless you provide a different dispatcher:

```kotlin
@Test
fun testDelayInLaunch() = runTest {
    val realStartTime = System.currentTimeMillis()
    val virtualStartTime = currentTime

    bar()

    println("${System.currentTimeMillis() - realStartTime} ms") // ~ 11 ms
    println("${currentTime - virtualStartTime} ms")             // 1000 ms
}

suspend fun bar() = coroutineScope {
    launch {
        delay(1000)    // auto-advances virtual time without a real delay
        println("bar") // executes eagerly when bar() is called
    }
}
```

If `launch` is called with the context of `Dispatchers.Default` in the example above, the test will fail. You'll get an
exception saying that the job has not been completed yet.

You can test the `loadContributorsConcurrent()` function this way only if it starts the child coroutines with the
inherited context, without modifying it using the `Dispatchers.Default` dispatcher.

You can specify context elements, such as the dispatcher, when _calling_ a function rather than when _defining_ it,
which allows for more flexibility and easier testing.
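
As a sketch of what this looks like in practice (the function below is a made-up stand-in, not the project's `loadContributorsConcurrent()`): a function that starts its children in the inherited context can be wrapped in whichever dispatcher the caller chooses, so production code can run it on `Dispatchers.Default`, while `runTest` runs the same code on a `TestDispatcher` with virtual time:

```kotlin
import kotlinx.coroutines.*

// Hypothetical function: its child coroutines use the inherited context,
// so the caller decides which dispatcher they actually run on.
suspend fun loadItemsConcurrently(): List<String> = coroutineScope {
    (1..3).map { id ->
        async {        // no Dispatchers.Default here on purpose
            delay(100) // virtual under runTest, real otherwise
            "item-$id"
        }
    }.awaitAll()
}

fun main() = runBlocking {
    // Production-style call site: the dispatcher is chosen by the caller.
    val items = withContext(Dispatchers.Default) { loadItemsConcurrently() }
    println(items)
}
```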

> The testing API that supports virtual time is [Experimental](components-stability.md) and may change in the future.
>
{type="warning"}

By default, the compiler shows warnings if you use the experimental testing API. To suppress these warnings, annotate
the test function or the whole class containing the tests with `@OptIn(ExperimentalCoroutinesApi::class)`.
Add the compiler argument instructing the compiler that you're using the experimental API:

```kotlin
compileTestKotlin {
    kotlinOptions {
        freeCompilerArgs += "-opt-in=kotlin.RequiresOptIn"
    }
}
```

In the project corresponding to this tutorial, the compiler argument has already been added to the Gradle script.

### Task 8

Refactor the following tests in `test/tasks/` to use virtual time instead of real time:

* `Request4SuspendKtTest.kt`
* `Request5ConcurrentKtTest.kt`
* `Request6ProgressKtTest.kt`
* `Request7ChannelsKtTest.kt`

Compare the total running times before and after applying your refactoring.

#### Tip for task 8 {initial-collapse-state="collapsed"}

1. Replace the `runBlocking` invocation with `runTest`, and replace `System.currentTimeMillis()` with `currentTime`:

    ```kotlin
    @Test
    fun test() = runTest {
        val startTime = currentTime
        // action
        val totalTime = currentTime - startTime
        // testing result
    }
    ```

2. Uncomment the assertions that check the exact virtual time.
3. Don't forget to add `@OptIn(ExperimentalCoroutinesApi::class)`.
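
    For example, the annotation can go on the whole test class. The skeleton below only sketches that placement with the schematic test body from step 1; the test function name is an assumption, and the real test classes in the project contain the actual calls and assertions:

    ```kotlin
    import kotlinx.coroutines.ExperimentalCoroutinesApi
    import kotlinx.coroutines.test.runTest
    import org.junit.Test

    @OptIn(ExperimentalCoroutinesApi::class) // opts the whole class into the experimental testing API
    class Request4SuspendKtTest {
        @Test
        fun testSuspend() = runTest {
            val startTime = currentTime
            // action: call the function under test with the mock service
            val totalTime = currentTime - startTime
            // testing result: assert on the data and on totalTime
        }
    }
    ```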

#### Solution for task 8 {initial-collapse-state="collapsed"}

Here are the solutions for the concurrent and channels cases:

```kotlin
@Test
fun testConcurrent() = runTest {
    val startTime = currentTime
    val result = loadContributorsConcurrent(MockGithubService, testRequestData)
    Assert.assertEquals("Wrong result for 'loadContributorsConcurrent'", expectedConcurrentResults.users, result)
    val totalTime = currentTime - startTime

    Assert.assertEquals(
        "The calls run concurrently, so the total virtual time should be 2200 ms: " +
                "1000 for the repos request plus max(1000, 1200, 800) = 1200 for the concurrent contributors requests",
        expectedConcurrentResults.timeFromStart, totalTime
    )
}
```

First, check that the results are available exactly at the expected virtual time, and then check the results
themselves:

```kotlin
@Test
fun testChannels() = runTest {
    val startTime = currentTime
    var index = 0
    loadContributorsChannels(MockGithubService, testRequestData) { users, _ ->
        val expected = concurrentProgressResults[index++]
        val time = currentTime - startTime
        Assert.assertEquals(
            "Expected intermediate results after ${expected.timeFromStart} ms:",
            expected.timeFromStart, time
        )
        Assert.assertEquals("Wrong intermediate results after $time:", expected.users, users)
    }
}
```

The first intermediate result for the channels version becomes available sooner than for the progress version, and you
can see the difference in the tests that use virtual time.

> The tests for the remaining "suspend" and "progress" tasks are very similar – you can find them in the project's
> `solutions` branch.
>
{type="tip"}

## What's next

* Check out the [Asynchronous Programming with Kotlin](https://kotlinconf.com/workshops/) workshop at KotlinConf.
* Find out more about using [virtual time and the experimental testing package](https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-test/).