<lambda>null1 package kotlinx.coroutines
2
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.internal.*
5 import kotlin.coroutines.*
6 import kotlin.coroutines.intrinsics.*
7 import kotlin.jvm.*
8
// Values of the "decision" part of the packed decision-and-index word.
private const val UNDECIDED: Int = 0
private const val SUSPENDED: Int = 1
private const val RESUMED: Int = 2

// Layout of the packed word: the decision sits above DECISION_SHIFT, the index occupies the bits below it.
private const val DECISION_SHIFT: Int = 29
private const val INDEX_MASK: Int = (1 shl DECISION_SHIFT) - 1
// The all-ones index is reserved to mean "no index stored".
private const val NO_INDEX: Int = INDEX_MASK

// Extracts the decision part of a packed word.
private inline val Int.decision: Int
    get() = shr(DECISION_SHIFT)

// Extracts the index part of a packed word.
private inline val Int.index: Int
    get() = and(INDEX_MASK)

// Packs a decision and an index into a single Int.
@Suppress("NOTHING_TO_INLINE")
private inline fun decisionAndIndex(decision: Int, index: Int): Int {
    val decisionBits = decision shl DECISION_SHIFT
    return decisionBits + index
}
21
// Token handed out by the `tryResume*` functions of CancellableContinuationImpl on success
// and expected back by its `completeResume`.
@JvmField
internal val RESUME_TOKEN = Symbol("RESUME_TOKEN")
24
25 /**
26 * @suppress **This is unstable API and it is subject to change.**
27 */
28 @PublishedApi
29 internal open class CancellableContinuationImpl<in T>(
30 final override val delegate: Continuation<T>,
31 resumeMode: Int
32 ) : DispatchedTask<T>(resumeMode), CancellableContinuation<T>, CoroutineStackFrame, Waiter {
init {
    // Sanity check: the continuation must be created with an initialized resume mode.
    assert { resumeMode != MODE_UNINITIALIZED } // invalid mode for CancellableContinuationImpl
}

// The context is read from the delegate once, at construction time.
public override val context: CoroutineContext = delegate.context
38
39 /*
40 * Implementation notes
41 *
42 * CancellableContinuationImpl is a subset of Job with following limitations:
43 * 1) It can have only cancellation listener (no "on cancelling")
44 * 2) It always invokes cancellation listener if it's cancelled (no 'invokeImmediately')
45 * 3) It can have at most one cancellation listener
46 * 4) Its cancellation listeners cannot be deregistered
 * As a consequence it has a much simpler state machine, more lightweight machinery and
 * fewer dependencies.
49 */
50
/** decision state machine

    +-----------+   trySuspend   +-----------+
    | UNDECIDED | -------------> | SUSPENDED |
    +-----------+                +-----------+
          |
          | tryResume
          V
    +-----------+
    |  RESUMED  |
    +-----------+

    Note: both tryResume and trySuspend can be invoked at most once, first invocation wins.
    If the cancellation handler is specified via a [Segment] instance and the index in it
    (so [Segment.onCancellation] should be called), the [_decisionAndIndex] field may store
    this index additionally to the "decision" value.

    The word is packed with `decisionAndIndex(..)`; it starts as UNDECIDED with NO_INDEX,
    i.e. with no segment index stored.
 */
private val _decisionAndIndex = atomic(decisionAndIndex(UNDECIDED, NO_INDEX))
69
/*
   === Internal states ===
   name        state class              public state    description
   ------      ------------             ------------    -----------
   ACTIVE      Active                 : Active          active, no listeners
   SINGLE_A    CancelHandler          : Active          active, one cancellation listener
   CANCELLED   CancelledContinuation  : Cancelled       cancelled (final state)
   COMPLETED   any                    : Completed       produced some result or threw an exception (final state)

   A Segment may also be stored here as the single cancellation listener
   (see invokeOnCancellation(segment, index)).
 */
private val _state = atomic<Any?>(Active)
80
/*
 * This field has a concurrent rendezvous in the following scenario:
 *
 * - installParentHandle publishes this instance on T1
 *
 * T1 writes:
 * - handle = installed; right after the installation
 * - Shortly after: if (isComplete) handle = NonDisposableHandle
 *
 * Any other T writes if the parent job is cancelled in detachChild:
 * - handle = NonDisposableHandle
 *
 * We want to preserve a strict invariant on parentHandle transition, allowing only three of them:
 * null -> anyHandle
 * anyHandle -> NonDisposableHandle
 * null -> NonDisposableHandle
 *
 * With a guarantee that after disposal the only state handle may end up in is NonDisposableHandle
 */
private val _parentHandle = atomic<DisposableHandle?>(null)
// Read-only view of the current handle; `null` until a handle has been installed.
private val parentHandle: DisposableHandle?
    get() = _parentHandle.value
103
// Current content of the state cell; every read observes the latest published value.
internal val state: Any?
    get() {
        return _state.value
    }

// Active as long as the state is still one of the NotCompleted markers.
public override val isActive: Boolean
    get() {
        return state is NotCompleted
    }

// Completed as soon as the state is anything but a NotCompleted marker.
public override val isCompleted: Boolean
    get() {
        return state !is NotCompleted
    }

// Cancelled only when the state is a CancelledContinuation.
public override val isCancelled: Boolean
    get() {
        return state is CancelledContinuation
    }
111
// A coarse label for toString(); `state.toString()` is deliberately avoided
// because it may cause a circular dependency.
private val stateDebugRepresentation: String
    get() {
        val snapshot = state
        if (snapshot is NotCompleted) return "Active"
        return if (snapshot is CancelledContinuation) "Cancelled" else "Completed"
    }
118
public override fun initCancellability() {
    /*
     * Invariant: when this is called, `this` has not yet leaked to user code,
     * so nobody can invoke `resume` or `cancel` on it. It is also never called
     * for reusable continuations.
     */
    val installed = installParentHandle()
    // Fast path: without a parent there is nothing to register or clean up.
    if (installed == null) return
    // Re-check the state _after_ registering: we may have completed in the meantime, but only
    // if the parent was cancelled. The parent may stay "cancelling" for a while, so help it
    // by removing our node ourselves.
    if (!isCompleted) return
    // May run concurrently with 'parentCancelled', which is harmless.
    installed.dispose()
    _parentHandle.value = NonDisposableHandle
}
136
// True only when the resume mode is a reusable one and the dispatched delegate reports itself reusable.
private fun isReusable(): Boolean {
    if (!resumeMode.isReusableMode) return false
    return (delegate as DispatchedContinuation<*>).isReusable()
}
138
/**
 * Puts this continuation back into its initial state so that [suspendCancellableCoroutineReusable]
 * can use it again.
 * Invariant: used only by [suspendCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state.
 *
 * @return `true` if the state was reset, `false` if the instance cannot be reused
 * (in which case it has been detached from the parent).
 */
@JvmName("resetStateReusable") // Prettier stack traces
internal fun resetStateReusable(): Boolean {
    assert { resumeMode == MODE_CANCELLABLE_REUSABLE }
    assert { parentHandle !== NonDisposableHandle }
    val completed = _state.value
    assert { completed !is NotCompleted }
    // A continuation resumed with an idempotent marker must not be reused.
    val resumedIdempotently = completed is CompletedContinuation && completed.idempotentResume != null
    if (resumedIdempotently) {
        detachChild()
        return false
    }
    _decisionAndIndex.value = decisionAndIndex(UNDECIDED, NO_INDEX)
    _state.value = Active
    return true
}
158
// The caller frame is the delegate itself when it is a stack frame, otherwise there is none.
public override val callerFrame: CoroutineStackFrame?
    get() {
        return delegate as? CoroutineStackFrame
    }

// This frame contributes no stack trace element of its own.
public override fun getStackTraceElement(): StackTraceElement? {
    return null
}

// Hands out the current state without clearing it.
override fun takeState(): Any? {
    return state
}
165
// Marks an already produced result as cancelled with `cause` and invokes the handlers stored with it.
// Note: takeState does not clear the state, so takenState is not used here
// and the actual current state is re-read in the CAS-loop.
override fun cancelCompletedResult(takenState: Any?, cause: Throwable): Unit = _state.loop { state ->
    when (state) {
        is NotCompleted -> error("Not completed")
        is CompletedExceptionally -> return // already completed exception or cancelled, nothing to do
        is CompletedContinuation -> {
            check(!state.cancelled) { "Must be called at most once" }
            // Record the cause first; handlers run only for the thread that wins the CAS
            val update = state.copy(cancelCause = cause)
            if (_state.compareAndSet(state, update)) {
                state.invokeHandlers(this, cause)
                return // done
            }
        }
        else -> {
            // completed normally without marker class, promote to CompletedContinuation in case
            // invokeOnCancellation is called later
            if (_state.compareAndSet(state, CompletedContinuation(state, cancelCause = cause))) {
                return // done
            }
        }
    }
}
189
/*
 * Tries to postpone cancellation when this is a reusable cancellable continuation.
 * Returns false right away for non-reusable instances, so that cancellation is only
 * ever postponed to the right reusable instance.
 */
private fun cancelLater(cause: Throwable): Boolean =
    isReusable() && (delegate as DispatchedContinuation<*>).postponeCancellation(cause)
199
// Cancels this continuation with `cause` unless it has already completed.
// Returns true only for the invocation that performed the transition to the cancelled state.
public override fun cancel(cause: Throwable?): Boolean {
    _state.loop { state ->
        if (state !is NotCompleted) return false // false if already complete or cancelling
        // Active -- update to final state
        // The new state is created as already "handled" when a handler is present, since it is invoked right below
        val update = CancelledContinuation(this, cause, handled = state is CancelHandler || state is Segment<*>)
        if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
        // Invoke cancel handler if it was present
        when (state) {
            is CancelHandler -> callCancelHandler(state, cause)
            is Segment<*> -> callSegmentOnCancellation(state, cause)
        }
        // Complete state update
        detachChildIfNonResuable()
        dispatchResume(resumeMode) // no need for additional cancellation checks
        return true
    }
}
217
// Reacts to cancellation of the parent: postpones it for reusable instances, otherwise cancels right away.
internal fun parentCancelled(cause: Throwable) {
    val postponed = cancelLater(cause)
    if (!postponed) {
        cancel(cause)
        // Detach even when cancel(..) did not succeed, to avoid a potential leak
        detachChildIfNonResuable()
    }
}
224
// Runs a cancellation handler and routes anything it throws to the coroutine exception handling
// of this continuation's context, wrapped into CompletionHandlerException.
private inline fun callCancelHandlerSafely(block: () -> Unit) {
    try {
        block()
    } catch (failure: Throwable) {
        // A handler must never fail; when it does, the failure is an unhandled exception
        val wrapped = CompletionHandlerException("Exception in invokeOnCancellation handler for $this", failure)
        handleCoroutineException(context, wrapped)
    }
}
236
// Invokes an InternalCompletionHandler with the cancellation cause; failures are reported, not thrown.
// NOTE(review): no call site in this class passes an InternalCompletionHandler -- this overload
// looks unused; confirm before removing.
private fun callCancelHandler(handler: InternalCompletionHandler, cause: Throwable?) {
    callCancelHandlerSafely { handler.invoke(cause) }
}

// Invokes a CancelHandler with the cancellation cause; failures are reported, not thrown.
fun callCancelHandler(handler: CancelHandler, cause: Throwable?) {
    callCancelHandlerSafely { handler.invoke(cause) }
}
246
// Notifies the segment about cancellation, passing the index stored in the packed decision word.
private fun callSegmentOnCancellation(segment: Segment<*>, cause: Throwable?) {
    val storedIndex = _decisionAndIndex.value.index
    check(storedIndex != NO_INDEX) { "The index for Segment.onCancellation(..) is broken" }
    callCancelHandlerSafely {
        segment.onCancellation(storedIndex, cause, context)
    }
}
252
// Invokes the `onCancellation` block that was supplied together with a resumed value.
// Anything it throws is reported through the context's exception handling instead of propagating.
fun callOnCancellation(onCancellation: (cause: Throwable) -> Unit, cause: Throwable) {
    try {
        onCancellation(cause)
    } catch (failure: Throwable) {
        // The block must never fail; when it does, the failure is an unhandled exception
        val wrapped = CompletionHandlerException("Exception in resume onCancellation handler for $this", failure)
        handleCoroutineException(context, wrapped)
    }
}
264
/**
 * Supplies the cause this continuation is cancelled with when its [parent] is cancelled.
 * The default is the parent's cancellation exception; subclasses may override it.
 */
open fun getContinuationCancellationCause(parent: Job): Throwable {
    return parent.getCancellationException()
}
270
// Attempts the UNDECIDED -> SUSPENDED transition, preserving the stored index.
// Returns true when this call made the transition, false when the decision is already RESUMED.
private fun trySuspend(): Boolean {
    _decisionAndIndex.loop { snapshot ->
        val decision = snapshot.decision
        if (decision == RESUMED) return false
        if (decision != UNDECIDED) error("Already suspended")
        val suspended = decisionAndIndex(SUSPENDED, snapshot.index)
        // On CAS failure fall through and retry with a fresh snapshot
        if (this._decisionAndIndex.compareAndSet(snapshot, suspended)) return true
    }
}
280
// Attempts the UNDECIDED -> RESUMED transition, preserving the stored index.
// Returns true when this call made the transition, false when the decision is already SUSPENDED.
private fun tryResume(): Boolean {
    _decisionAndIndex.loop { snapshot ->
        val decision = snapshot.decision
        if (decision == SUSPENDED) return false
        if (decision != UNDECIDED) error("Already resumed")
        val resumed = decisionAndIndex(RESUMED, snapshot.index)
        // On CAS failure fall through and retry with a fresh snapshot
        if (this._decisionAndIndex.compareAndSet(snapshot, resumed)) return true
    }
}
290
// Decides between suspending and returning immediately.
// Returns COROUTINE_SUSPENDED when the suspension decision was won here; otherwise returns
// the already stored successful result or throws the stored / cancellation exception.
@PublishedApi
internal fun getResult(): Any? {
    // Sampled once up front and used on both paths below
    val isReusable = isReusable()
    // trySuspend may fail either if 'block' has resumed/cancelled a continuation,
    // or we got async cancellation from parent.
    if (trySuspend()) {
        /*
         * Invariant: parentHandle is `null` *only* for reusable continuations.
         * We were neither resumed nor cancelled, time to suspend.
         * But first we have to install parent cancellation handle (if we didn't yet),
         * so CC could be properly resumed on parent cancellation.
         *
         * This read has benign data-race with write of 'NonDisposableHandle'
         * in 'detachChild' (reached via 'detachChildIfNonResuable').
         */
        if (parentHandle == null) {
            installParentHandle()
        }
        /*
         * Release the continuation after installing the handle (if needed).
         * If we were successful, then do nothing, it's ok to reuse the instance now.
         * Otherwise, dispose the handle by ourselves.
         */
        if (isReusable) {
            releaseClaimedReusableContinuation()
        }
        return COROUTINE_SUSPENDED
    }
    // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
    if (isReusable) {
        // release claimed reusable continuation for the future reuse
        releaseClaimedReusableContinuation()
    }
    val state = this.state
    if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
    // if the parent job was already cancelled, then throw the corresponding cancellation exception
    // otherwise, there is a race if suspendCancellableCoroutine { cont -> ... } does cont.resume(...)
    // before the block returns. This getResult would return a result as opposed to cancellation
    // exception that should have happened if the continuation is dispatched for execution later.
    if (resumeMode.isCancellableMode) {
        val job = context[Job]
        if (job != null && !job.isActive) {
            val cause = job.getCancellationException()
            // Mark the stored result as cancelled so that its handlers are invoked
            cancelCompletedResult(state, cause)
            throw recoverStackTrace(cause, this)
        }
    }
    return getSuccessfulResult(state)
}
340
// Registers a ChildContinuation handler on the parent Job (if the context has one) and publishes
// the resulting handle unless some handle has been published already.
// Returns the handle obtained from the parent, or null when there is no parent.
private fun installParentHandle(): DisposableHandle? {
    val parentJob = context[Job]
    if (parentJob == null) return null // nothing to do without a parent
    val installed = parentJob.invokeOnCompletion(onCancelling = true, handler = ChildContinuation(this))
    // Only the null -> handle transition is allowed here
    _parentHandle.compareAndSet(null, installed)
    return installed
}
351
/**
 * Tries to release the claimed reusable continuation. Releasing can fail if there was an asynchronous
 * cancellation, in which case this continuation is detached from the parent and cancelled with
 * the reported cause.
 */
internal fun releaseClaimedReusableContinuation() {
    // The delegate is not a DispatchedContinuation e.g. when invoked from `installParentHandleReusable`
    // for a context without dispatchers, but with a Job in it
    val dispatched = (delegate as? DispatchedContinuation<*>) ?: return
    // A null cause means the release succeeded and there is nothing more to do
    val cancellationCause = dispatched.tryReleaseClaimedContinuation(this) ?: return
    detachChild()
    cancel(cancellationCause)
}
362
// Resumes with a Result, converted to the internal state representation first.
override fun resumeWith(result: Result<T>) {
    val proposed = result.toState(this)
    resumeImpl(proposed, resumeMode)
}

// Resumes with a value and an optional block to call if the value gets cancelled afterwards.
override fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?) {
    resumeImpl(value, resumeMode, onCancellation)
}
368
/**
 * Allocation-free counterpart of the snippet below: instead of creating a cancellation
 * handler object, the given [segment] is stored as the handler and [index] is packed
 * into the decision word of this [CancellableContinuationImpl].
 *
 * ```
 * invokeOnCancellation { cause ->
 *     segment.onCancellation(index, cause)
 * }
 * ```
 *
 * The only difference is that `segment.onCancellation(..)` is never
 * called if this continuation is already completed.
 */
