package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*

// --------------- cancellable continuations ---------------

/**
 * Cancellable continuation. It is _completed_ when resumed or cancelled.
 * When the [cancel] function is explicitly invoked, this continuation immediately resumes with a [CancellationException] or
 * the specified cancel cause.
 *
 * An instance of `CancellableContinuation` is created by the [suspendCancellableCoroutine] function.
 *
 * Cancellable continuation has three states (as subset of [Job] states):
 *
 * | **State**                             | [isActive] | [isCompleted] | [isCancelled] |
 * | ------------------------------------- | ---------- | ------------- | ------------- |
 * | _Active_ (initial state)              | `true`     | `false`       | `false`       |
 * | _Resumed_ (final _completed_ state)   | `false`    | `true`        | `false`       |
 * | _Cancelled_ (final _completed_ state) | `false`    | `true`        | `true`        |
 *
 * Invocation of [cancel] transitions this continuation from _active_ to _cancelled_ state, while
 * invocation of [Continuation.resume] or [Continuation.resumeWithException] transitions it from _active_ to _resumed_ state.
 *
 * A [cancelled][isCancelled] continuation implies that it is [completed][isCompleted].
 *
 * Invocation of [Continuation.resume] or [Continuation.resumeWithException] in _resumed_ state produces an [IllegalStateException],
 * but is ignored in _cancelled_ state.
 *
 * ```
 *    +-----------+   resume    +---------+
 *    |  Active   | ----------> | Resumed |
 *    +-----------+             +---------+
 *          |
 *          | cancel
 *          V
 *    +-----------+
 *    | Cancelled |
 *    +-----------+
 * ```
 */
public interface CancellableContinuation<in T> : Continuation<T> {
    /**
     * Returns `true` when this continuation is active -- it has not been completed or cancelled yet.
     */
    public val isActive: Boolean

    /**
     * Returns `true` when this continuation has completed for any reason. A cancelled continuation
     * is also considered complete.
     */
    public val isCompleted: Boolean

    /**
     * Returns `true` if this continuation was [cancelled][cancel].
     *
     * It implies that [isActive] is `false` and [isCompleted] is `true`.
     */
    public val isCancelled: Boolean

    /**
     * Tries to resume this continuation with the specified [value] and returns a non-null object token if successful,
     * or `null` otherwise (it was already resumed or cancelled). When a non-null object is returned,
     * [completeResume] must be invoked with it.
     *
     * When [idempotent] is not `null`, this function performs an _idempotent_ operation, so that
     * further invocations with the same non-null reference produce the same result.
     *
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun tryResume(value: T, idempotent: Any? = null): Any?

    /**
     * Same as [tryResume] but with an [onCancellation] handler that is called if and only if the value is not
     * delivered to the caller because of the dispatch in the process, so that an atomic delivery
     * guarantee can be provided by having a cancellation fallback.
     *
     * Implementation note: current implementation always returns RESUME_TOKEN or `null`
     *
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any?

    /**
     * Tries to resume this continuation with the specified [exception] and returns a non-null object token if successful,
     * or `null` otherwise (it was already resumed or cancelled). When a non-null object is returned,
     * [completeResume] must be invoked with it.
     *
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun tryResumeWithException(exception: Throwable): Any?

    /**
     * Completes the execution of [tryResume] or [tryResumeWithException] on its non-null result.
     *
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun completeResume(token: Any)

    /**
     * Internal function that sets up cancellation behavior in [suspendCancellableCoroutine].
     * It's illegal to call this function in any non-`kotlinx.coroutines` code and
     * such calls lead to undefined behaviour.
     * Exposed in our ABI since 1.0.0 within the `suspendCancellableCoroutine` body.
     *
     * @suppress **This is unstable API and it is subject to change.**
     */
    @InternalCoroutinesApi
    public fun initCancellability()

    /**
     * Cancels this continuation with an optional cancellation `cause`. The result is `true` if this continuation was
     * cancelled as a result of this invocation, and `false` otherwise.
     */
    public fun cancel(cause: Throwable? = null): Boolean

