<lambda>null1 package kotlinx.coroutines.selects
2
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.channels.*
6 import kotlinx.coroutines.internal.*
7 import kotlinx.coroutines.selects.TrySelectDetailedResult.*
8 import kotlin.contracts.*
9 import kotlin.coroutines.*
10 import kotlin.internal.*
11 import kotlin.jvm.*
12
13 /**
14 * Waits for the result of multiple suspending functions simultaneously, which are specified using _clauses_
15 * in the [builder] scope of this select invocation. The caller is suspended until one of the clauses
16 * is either _selected_ or _fails_.
17 *
18 * At most one clause is *atomically* selected and its block is executed. The result of the selected clause
19 * becomes the result of the select. If any clause _fails_, then the select invocation produces the
20 * corresponding exception. No clause is selected in this case.
21 *
22 * This select function is _biased_ to the first clause. When multiple clauses can be selected at the same time,
23 * the first one of them gets priority. Use [selectUnbiased] for an unbiased (randomized) selection among
24 * the clauses.
25
26 * There is no `default` clause for select expression. Instead, each selectable suspending function has the
27 * corresponding non-suspending version that can be used with a regular `when` expression to select one
28 * of the alternatives or to perform the default (`else`) action if none of them can be immediately selected.
29 *
30 * ### List of supported select methods
31 *
32 * | **Receiver** | **Suspending function** | **Select clause**
33 * | ---------------- | --------------------------------------------- | -----------------------------------------------------
34 * | [Job] | [join][Job.join] | [onJoin][Job.onJoin]
35 * | [Deferred] | [await][Deferred.await] | [onAwait][Deferred.onAwait]
36 * | [SendChannel] | [send][SendChannel.send] | [onSend][SendChannel.onSend]
37 * | [ReceiveChannel] | [receive][ReceiveChannel.receive] | [onReceive][ReceiveChannel.onReceive]
38 * | [ReceiveChannel] | [receiveCatching][ReceiveChannel.receiveCatching] | [onReceiveCatching][ReceiveChannel.onReceiveCatching]
39 * | none | [delay] | [onTimeout][SelectBuilder.onTimeout]
40 *
41 * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this
42 * suspending function is waiting, this function immediately resumes with [CancellationException].
43 * There is a **prompt cancellation guarantee**: even if this function is ready to return the result, but was cancelled
44 * while suspended, [CancellationException] will be thrown. See [suspendCancellableCoroutine] for low-level details.
45 *
46 * Note that this function does not check for cancellation when it is not suspended.
47 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
48 */
49 @OptIn(ExperimentalContracts::class)
50 public suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R {
51 contract {
52 callsInPlace(builder, InvocationKind.EXACTLY_ONCE)
53 }
54 return SelectImplementation<R>(coroutineContext).run {
55 builder(this)
56 // TAIL-CALL OPTIMIZATION: the only
57 // suspend call is at the last position.
58 doSelect()
59 }
60 }
61
62 /**
63 * Scope for [select] invocation.
64 *
65 * An instance of [SelectBuilder] can only be retrieved as a receiver of a [select] block call,
66 * and it is only valid during the registration phase of the select builder.
67 * Any uses outside it lead to unspecified behaviour and are prohibited.
68 *
69 * The general rule of thumb is that instances of this type should always be used
70 * implicitly and there shouldn't be any signatures mentioning this type,
71 * whether explicitly (e.g. function signature) or implicitly (e.g. inferred `val` type).
72 */
73 public sealed interface SelectBuilder<in R> {
74 /**
75 * Registers a clause in this [select] expression without additional parameters that does not select any value.
76 */
invokenull77 public operator fun SelectClause0.invoke(block: suspend () -> R)
78
79 /**
80 * Registers clause in this [select] expression without additional parameters that selects value of type [Q].
81 */
82 public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R)
83
84 /**
85 * Registers clause in this [select] expression with additional parameter of type [P] that selects value of type [Q].
86 */
87 public operator fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R)
88
89 /**
90 * Registers clause in this [select] expression with additional nullable parameter of type [P]
91 * with the `null` value for this parameter that selects value of type [Q].
92 */
93 public operator fun <P, Q> SelectClause2<P?, Q>.invoke(block: suspend (Q) -> R): Unit = invoke(null, block)
94
95 /**
96 * Clause that selects the given [block] after a specified timeout passes.
97 * If timeout is negative or zero, [block] is selected immediately.
98 *
99 * **Note: This is an experimental api.** It may be replaced with light-weight timer/timeout channels in the future.
100 *
101 * @param timeMillis timeout time in milliseconds.
102 */
103 @ExperimentalCoroutinesApi
104 @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
105 @LowPriorityInOverloadResolution
106 @Deprecated(
107 message = "Replaced with the same extension function",
108 level = DeprecationLevel.ERROR, replaceWith = ReplaceWith(expression = "onTimeout", imports = ["kotlinx.coroutines.selects.onTimeout"])
109 ) // Since 1.7.0, was experimental
110 public fun onTimeout(timeMillis: Long, block: suspend () -> R): Unit = onTimeout(timeMillis, block)
111 }
112
113 /**
114 * Each [select] clause is specified with:
115 * 1) the [object of this clause][clauseObject],
116 * such as the channel instance for [SendChannel.onSend];
117 * 2) the function that specifies how this clause
118 * should be registered in the object above;
119 * 3) the function that modifies the internal result
120 * (passed via [SelectInstance.trySelect] or
121 * [SelectInstance.selectInRegistrationPhase])
122 * to the argument of the user-specified block.
123 * 4) the function that specifies how the internal result provided via
124 * [SelectInstance.trySelect] or [SelectInstance.selectInRegistrationPhase]
125 * should be processed in case of this `select` cancellation while dispatching.
126 */
127 @InternalCoroutinesApi
128 public sealed interface SelectClause {
129 public val clauseObject: Any
130 public val regFunc: RegistrationFunction
131 public val processResFunc: ProcessResultFunction
132 public val onCancellationConstructor: OnCancellationConstructor?
133 }
134
135 /**
136 * The registration function specifies how the `select` instance should be registered into
137 * the specified clause object. In case of channels, the registration logic
138 * coincides with the plain `send/receive` operation with the only difference that
139 * the `select` instance is stored as a waiter instead of continuation.
140 */
141 @InternalCoroutinesApi
142 public typealias RegistrationFunction = (clauseObject: Any, select: SelectInstance<*>, param: Any?) -> Unit
143
144 /**
145 * This function specifies how the _internal_ result, provided via [SelectInstance.selectInRegistrationPhase]
146 * or [SelectInstance.trySelect] should be processed. For example, both [ReceiveChannel.onReceive] and
147 * [ReceiveChannel.onReceiveCatching] clauses perform exactly the same synchronization logic,
148 * but differ when the channel has been discovered in the closed or cancelled state.
149 */
150 @InternalCoroutinesApi
151 public typealias ProcessResultFunction = (clauseObject: Any, param: Any?, clauseResult: Any?) -> Any?
152
153 /**
154 * This function specifies how the internal result, provided via [SelectInstance.trySelect]
155 * or [SelectInstance.selectInRegistrationPhase], should be processed in case of this `select`
156 * cancellation while dispatching. Unfortunately, we cannot pass this function only in [SelectInstance.trySelect],
157 * as [SelectInstance.selectInRegistrationPhase] can be called when the coroutine is already cancelled.
158 */
159 @InternalCoroutinesApi
160 public typealias OnCancellationConstructor = (select: SelectInstance<*>, param: Any?, internalResult: Any?) -> (Throwable) -> Unit
161
162 /**
163 * Clause for [select] expression without additional parameters that does not select any value.
164 */
165 public sealed interface SelectClause0 : SelectClause
166
167 internal class SelectClause0Impl(
168 override val clauseObject: Any,
169 override val regFunc: RegistrationFunction,
170 override val onCancellationConstructor: OnCancellationConstructor? = null
171 ) : SelectClause0 {
172 override val processResFunc: ProcessResultFunction = DUMMY_PROCESS_RESULT_FUNCTION
173 }
_null174 private val DUMMY_PROCESS_RESULT_FUNCTION: ProcessResultFunction = { _, _, _ -> null }
175
176 /**
177 * Clause for [select] expression without additional parameters that selects value of type [Q].
178 */
179 public sealed interface SelectClause1<out Q> : SelectClause
180
181 internal class SelectClause1Impl<Q>(
182 override val clauseObject: Any,
183 override val regFunc: RegistrationFunction,
184 override val processResFunc: ProcessResultFunction,
185 override val onCancellationConstructor: OnCancellationConstructor? = null
186 ) : SelectClause1<Q>
187
188 /**
189 * Clause for [select] expression with additional parameter of type [P] that selects value of type [Q].
190 */
191 public sealed interface SelectClause2<in P, out Q> : SelectClause
192
193 internal class SelectClause2Impl<P, Q>(
194 override val clauseObject: Any,
195 override val regFunc: RegistrationFunction,
196 override val processResFunc: ProcessResultFunction,
197 override val onCancellationConstructor: OnCancellationConstructor? = null
198 ) : SelectClause2<P, Q>
199
200 /**
201 * Internal representation of `select` instance.
202 *
203 * @suppress **This is unstable API, and it is subject to change.**
204 */
205 @InternalCoroutinesApi
206 public sealed interface SelectInstance<in R> {
207 /**
208 * The context of the coroutine that is performing this `select` operation.
209 */
210 public val context: CoroutineContext
211
212 /**
213 * This function should be called by other operations,
214 * which are trying to perform a rendezvous with this `select`.
215 * Returns `true` if the rendezvous succeeds, `false` otherwise.
216 *
217 * Note that according to the current implementation, a rendezvous attempt can fail
218 * when either another clause is already selected or this `select` is still in
219 * REGISTRATION phase. To distinguish the reasons, [SelectImplementation.trySelectDetailed]
220 * function can be used instead.
221 */
trySelectnull222 public fun trySelect(clauseObject: Any, result: Any?): Boolean
223
224 /**
225 * When this `select` instance is stored as a waiter, the specified [handle][disposableHandle]
226 * defines how the stored `select` should be removed in case of cancellation or another clause selection.
227 */
228 public fun disposeOnCompletion(disposableHandle: DisposableHandle)
229
230 /**
231 * When a clause becomes selected during registration, the corresponding internal result
232 * (which is further passed to the clause's [ProcessResultFunction]) should be provided
233 * via this function. After that, other clause registrations are ignored and [trySelect] fails.
234 */
235 public fun selectInRegistrationPhase(internalResult: Any?)
236 }
237 internal interface SelectInstanceInternal<R>: SelectInstance<R>, Waiter
238
239 @PublishedApi
240 internal open class SelectImplementation<R>(
241 override val context: CoroutineContext
242 ) : CancelHandler, SelectBuilder<R>, SelectInstanceInternal<R> {
243
244 /**
245 * Essentially, the `select` operation is split into three phases: REGISTRATION, WAITING, and COMPLETION.
246 *
247 * == Phase 1: REGISTRATION ==
248 * In the first REGISTRATION phase, the user-specified [SelectBuilder] is applied, and all the listed clauses
249 * are registered via the provided [registration functions][SelectClause.regFunc]. Intuitively, `select` clause
250 * registration is similar to the plain blocking operation, with the only difference that this [SelectInstance]
251 * is stored as a waiter instead of continuation, and [SelectInstance.trySelect] is used to make a rendezvous.
252 * Also, when registering, it is possible for the operation to complete immediately, without waiting. In this case,
253 * [SelectInstance.selectInRegistrationPhase] should be used. Otherwise, when no rendezvous happens and this `select`
254 * instance is stored as a waiter, a completion handler for the registering clause should be specified via
255 * [SelectInstance.disposeOnCompletion]; this handler specifies how to remove this `select` instance from the
256 * clause object when another clause becomes selected or the operation cancels.
257 *
258 * After a clause registration is completed, another coroutine can attempt to make a rendezvous with this `select`.
259 * However, to resolve a race between clauses registration and [SelectInstance.trySelect], the latter fails when
260 * this `select` is still in REGISTRATION phase. Thus, the corresponding clause has to be registered again.
261 *
262 * In this phase, the `state` field stores either a special [STATE_REG] marker or
263 * a list of clauses to be re-registered due to failed rendezvous attempts.
264 *
265 * == Phase 2: WAITING ==
266 * If no rendezvous happens in REGISTRATION phase, the `select` operation moves to WAITING one and suspends until
267 * [SelectInstance.trySelect] is called. Also, when waiting, this `select` can be cancelled. In the latter case,
268 * further [SelectInstance.trySelect] attempts fail, and all the completion handlers, specified via
269 * [SelectInstance.disposeOnCompletion], are invoked to remove this `select` instance from the corresponding
270 * clause objects.
271 *
272 * In this phase, the `state` field stores either the continuation to be later resumed or a special `Cancelled`
273 * object (with the cancellation cause inside) when this `select` becomes cancelled.
274 *
275 * == Phase 3: COMPLETION ==
276 * Once a rendezvous happens either in REGISTRATION phase (via [SelectInstance.selectInRegistrationPhase]) or
277 * in WAITING phase (via [SelectInstance.trySelect]), this `select` moves to the final `COMPLETION` phase.
278 * First, the provided internal result is processed via the [ProcessResultFunction] of the selected clause;
279 * it returns the argument for the user-specified block or throws an exception (see [SendChannel.onSend] as
280 * an example). After that, this `select` should be removed from all other clause objects by calling the
281 * corresponding [DisposableHandle]-s, provided via [SelectInstance.disposeOnCompletion] during registration.
282 * At the end, the user-specified block is called and this `select` finishes.
283 *
284 * In this phase, once a rendezvous is happened, the `state` field stores the corresponding clause.
285 * After that, it moves to [STATE_COMPLETED] to avoid memory leaks.
286 *
287 *
288 *
289 * The state machine is listed below:
290 *
291 * REGISTRATION PHASE WAITING PHASE COMPLETION PHASE
292 * ⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢ ⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢ ⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢⌢
293 *
294 * +-----------+ +-----------+
295 * | CANCELLED | | COMPLETED |
296 * +-----------+ +-----------+
297 * ^ ^
298 * INITIAL STATE | | this `select`
299 * ------------+ | cancelled | is completed
300 * \ | |
301 * +=============+ move to +------+ successful +------------+
302 * +--| STATE_REG |---------------> | cont |-----------------| ClauseData |
303 * | +=============+ WAITING phase +------+ trySelect(..) +------------+
304 * | ^ | ^
305 * | | | some clause has been selected during registration |
306 * add a | | +-------------------------------------------------------+
307 * clause to be | | |
308 * re-registered | | re-register some clause has been selected |
309 * | | clauses during registration while there |
310 * v | are clauses to be re-registered; |
311 * +------------------+ ignore the latter |
312 * +--| List<ClauseData> |-----------------------------------------------------+
313 * | +------------------+
314 * | ^
315 * | | add one more clause
316 * | | for re-registration
317 * +------------+
318 *
319 * One of the most valuable benefits of this `select` design is that it allows processing clauses
320 * in a way similar to plain operations, such as `send` or `receive` on channels. The only difference
321 * is that instead of continuation, the operation should store the provided `select` instance object.
322 * Thus, this design makes it possible to support the `select` expression for any blocking data structure
323 * in Kotlin Coroutines.
324 *
325 * It is worth mentioning that the algorithm above provides "obstruction-freedom" non-blocking guarantee
326 * instead of the standard "lock-freedom" to avoid using heavy descriptors. In practice, this relaxation
327 * does not make significant difference. However, it is vital for Kotlin Coroutines to provide some
328 * non-blocking guarantee, as users may add blocking code in [SelectBuilder], and this blocking code
329 * should not cause blocking behaviour in other places, such as an attempt to make a rendezvous with
330 * the `select` that is hang in REGISTRATION phase.
331 *
332 * Also, this implementation is NOT linearizable under some circumstances. The reason is that a rendezvous
333 * attempt with `select` (via [SelectInstance.trySelect]) may fail when this `select` operation is still
334 * in REGISTRATION phase. Consider the following situation on two empty rendezvous channels `c1` and `c2`
335 * and the `select` operation that tries to send an element to one of these channels. First, this `select`
336 * instance is registered as a waiter in `c1`. After that, another thread can observe that `c1` is no longer
337 * empty and try to receive an element from `c1` -- this receive attempt fails due to the `select` operation
338 * being in REGISTRATION phase.
339 * It is also possible to observe that this `select` operation registered in `c2` first, and only after that in
340 * `c1` (it has to re-register in `c1` after the unsuccessful rendezvous attempt), which is also non-linearizable.
341 * We, however, find such a non-linearizable behaviour not so important in practice and leverage the correctness
342 * relaxation for the algorithm simplicity and the non-blocking progress guarantee.
343 */
344
345 /**
346 * The state of this `select` operation. See the description above for details.
347 */
348 private val state = atomic<Any>(STATE_REG)
349 /**
350 * Returns `true` if this `select` instance is in the REGISTRATION phase;
351 * otherwise, returns `false`.
352 */
353 private val inRegistrationPhase
354 get() = state.value.let {
355 it === STATE_REG || it is List<*>
356 }
357 /**
358 * Returns `true` if this `select` is already selected;
359 * thus, other parties are bound to fail when making a rendezvous with it.
360 */
361 private val isSelected
362 get() = state.value is SelectImplementation<*>.ClauseData
363 /**
364 * Returns `true` if this `select` is cancelled.
365 */
366 private val isCancelled
367 get() = state.value === STATE_CANCELLED
368
369 /**
370 * List of clauses waiting on this `select` instance.
371 *
372 * This property is the subject to bening data race: concurrent cancellation might null-out this property
373 * while [trySelect] operation reads it and iterates over its content.
374 * A logical race is resolved by the consensus on [state] property.
375 */
376 @BenignDataRace
377 private var clauses: MutableList<ClauseData>? = ArrayList(2)
378
379 /**
380 * Stores the completion action provided through [disposeOnCompletion] or [invokeOnCancellation]
381 * during clause registration. After that, if the clause is successfully registered
382 * (so, it has not completed immediately), this handler is stored into
383 * the corresponding [ClauseData] instance.
384 *
385 * Note that either [DisposableHandle] is provided, or a [Segment] instance with
386 * the index in it, which specify the location of storing this `select`.
387 * In the latter case, [Segment.onCancellation] should be called on completion/cancellation.
388 */
389 private var disposableHandleOrSegment: Any? = null
390
391 /**
392 * In case the disposable handle is specified via [Segment]
393 * and index in it, implying calling [Segment.onCancellation],
394 * the corresponding index is stored in this field.
395 * The segment is stored in [disposableHandleOrSegment].
396 */
397 private var indexInSegment: Int = -1
398
399 /**
400 * Stores the result passed via [selectInRegistrationPhase] during clause registration
401 * or [trySelect], which is called by another coroutine trying to make a rendezvous
402 * with this `select` instance. Further, this result is processed via the
403 * [ProcessResultFunction] of the selected clause.
404 *
405 * Unfortunately, we cannot store the result in the [state] field, as the latter stores
406 * the clause object upon selection (see [ClauseData.clauseObject] and [SelectClause.clauseObject]).
407 * Instead, it is possible to merge the [internalResult] and [disposableHandle] fields into
408 * one that stores either result when the clause is successfully registered ([inRegistrationPhase] is `true`),
409 * or [DisposableHandle] instance when the clause is completed during registration ([inRegistrationPhase] is `false`).
410 * Yet, this optimization is omitted for code simplicity.
411 *
412 * This property is the subject to benign data race:
413 * [Cleanup][cleanup] procedure can be invoked both as part of the completion sequence
414 * and as a cancellation handler triggered by an external cancellation.
415 * In both scenarios, [NO_RESULT] is written to this property via race.
416 */
417 @BenignDataRace
418 private var internalResult: Any? = NO_RESULT
419
420 /**
421 * This function is called after the [SelectBuilder] is applied. In case one of the clauses is already selected,
422 * the algorithm applies the corresponding [ProcessResultFunction] and invokes the user-specified [block][ClauseData.block].
423 * Otherwise, it moves this `select` to WAITING phase (re-registering clauses if needed), suspends until a rendezvous
424 * is happened, and then completes the operation by applying the corresponding [ProcessResultFunction] and
425 * invoking the user-specified [block][ClauseData.block].
426 */
427 @PublishedApi
428 internal open suspend fun doSelect(): R =
429 if (isSelected) complete() // Fast path
430 else doSelectSuspend() // Slow path
431
432 // We separate the following logic as it has two suspension points
433 // and, therefore, breaks the tail-call optimization if it were
434 // inlined in [doSelect]
435 private suspend fun doSelectSuspend(): R {
436 // In case no clause has been selected during registration,
437 // the `select` operation suspends and waits for a rendezvous.
438 waitUntilSelected() // <-- suspend call => no tail-call optimization here
439 // There is a selected clause! Apply the corresponding
440 // [ProcessResultFunction] and invoke the user-specified block.
441 return complete() // <-- one more suspend call
442 }
443
444 // ========================
445 // = CLAUSES REGISTRATION =
446 // ========================
447
448 override fun SelectClause0.invoke(block: suspend () -> R) =
449 ClauseData(clauseObject, regFunc, processResFunc, PARAM_CLAUSE_0, block, onCancellationConstructor).register()
450 override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) =
451 ClauseData(clauseObject, regFunc, processResFunc, null, block, onCancellationConstructor).register()
452 override fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R) =
453 ClauseData(clauseObject, regFunc, processResFunc, param, block, onCancellationConstructor).register()
454
455 /**
456 * Attempts to register this `select` clause. If another clause is already selected,
457 * this function does nothing and completes immediately.
458 * Otherwise, it registers this `select` instance in
459 * the [clause object][ClauseData.clauseObject]
460 * according to the provided [registration function][ClauseData.regFunc].
461 * On success, this `select` instance is stored as a waiter
462 * in the clause object -- the algorithm also stores
463 * the provided via [disposeOnCompletion] completion action
464 * and adds the clause to the list of registered one.
465 * In case of registration failure, the internal result
466 * (not processed by [ProcessResultFunction] yet) must be
467 * provided via [selectInRegistrationPhase] -- the algorithm
468 * updates the state to this clause reference.
469 */
470 @JvmName("register")
471 internal fun ClauseData.register(reregister: Boolean = false) {
472 assert { state.value !== STATE_CANCELLED }
473 // Is there already selected clause?
474 if (state.value.let { it is SelectImplementation<*>.ClauseData }) return
475 // For new clauses, check that there does not exist
476 // another clause with the same object.
477 if (!reregister) checkClauseObject(clauseObject)
478 // Try to register in the corresponding object.
479 if (tryRegisterAsWaiter(this@SelectImplementation)) {
480 // Successfully registered, and this `select` instance
481 // is stored as a waiter. Add this clause to the list
482 // of registered clauses and store the provided via
483 // [invokeOnCompletion] completion action into the clause.
484 //
485 // Importantly, the [waitUntilSelected] function is implemented
486 // carefully to ensure that the cancellation handler has not been
487 // installed when clauses re-register, so the logic below cannot
488 // be invoked concurrently with the clean-up procedure.
489 // This also guarantees that the list of clauses cannot be cleared
490 // in the registration phase, so it is safe to read it with "!!".
491 if (!reregister) clauses!! += this
492 disposableHandleOrSegment = this@SelectImplementation.disposableHandleOrSegment
493 indexInSegment = this@SelectImplementation.indexInSegment
494 this@SelectImplementation.disposableHandleOrSegment = null
495 this@SelectImplementation.indexInSegment = -1
496 } else {
497 // This clause has been selected!
498 // Update the state correspondingly.
499 state.value = this
500 }
501 }
502
503 /**
504 * Checks that there does not exist another clause with the same object.
505 */
506 private fun checkClauseObject(clauseObject: Any) {
507 // Read the list of clauses, it is guaranteed that it is non-null.
508 // In fact, it can become `null` only in the clean-up phase, while
509 // this check can be called only in the registration one.
510 val clauses = clauses!!
511 // Check that there does not exist another clause with the same object.
512 check(clauses.none { it.clauseObject === clauseObject }) {
513 "Cannot use select clauses on the same object: $clauseObject"
514 }
515 }
516
517 override fun disposeOnCompletion(disposableHandle: DisposableHandle) {
518 this.disposableHandleOrSegment = disposableHandle
519 }
520
521 /**
522 * An optimized version for the code below that does not allocate
523 * a cancellation handler object and efficiently stores the specified
524 * [segment] and [index].
525 *
526 * ```
527 * disposeOnCompletion {
528 * segment.onCancellation(index, null)
529 * }
530 * ```
531 */
532 override fun invokeOnCancellation(segment: Segment<*>, index: Int) {
533 this.disposableHandleOrSegment = segment
534 this.indexInSegment = index
535 }
536
537 override fun selectInRegistrationPhase(internalResult: Any?) {
538 this.internalResult = internalResult
539 }
540
541 // =========================
542 // = WAITING FOR SELECTION =
543 // =========================
544
545 /**
546 * Suspends and waits until some clause is selected. However, it is possible for a concurrent
547 * coroutine to invoke [trySelect] while this `select` is still in REGISTRATION phase.
548 * In this case, [trySelect] marks the corresponding select clause to be re-registered, and
549 * this function performs registration of such clauses. After that, it atomically stores
550 * the continuation into the [state] field if there is no more clause to be re-registered.
551 */
552 private suspend fun waitUntilSelected() = suspendCancellableCoroutine<Unit> sc@ { cont ->
553 // Update the state.
554 state.loop { curState ->
555 when {
556 // This `select` is in REGISTRATION phase, and there is no clause to be re-registered.
557 // Perform a transition to WAITING phase by storing the current continuation.
558 curState === STATE_REG -> if (state.compareAndSet(curState, cont)) {
559 // Perform a clean-up in case of cancellation.
560 //
561 // Importantly, we MUST install the cancellation handler
562 // only when the algorithm is bound to suspend. Otherwise,
563 // a race with [tryRegister] is possible, and the provided
564 // via [disposeOnCompletion] cancellation action can be ignored.
565 // Also, we MUST guarantee that this dispose handle is _visible_
566 // according to the memory model, and we CAN guarantee this when
567 // the state is updated.
568 cont.invokeOnCancellation(this)
569 return@sc
570 }
571 // This `select` is in REGISTRATION phase, but there are clauses that has to be registered again.
572 // Perform the required registrations and try again.
573 curState is List<*> -> if (state.compareAndSet(curState, STATE_REG)) {
574 @Suppress("UNCHECKED_CAST")
575 curState as List<Any>
576 curState.forEach { reregisterClause(it) }
577 }
578 // This `select` operation became completed during clauses re-registration.
579 curState is SelectImplementation<*>.ClauseData -> {
580 cont.resume(Unit, curState.createOnCancellationAction(this, internalResult))
581 return@sc
582 }
583 // This `select` cannot be in any other state.
584 else -> error("unexpected state: $curState")
585 }
586 }
587 }
588
589 /**
590 * Re-registers the clause with the specified
591 * [clause object][clauseObject] after unsuccessful
592 * [trySelect] of this clause while the `select`
593 * was still in REGISTRATION phase.
594 */
595 private fun reregisterClause(clauseObject: Any) {
596 val clause = findClause(clauseObject)!! // it is guaranteed that the corresponding clause is presented
597 clause.disposableHandleOrSegment = null
598 clause.indexInSegment = -1
599 clause.register(reregister = true)
600 }
601
602 // ==============
603 // = RENDEZVOUS =
604 // ==============
605
606 override fun trySelect(clauseObject: Any, result: Any?): Boolean =
607 trySelectInternal(clauseObject, result) == TRY_SELECT_SUCCESSFUL
608
609 /**
610 * Similar to [trySelect] but provides a failure reason
611 * if this rendezvous is unsuccessful. We need this function
612 * in the channel implementation.
613 */
614 fun trySelectDetailed(clauseObject: Any, result: Any?) =
615 TrySelectDetailedResult(trySelectInternal(clauseObject, result))
616
617 private fun trySelectInternal(clauseObject: Any, internalResult: Any?): Int {
618 while (true) {
619 when (val curState = state.value) {
620 // Perform a rendezvous with this select if it is in WAITING state.
621 is CancellableContinuation<*> -> {
622 val clause = findClause(clauseObject) ?: continue // retry if `clauses` is already `null`
623 val onCancellation = clause.createOnCancellationAction(this@SelectImplementation, internalResult)
624 if (state.compareAndSet(curState, clause)) {
625 @Suppress("UNCHECKED_CAST")
626 val cont = curState as CancellableContinuation<Unit>
627 // Success! Store the resumption value and
628 // try to resume the continuation.
629 this.internalResult = internalResult
630 if (cont.tryResume(onCancellation)) return TRY_SELECT_SUCCESSFUL
631 // If the resumption failed, we need to clean the [result] field to avoid memory leaks.
632 this.internalResult = NO_RESULT
633 return TRY_SELECT_CANCELLED
634 }
635 }
636 // Already selected.
637 STATE_COMPLETED, is SelectImplementation<*>.ClauseData -> return TRY_SELECT_ALREADY_SELECTED
638 // Already cancelled.
639 STATE_CANCELLED -> return TRY_SELECT_CANCELLED
640 // This select is still in REGISTRATION phase, re-register the clause
641 // in order not to wait until this select moves to WAITING phase.
642 // This is a rare race, so we do not need to worry about performance here.
643 STATE_REG -> if (state.compareAndSet(curState, listOf(clauseObject))) return TRY_SELECT_REREGISTER
644 // This select is still in REGISTRATION phase, and the state stores a list of clauses
645 // for re-registration, add the selecting clause to this list.
646 // This is a rare race, so we do not need to worry about performance here.
647 is List<*> -> if (state.compareAndSet(curState, curState + clauseObject)) return TRY_SELECT_REREGISTER
648 // Another state? Something went really wrong.
649 else -> error("Unexpected state: $curState")
650 }
651 }
652 }
653
654 /**
655 * Finds the clause with the corresponding [clause object][SelectClause.clauseObject].
656 * If the reference to the list of clauses is already cleared due to completion/cancellation,
657 * this function returns `null`
658 */
659 private fun findClause(clauseObject: Any): ClauseData? {
660 // Read the list of clauses. If the `clauses` field is already `null`,
661 // the clean-up phase has already completed, and this function returns `null`.
662 val clauses = this.clauses ?: return null
663 // Find the clause with the specified clause object.
664 return clauses.find { it.clauseObject === clauseObject }
665 ?: error("Clause with object $clauseObject is not found")
666 }
667
668 // ==============
669 // = COMPLETION =
670 // ==============
671
672 /**
673 * Completes this `select` operation after the internal result is provided
674 * via [SelectInstance.trySelect] or [SelectInstance.selectInRegistrationPhase].
675 * (1) First, this function applies the [ProcessResultFunction] of the selected clause
676 * to the internal result.
677 * (2) After that, the [clean-up procedure][cleanup]
678 * is called to remove this `select` instance from other clause objects, and
679 * make it possible to collect it by GC after this `select` finishes.
680 * (3) Finally, the user-specified block is invoked
681 * with the processed result as an argument.
682 */
683 private suspend fun complete(): R {
684 assert { isSelected }
685 // Get the selected clause.
686 @Suppress("UNCHECKED_CAST")
687 val selectedClause = state.value as SelectImplementation<R>.ClauseData
688 // Perform the clean-up before the internal result processing and
689 // the user-specified block invocation to guarantee the absence
690 // of memory leaks. Collect the internal result before that.
691 val internalResult = this.internalResult
692 cleanup(selectedClause)
693 // Process the internal result and invoke the user's block.
694 return if (!RECOVER_STACK_TRACES) {
695 // TAIL-CALL OPTIMIZATION: the `suspend` block
696 // is invoked at the very end.
697 val blockArgument = selectedClause.processResult(internalResult)
698 selectedClause.invokeBlock(blockArgument)
699 } else {
700 // TAIL-CALL OPTIMIZATION: the `suspend`
701 // function is invoked at the very end.
702 // However, internally this `suspend` function
703 // constructs a state machine to recover a
704 // possible stack-trace.
705 processResultAndInvokeBlockRecoveringException(selectedClause, internalResult)
706 }
707 }
708
709 private suspend fun processResultAndInvokeBlockRecoveringException(clause: ClauseData, internalResult: Any?): R =
710 try {
711 val blockArgument = clause.processResult(internalResult)
712 clause.invokeBlock(blockArgument)
713 } catch (e: Throwable) {
714 // In the debug mode, we need to properly recover
715 // the stack-trace of the exception; the tail-call
716 // optimization cannot be applied here.
717 recoverAndThrow(e)
718 }
719
720 /**
721 * Invokes all [DisposableHandle]-s provided via
722 * [SelectInstance.disposeOnCompletion] during
723 * clause registrations.
724 */
725 private fun cleanup(selectedClause: ClauseData) {
726 assert { state.value == selectedClause }
727 // Read the list of clauses. If the `clauses` field is already `null`,
728 // a concurrent clean-up procedure has already completed, and it is safe to finish.
729 val clauses = this.clauses ?: return
730 // Invoke all cancellation handlers except for the
731 // one related to the selected clause, if specified.
732 clauses.forEach { clause ->
733 if (clause !== selectedClause) clause.dispose()
734 }
735 // We do need to clean all the data to avoid memory leaks.
736 this.state.value = STATE_COMPLETED
737 this.internalResult = NO_RESULT
738 this.clauses = null
739 }
740
741 // [CompletionHandler] implementation, must be invoked on cancellation.
742 override fun invoke(cause: Throwable?) {
743 // Update the state.
744 state.update { cur ->
745 // Finish immediately when this `select` is already completed.
746 // Notably, this select might be logically completed
747 // (the `state` field stores the selected `ClauseData`),
748 // while the continuation is already cancelled.
749 // We need to invoke the cancellation handler in this case.
750 if (cur === STATE_COMPLETED) return
751 STATE_CANCELLED
752 }
753 // Read the list of clauses. If the `clauses` field is already `null`,
754 // a concurrent clean-up procedure has already completed, and it is safe to finish.
755 val clauses = this.clauses ?: return
756 // Remove this `select` instance from all the clause object (channels, mutexes, etc.).
757 clauses.forEach { it.dispose() }
758 // We do need to clean all the data to avoid memory leaks.
759 this.internalResult = NO_RESULT
760 this.clauses = null
761 }
762
763 /**
764 * Each `select` clause is internally represented with a [ClauseData] instance.
765 */
766 internal inner class ClauseData(
767 @JvmField val clauseObject: Any, // the object of this `select` clause: Channel, Mutex, Job, ...
768 private val regFunc: RegistrationFunction,
769 private val processResFunc: ProcessResultFunction,
770 private val param: Any?, // the user-specified param
771 private val block: Any, // the user-specified block, which should be called if this clause becomes selected
772 @JvmField val onCancellationConstructor: OnCancellationConstructor?
773 ) {
774 @JvmField var disposableHandleOrSegment: Any? = null
775 @JvmField var indexInSegment: Int = -1
776
777 /**
778 * Tries to register the specified [select] instance in [clauseObject] and check
779 * whether the registration succeeded or a rendezvous has happened during the registration.
780 * This function returns `true` if this [select] is successfully registered and
781 * is _waiting_ for a rendezvous, or `false` when this clause becomes
782 * selected during registration.
783 *
784 * For example, the [Channel.onReceive] clause registration
785 * on a non-empty channel retrieves the first element and completes
786 * the corresponding [select] via [SelectInstance.selectInRegistrationPhase].
787 */
788 fun tryRegisterAsWaiter(select: SelectImplementation<R>): Boolean {
789 assert { select.inRegistrationPhase || select.isCancelled }
790 assert { select.internalResult === NO_RESULT }
791 regFunc(clauseObject, select, param)
792 return select.internalResult === NO_RESULT
793 }
794
795 /**
796 * Processes the internal result provided via either
797 * [SelectInstance.selectInRegistrationPhase] or
798 * [SelectInstance.trySelect] and returns an argument
799 * for the user-specified [block].
800 *
801 * Importantly, this function may throw an exception
802 * (e.g., when the channel is closed in [Channel.onSend], the
803 * corresponding [ProcessResultFunction] is bound to fail).
804 */
805 fun processResult(result: Any?) = processResFunc(clauseObject, param, result)
806
807 /**
808 * Invokes the user-specified block and returns
809 * the final result of this `select` clause.
810 */
811 @Suppress("UNCHECKED_CAST")
812 suspend fun invokeBlock(argument: Any?): R {
813 val block = block
814 // We distinguish no-argument and 1-argument
815 // lambdas via special markers for the clause
816 // parameters. Specifically, PARAM_CLAUSE_0
817 // is always used with [SelectClause0], which
818 // takes a no-argument lambda.
819 //
820 // TAIL-CALL OPTIMIZATION: we invoke
821 // the `suspend` block at the very end.
822 return if (this.param === PARAM_CLAUSE_0) {
823 block as suspend () -> R
824 block()
825 } else {
826 block as suspend (Any?) -> R
827 block(argument)
828 }
829 }
830
831 fun dispose() {
832 with(disposableHandleOrSegment) {
833 if (this is Segment<*>) {
834 this.onCancellation(indexInSegment, null, context)
835 } else {
836 (this as? DisposableHandle)?.dispose()
837 }
838 }
839 }
840
841 fun createOnCancellationAction(select: SelectInstance<*>, internalResult: Any?) =
842 onCancellationConstructor?.invoke(select, param, internalResult)
843 }
844 }
845
tryResumenull846 private fun CancellableContinuation<Unit>.tryResume(onCancellation: ((cause: Throwable) -> Unit)?): Boolean {
847 val token = tryResume(Unit, null, onCancellation) ?: return false
848 completeResume(token)
849 return true
850 }
851
852 // trySelectInternal(..) results.
853 private const val TRY_SELECT_SUCCESSFUL = 0
854 private const val TRY_SELECT_REREGISTER = 1
855 private const val TRY_SELECT_CANCELLED = 2
856 private const val TRY_SELECT_ALREADY_SELECTED = 3
857 // trySelectDetailed(..) results.
858 internal enum class TrySelectDetailedResult {
859 SUCCESSFUL, REREGISTER, CANCELLED, ALREADY_SELECTED
860 }
TrySelectDetailedResultnull861 private fun TrySelectDetailedResult(trySelectInternalResult: Int): TrySelectDetailedResult = when(trySelectInternalResult) {
862 TRY_SELECT_SUCCESSFUL -> SUCCESSFUL
863 TRY_SELECT_REREGISTER -> REREGISTER
864 TRY_SELECT_CANCELLED -> CANCELLED
865 TRY_SELECT_ALREADY_SELECTED -> ALREADY_SELECTED
866 else -> error("Unexpected internal result: $trySelectInternalResult")
867 }
868
869 // Markers for REGISTRATION, COMPLETED, and CANCELLED states.
870 private val STATE_REG = Symbol("STATE_REG")
871 private val STATE_COMPLETED = Symbol("STATE_COMPLETED")
872 private val STATE_CANCELLED = Symbol("STATE_CANCELLED")
873 // As the selection result is nullable, we use this special
874 // marker for the absence of result.
875 private val NO_RESULT = Symbol("NO_RESULT")
876 // We use this marker parameter objects to distinguish
877 // SelectClause[0,1,2] and invoke the user-specified block correctly.
878 internal val PARAM_CLAUSE_0 = Symbol("PARAM_CLAUSE_0")
879