override fun invokeOnCancellation(segment: Segment<*>, index: Int) {
    _decisionAndIndex.update { packed ->
        check(packed.index == NO_INDEX) {
            "invokeOnCancellation should be called at most once"
        }
        // Keep the current decision, store the new index
        decisionAndIndex(packed.decision, index)
    }
    invokeOnCancellationImpl(segment)
}
392
// Wraps the user lambda into a CancelHandler and registers it through the CancelHandler-taking
// `invokeOnCancellation`, which is declared outside of this class.
override fun invokeOnCancellation(handler: CompletionHandler) {
    val wrapped = CancelHandler.UserSupplied(handler)
    invokeOnCancellation(wrapped)
}

// Registers an already constructed CancelHandler.
internal fun invokeOnCancellationInternal(handler: CancelHandler) {
    invokeOnCancellationImpl(handler)
}
396
/**
 * Registers [handler] as the single cancellation handler of this continuation.
 *
 * [handler] must be either a [CancelHandler] or a [Segment]. Depending on the current state:
 * - active without a handler: the handler is stored in the state;
 * - a handler is already registered: fails via [multipleHandlersError];
 * - cancelled: the handler is invoked right away (at most one handler may do so);
 * - completed exceptionally (not cancelled): the handler is not invoked;
 * - completed with a value: a [CancelHandler] is attached to the completed state (or invoked
 *   immediately if the result was already cancelled), while a [Segment] is ignored.
 */