    /**
     * Registers a [handler] to be **synchronously** invoked on [cancellation][cancel] (regular or exceptional) of this continuation.
     * When the continuation is already cancelled, the handler is immediately invoked with the cancellation exception.
     * Otherwise, the handler will be invoked as soon as this continuation is cancelled.
     *
     * The installed [handler] should not throw any exceptions.
     * If it does, they will get caught, wrapped into a [CompletionHandlerException] and
     * processed as an uncaught exception in the context of the current coroutine
     * (see [CoroutineExceptionHandler]).
     *
     * At most one [handler] can be installed on a continuation.
     * Attempting to call `invokeOnCancellation` a second time produces an [IllegalStateException].
     *
     * This handler is also called when this continuation [resumes][Continuation.resume] normally (with a value) and then
     * is cancelled while waiting to be dispatched. More generally speaking, this handler is called whenever
     * the caller of [suspendCancellableCoroutine] is getting a [CancellationException].
     *
     * A typical example of `invokeOnCancellation` usage is given in
     * the documentation for the [suspendCancellableCoroutine] function.
     *
     * **Note**: Implementations of [CompletionHandler] must be fast, non-blocking, and thread-safe.
     * This [handler] can be invoked concurrently with the surrounding code.
     * There is no guarantee on the execution context in which the [handler] will be invoked.
     */
    public fun invokeOnCancellation(handler: CompletionHandler)

    /**
     * Resumes this continuation with the specified [value] in the invoker thread without going through
     * the [dispatch][CoroutineDispatcher.dispatch] function of the [CoroutineDispatcher] in the [context].
     * This function is designed to only be used by [CoroutineDispatcher] implementations.
     * **It should not be used in general code**.
     *
     * **Note: This function is experimental.** Its signature may be changed in the future.
     */
    @ExperimentalCoroutinesApi
    public fun CoroutineDispatcher.resumeUndispatched(value: T)

    /**
     * Resumes this continuation with the specified [exception] in the invoker thread without going through
     * the [dispatch][CoroutineDispatcher.dispatch] function of the [CoroutineDispatcher] in the [context].
     * This function is designed to only be used by [CoroutineDispatcher] implementations.
     * **It should not be used in general code**.
     *
     * **Note: This function is experimental.** Its signature may be changed in the future.
     */
    @ExperimentalCoroutinesApi
    public fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable)

    /**
     * Resumes this continuation with the specified `value` and calls the specified `onCancellation`
     * handler when either resumed too late (when continuation was already cancelled) or, although resumed
     * successfully (before cancellation), the coroutine's job was cancelled before it had a
     * chance to run in its dispatcher, so that the suspended function threw an exception
     * instead of returning this value.
     *
     * The installed [onCancellation] handler should not throw any exceptions.
     * If it does, they will get caught, wrapped into a [CompletionHandlerException] and
     * processed as an uncaught exception in the context of the current coroutine
     * (see [CoroutineExceptionHandler]).
     *
     * This function shall be used when resuming with a resource that must be closed by
     * code that called the corresponding suspending function, for example:
     *
     * ```
     * continuation.resume(resource) {
     *     resource.close()
     * }
     * ```
     *
     * A more complete example and further details are given in
     * the documentation for the [suspendCancellableCoroutine] function.
     *
     * **Note**: The [onCancellation] handler must be fast, non-blocking, and thread-safe.
     * It can be invoked concurrently with the surrounding code.
     * There is no guarantee on the execution context of its invocation.
     */
    @ExperimentalCoroutinesApi // since 1.2.0
    public fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?)
}

/**
 * A version of `invokeOnCancellation` that accepts a [CancelHandler] object as the handler instead of a lambda,
 * but is identical otherwise.
 * This allows providing a custom [toString] implementation that will look better during debugging.
 *
 * Only [CancellableContinuationImpl] receivers are supported: the call is forwarded to its
 * `invokeOnCancellationInternal`. Any other implementation of [CancellableContinuation] is rejected.
 *
 * @throws UnsupportedOperationException if the receiver is not a [CancellableContinuationImpl].
 */
internal fun <T> CancellableContinuation<T>.invokeOnCancellation(handler: CancelHandler) = when (this) {
    is CancellableContinuationImpl -> invokeOnCancellationInternal(handler)
    else -> throw UnsupportedOperationException("third-party implementation of CancellableContinuation is not supported")
}