private fun invokeOnCancellationImpl(handler: Any) {
    assert { handler is CancelHandler || handler is Segment<*> }
    _state.loop { state ->
        when (state) {
            is Active -> {
                if (_state.compareAndSet(state, handler)) return // quit on cas success
            }
            is CancelHandler, is Segment<*> -> multipleHandlersError(handler, state)
            is CompletedExceptionally -> {
                /*
                 * Continuation was already cancelled or completed exceptionally.
                 * NOTE: multiple invokeOnCancellation calls with different handlers are not allowed,
                 * so we check to make sure handler was installed just once.
                 */
                if (!state.makeHandled()) multipleHandlersError(handler, state)
                /*
                 * Call the handler only if it was cancelled (not called when completed exceptionally).
                 */
                if (state is CancelledContinuation) {
                    // `state` is already known to be exceptional here, so the cause is read directly
                    val cause: Throwable? = state.cause
                    if (handler is CancelHandler) {
                        callCancelHandler(handler, cause)
                    } else {
                        val segment = handler as Segment<*>
                        callSegmentOnCancellation(segment, cause)
                    }
                }
                return
            }
            is CompletedContinuation -> {
                /*
                 * Continuation was already completed, and might already have cancel handler.
                 */
                if (state.cancelHandler != null) multipleHandlersError(handler, state)
                // Segment.invokeOnCancellation(..) does NOT need to be called on completed continuation.
                if (handler is Segment<*>) return
                handler as CancelHandler
                if (state.cancelled) {
                    // Was already cancelled while being dispatched -- invoke the handler directly
                    callCancelHandler(handler, state.cancelCause)
                    return
                }
                val update = state.copy(cancelHandler = handler)
                if (_state.compareAndSet(state, update)) return // quit on cas success
            }
            else -> {
                /*
                 * Continuation was already completed normally, but might get cancelled while being dispatched.
                 * Change its state to CompletedContinuation, unless we have Segment which
                 * does not need to be called in this case.
                 */
                if (handler is Segment<*>) return
                handler as CancelHandler
                val update = CompletedContinuation(state, cancelHandler = handler)
                if (_state.compareAndSet(state, update)) return // quit on cas success
            }
        }
    }
}
458
/**
 * Reports an attempt to register a second cancellation handler.
 *
 * Never returns normally: always throws [IllegalStateException] naming both the rejected
 * [handler] and the [state] that already holds (or has consumed) a handler. The return type is
 * [Nothing], like that of `alreadyResumedError`, so the compiler knows no code runs after a call.
 */
private fun multipleHandlersError(handler: Any, state: Any?): Nothing {
    error("It's prohibited to register multiple handlers, tried to register $handler, already has $state")
}
462
// Finishes a resume: when the decision is won here (getResult has not suspended yet) there is
// nothing to dispatch; otherwise getResult has already commenced, so the continuation is dispatched.
private fun dispatchResume(mode: Int) {
    val resumedBeforeSuspension = tryResume()
    if (!resumedBeforeSuspension) dispatch(mode)
}
468
// Computes the state object to store for a resume attempt with `proposedUpdate`:
// either the update itself or a CompletedContinuation carrying extra metadata.
private fun resumedState(
    state: NotCompleted,
    proposedUpdate: Any?,
    resumeMode: Int,
    onCancellation: ((cause: Throwable) -> Unit)?,
    idempotent: Any?
): Any? {
    if (proposedUpdate is CompletedExceptionally) {
        assert { idempotent == null } // there are no idempotent exceptional resumes
        assert { onCancellation == null } // only successful results can be cancelled
        return proposedUpdate
    }
    // Cannot be cancelled in process and no idempotent marker -- store the value as is
    if (!resumeMode.isCancellableMode && idempotent == null) return proposedUpdate
    // Metadata is needed when something must be called after the resume (a cancellation handler
    // or an onCancellation block) or when the resume is idempotent
    val needsMetadata = onCancellation != null || state is CancelHandler || idempotent != null
    return if (needsMetadata) {
        CompletedContinuation(proposedUpdate, state as? CancelHandler, onCancellation, idempotent)
    } else {
        proposedUpdate // simple case -- use the value directly
    }
}
488
// Stores `proposedUpdate` as the result and dispatches the resume with the given mode.
// A resume that races with cancellation is ignored once (its onCancellation block, if any, is invoked);
// any other resume of a non-active continuation fails via alreadyResumedError.
private fun resumeImpl(
    proposedUpdate: Any?,
    resumeMode: Int,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) {
    _state.loop { state ->
        when (state) {
            is NotCompleted -> {
                val update = resumedState(state, proposedUpdate, resumeMode, onCancellation, idempotent = null)
                if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
                detachChildIfNonResuable()
                dispatchResume(resumeMode) // dispatch resume, but it might get cancelled in process
                return // done
            }
            is CancelledContinuation -> {
                /*
                 * If continuation was cancelled, then resume attempt must be ignored,
                 * because cancellation is asynchronous and may race with resume.
                 * Racy exceptions will be lost, too.
                 */
                if (state.makeResumed()) { // check if trying to resume one (otherwise error)
                    // call onCancellation
                    onCancellation?.let { callOnCancellation(it, state.cause) }
                    return // done
                }
            }
        }
        // Reached for any other state, and for a cancelled state when makeResumed() returned false
        alreadyResumedError(proposedUpdate) // otherwise, an error (second resume attempt)
    }
}
519
/**
 * Similar to [tryResume], but does not actually complete the resume (a [completeResume] call is needed).
 * Returns [RESUME_TOKEN] when resumed, `null` when it was already resumed or cancelled.
 * A repeated call with the same non-null [idempotent] marker also returns [RESUME_TOKEN].
 */