/**
 * Suspends the coroutine like [suspendCoroutine], but providing a [CancellableContinuation] to
 * the [block]. This function throws a [CancellationException] if the [Job] of the coroutine is
 * cancelled or completed while it is suspended.
 *
 * A typical use of this function is to suspend a coroutine while waiting for a result
 * from a single-shot callback API and to return the result to the caller.
 * For multi-shot callback APIs see [callbackFlow][kotlinx.coroutines.flow.callbackFlow].
 *
 * ```
 * suspend fun awaitCallback(): T = suspendCancellableCoroutine { continuation ->
 *     val callback = object : Callback { // Implementation of some callback interface
 *         override fun onCompleted(value: T) {
 *             // Resume coroutine with a value provided by the callback
 *             continuation.resume(value)
 *         }
 *         override fun onApiError(cause: Throwable) {
 *             // Resume coroutine with an exception provided by the callback
 *             continuation.resumeWithException(cause)
 *         }
 *     }
 *     // Register callback with an API
 *     api.register(callback)
 *     // Remove callback on cancellation
 *     continuation.invokeOnCancellation { api.unregister(callback) }
 *     // At this point the coroutine is suspended by suspendCancellableCoroutine until callback fires
 * }
 * ```
 *
 * > The callback `register`/`unregister` methods provided by an external API must be thread-safe, because
 * > `invokeOnCancellation` block can be called at any time due to asynchronous nature of cancellation, even
 * > concurrently with the call of the callback.
 *
 * ### Prompt cancellation guarantee
 *
 * This function provides **prompt cancellation guarantee**.
 * If the [Job] of the current coroutine was cancelled while this function was suspended it will not resume
 * successfully, even if [CancellableContinuation.resume] was already invoked.
 *
 * The cancellation of the coroutine's job is generally asynchronous with respect to the suspended coroutine.
 * The suspended coroutine is resumed with a call to its [Continuation.resumeWith] member function or to the
 * [resume][Continuation.resume] extension function.
 * However, when coroutine is resumed, it does not immediately start executing, but is passed to its
 * [CoroutineDispatcher] to schedule its execution when dispatcher's resources become available for execution.
 * The job's cancellation can happen before, after, and concurrently with the call to `resume`. In any
 * case, prompt cancellation guarantees that the coroutine will not resume its code successfully.
 *
 * If the coroutine was resumed with an exception (for example, using [Continuation.resumeWithException] extension
 * function) and cancelled, then the exception thrown by the `suspendCancellableCoroutine` function is determined
 * by what happened first: exceptional resume or cancellation.
 *
 * ### Returning resources from a suspended coroutine
 *
 * As a result of the prompt cancellation guarantee, when a closeable resource
 * (like open file or a handle to another native resource) is returned from a suspended coroutine as a value,
 * it can be lost when the coroutine is cancelled. To ensure that the resource can be properly closed
 * in this case, the [CancellableContinuation] interface provides two functions.
 *
 * - [invokeOnCancellation][CancellableContinuation.invokeOnCancellation] installs a handler that is called
 *   whenever a suspend coroutine is being cancelled. In addition to the example at the beginning, it can be
 *   used to ensure that a resource that was opened before the call to
 *   `suspendCancellableCoroutine` or in its body is closed in case of cancellation.
 *
 * ```
 * suspendCancellableCoroutine { continuation ->
 *     val resource = openResource() // Opens some resource
 *     continuation.invokeOnCancellation {
 *         resource.close() // Ensures the resource is closed on cancellation
 *     }
 *     // ...
 * }
 * ```
 *
 * - [resume(value) { ... }][CancellableContinuation.resume] method on a [CancellableContinuation] takes
 *   an optional `onCancellation` block. It can be used when resuming with a resource that must be closed by
 *   the code that called the corresponding suspending function.
 *
 * ```
 * suspendCancellableCoroutine { continuation ->
 *     val callback = object : Callback { // Implementation of some callback interface
 *         // A callback provides a reference to some closeable resource
 *         override fun onCompleted(resource: T) {
 *             // Resume coroutine with a value provided by the callback and ensure the resource is closed in case
 *             // when the coroutine is cancelled before the caller gets a reference to the resource.
 *             continuation.resume(resource) {
 *                 resource.close() // Close the resource on cancellation
 *             }
 *         }
 *         // ...
 *     }
 *     // ...
 * }
 * ```
 *
 * ### Implementation details and custom continuation interceptors
 *
 * The prompt cancellation guarantee is the result of a coordinated implementation inside `suspendCancellableCoroutine`
 * function and the [CoroutineDispatcher] class. The coroutine dispatcher checks for the status of the [Job] immediately
 * before continuing its normal execution and aborts this normal execution, calling all the corresponding
 * cancellation handlers, if the job was cancelled.
 *
 * If a custom implementation of [ContinuationInterceptor] is used in a coroutine's context that does not extend
 * [CoroutineDispatcher] class, then there is no prompt cancellation guarantee. A custom continuation interceptor
 * can resume execution of a previously suspended coroutine even if its job was already cancelled.
 */
public suspend inline fun <T> suspendCancellableCoroutine(
    crossinline block: (CancellableContinuation<T>) -> Unit
): T =
    suspendCoroutineUninterceptedOrReturn { uCont ->
        val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
        /*
         * For non-atomic cancellation we set up the parent-child relationship immediately,
         * in case `block` blocks the current thread (e.g. Rx2 with trampoline scheduler) but
         * properly supports cancellation.
         */
        cancellable.initCancellability()
        block(cancellable)
        cancellable.getResult()
    }

/**
 * Suspends the coroutine similar to [suspendCancellableCoroutine], but an instance of
 * [CancellableContinuationImpl] is reused: it is obtained via [getOrCreateCancellableContinuation]
 * for the intercepted continuation instead of always being allocated.
 *
 * If [block] throws, the claimed continuation is released with
 * `releaseClaimedReusableContinuation` and the exception is rethrown unchanged.
 */
internal suspend inline fun <T> suspendCancellableCoroutineReusable(
    crossinline block: (CancellableContinuationImpl<T>) -> Unit
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
    val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
    try {
        block(cancellable)
    } catch (e: Throwable) {
        // Here we catch any unexpected exception from the user-supplied block (e.g. invariant violation)
        // and release the claimed continuation in order to leave it in a reasonable state (see #3613)
        cancellable.releaseClaimedReusableContinuation()
        throw e
    }
    cancellable.getResult()
}

/**
 * Returns a [CancellableContinuationImpl] wrapping [delegate].
 *
 * - When [delegate] is not a [DispatchedContinuation], a new instance in `MODE_CANCELLABLE` is created.
 * - Otherwise the reusable instance is claimed from the [delegate]; it is returned only if the claim
 *   succeeds and `resetStateReusable` reports `true`.
 * - If the claim or the reset fails, a new instance in `MODE_CANCELLABLE_REUSABLE` is created.
 */
internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
    // If used outside our dispatcher
    if (delegate !is DispatchedContinuation<T>) {
        return CancellableContinuationImpl(delegate, MODE_CANCELLABLE)
    }
    /*
     * Attempt to claim reusable instance.
     *
     * suspendCancellableCoroutineReusable { // <- claimed
     *     // Any asynchronous cancellation is "postponed" while this block
     *     // is being executed
     * } // postponed cancellation is checked here.
     *
     * Claim can fail for the following reasons:
     * 1) Someone tried to make idempotent resume.
     *    Idempotent resume is internal (used only by us) and is used only in `select`,
     *    thus leaking CC instance for indefinite time.
     * 2) Continuation was cancelled. Then we should prevent any further reuse and bail out.
     */
    return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetStateReusable() }
        ?: CancellableContinuationImpl(delegate, MODE_CANCELLABLE_REUSABLE)
}

/**
 * Disposes the specified [handle] when this continuation is cancelled.
 *
 * This is a shortcut for the following code with slightly more efficient implementation (one fewer object created):
 * ```
 * invokeOnCancellation { handle.dispose() }
 * ```
 *
 * @suppress **This is an internal API and should not be used from general code.**
 */
@InternalCoroutinesApi
public fun CancellableContinuation<*>.disposeOnCancellation(handle: DisposableHandle): Unit =
    // The argument is a CancelHandler object, not a lambda; keep the named `handler =` argument as written.
    invokeOnCancellation(handler = DisposeOnCancel(handle))

/**
 * A [CancelHandler] that disposes the wrapped [handle] when invoked; the cancellation cause is ignored.
 * Its [toString] includes the handle, which makes the handler identifiable while debugging.
 */
private class DisposeOnCancel(private val handle: DisposableHandle) : CancelHandler {
    override fun invoke(cause: Throwable?) = handle.dispose()
    override fun toString(): String = "DisposeOnCancel[$handle]"
}