private fun tryResumeImpl(
    proposedUpdate: Any?,
    idempotent: Any?,
    onCancellation: ((cause: Throwable) -> Unit)?
): Symbol? {
    _state.loop { state ->
        when (state) {
            is NotCompleted -> {
                val update = resumedState(state, proposedUpdate, resumeMode, onCancellation, idempotent)
                if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
                // The state is stored, but the resume is not dispatched here
                detachChildIfNonResuable()
                return RESUME_TOKEN
            }
            is CompletedContinuation -> {
                // The marker is compared by identity
                return if (idempotent != null && state.idempotentResume === idempotent) {
                    assert { state.result == proposedUpdate } // "Non-idempotent resume"
                    RESUME_TOKEN // resumed with the same token -- ok
                } else {
                    null // resumed with a different token or non-idempotent -- too late
                }
            }
            else -> return null // cannot resume -- not active anymore
        }
    }
}
549
// Always throws: reports a resume attempt on a continuation that is no longer active.
private fun alreadyResumedError(proposedUpdate: Any?): Nothing =
    error("Already resumed, but proposed with update $proposedUpdate")
553
// Unregisters from the parent job, but only for non-reusable instances.
private fun detachChildIfNonResuable() {
    // A reusable instance is not detached on every reuse; #releaseInterceptedContinuation does it in the end
    if (isReusable()) return
    detachChild()
}
559
/**
 * Detaches from the parent: disposes the installed handle, if there is one,
 * and replaces it with [NonDisposableHandle].
 */
internal fun detachChild() {
    val installed = parentHandle
    if (installed != null) {
        installed.dispose()
        _parentHandle.value = NonDisposableHandle
    }
}
568
// Each of the three functions below yields either RESUME_TOKEN or null.
override fun tryResume(value: T, idempotent: Any?): Any? {
    return tryResumeImpl(value, idempotent, onCancellation = null)
}

override fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any? {
    return tryResumeImpl(value, idempotent, onCancellation)
}

// Exceptional resumes carry neither an idempotent marker nor an onCancellation block.
override fun tryResumeWithException(exception: Throwable): Any? {
    val failure = CompletedExceptionally(exception)
    return tryResumeImpl(failure, idempotent = null, onCancellation = null)
}
578
// Completes a resume started by one of the tryResume functions; the token is always RESUME_TOKEN.
override fun completeResume(token: Any) {
    assert { token === RESUME_TOKEN }
    val mode = resumeMode
    dispatchResume(mode)
}
584
// Resumes with a value; the undispatched mode is used only when the receiver is the very
// dispatcher of the dispatched delegate, otherwise the regular resume mode applies.
override fun CoroutineDispatcher.resumeUndispatched(value: T) {
    val dispatched = delegate as? DispatchedContinuation
    val mode = if (dispatched?.dispatcher === this) MODE_UNDISPATCHED else resumeMode
    resumeImpl(value, mode)
}

// Same as above, but resumes with an exception.
override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) {
    val dispatched = delegate as? DispatchedContinuation
    val mode = if (dispatched?.dispatcher === this) MODE_UNDISPATCHED else resumeMode
    resumeImpl(CompletedExceptionally(exception), mode)
}
594
// Unwraps the value from a CompletedContinuation; any other state is the value itself.
@Suppress("UNCHECKED_CAST")
override fun <T> getSuccessfulResult(state: Any?): T {
    val payload = if (state is CompletedContinuation) state.result else state
    return payload as T
}
601
// The exceptional state is stored directly in CancellableContinuationImpl, without stacktrace
// recovery, so the recovery against the delegate is performed here.
override fun getExceptionalResult(state: Any?): Throwable? {
    val raw = super.getExceptionalResult(state) ?: return null
    return recoverStackTrace(raw, delegate)
}
606
// Debug-friendly representation: name, delegate, coarse state and identity.
public override fun toString(): String {
    return "${nameString()}(${delegate.toDebugString()}){$stateDebugRepresentation}@$hexAddress"
}

// Name used by toString(); open so that subclasses can supply their own.
protected open fun nameString(): String {
    return "CancellableContinuation"
}
613
614 }
615
// Marker implemented by every state object that represents a not yet completed continuation
internal interface NotCompleted

// The initial state: active, with no cancellation handler installed
private object Active : NotCompleted {
    override fun toString(): String {
        return "Active"
    }
}
622
/**
 * In essence, a function from `Throwable?` to `Unit` whose only operation is [invoke].
 *
 * A dedicated type is used instead of a function type for two reasons: it lets handlers provide
 * a readable [toString] when the list of completion handlers is inspected in the debugger, and
 * on JS it is not possible to inherit from a function type.
 * Wrap a lambda into [UserSupplied] to obtain an instance.
 *
 * @see InternalCompletionHandler for a very similar interface, but used for handling completion and not cancellation.
 */
internal interface CancelHandler : NotCompleted {
    /**
     * Signals cancellation.
     *
     * Requirements for implementations:
     * - No exceptions are thrown.
     *   If one is thrown nevertheless, [handleUncaughtCoroutineException] is called with a
     *   [CompletionHandlerException] wrapping the thrown exception.
     * - The function is fast, non-blocking, and thread-safe.
     * - It may be invoked concurrently with the surrounding code.
     * - It may be invoked from any context.
     *
     * Meaning of the `cause` argument:
     * - `null` when the continuation was cancelled directly via [CancellableContinuation.cancel] without a `cause`.
     * - An instance of [CancellationException] when the continuation was _normally_ cancelled from the outside.
     *   **It should not be treated as an error**. In particular, it should not be reported to error logs.
     * - Anything else means the continuation had cancelled with an _error_.
     */
    fun invoke(cause: Throwable?)

    /**
     * Adapter for a lambda passed from outside the coroutine machinery.
     *
     * The lambda has to satisfy the requirements listed for [CancelHandler.invoke].
     */
    class UserSupplied(private val handler: (cause: Throwable?) -> Unit) : CancelHandler {
        /** @suppress */
        override fun invoke(cause: Throwable?) {
            handler.invoke(cause)
        }

        override fun toString(): String {
            return "CancelHandler.UserSupplied[${handler.classSimpleName}@$hexAddress]"
        }
    }
}
665
// A completed state that carries metadata in addition to the result itself
private data class CompletedContinuation(
    @JvmField val result: Any?,
    // installed via invokeOnCancellation
    @JvmField val cancelHandler: CancelHandler? = null,
    // installed via resume block
    @JvmField val onCancellation: ((cause: Throwable) -> Unit)? = null,
    @JvmField val idempotentResume: Any? = null,
    @JvmField val cancelCause: Throwable? = null
) {
    // The result counts as cancelled once a cancellation cause has been recorded
    val cancelled: Boolean
        get() {
            return cancelCause != null
        }

    // Invokes the cancellation handler first and the onCancellation block second, each only if present
    fun invokeHandlers(cont: CancellableContinuationImpl<*>, cause: Throwable) {
        if (cancelHandler != null) {
            cont.callCancelHandler(cancelHandler, cause)
        }
        if (onCancellation != null) {
            cont.callOnCancellation(onCancellation, cause)
        }
    }
}
681