1 package kotlinx.coroutines
2 
3 import kotlinx.coroutines.internal.*
4 import kotlin.coroutines.*
5 import kotlin.jvm.*
6 
/**
 * Dispatch mode that is not cancellable.
 *
 * **DO NOT CHANGE THE CONSTANT VALUE**. Legacy user code that called the inline
 * `suspendAtomicCancellableCoroutine` function (which did not support reuse) might have this value inlined.
 */
internal const val MODE_ATOMIC: Int = 0

/**
 * Cancellable dispatch mode, the one used by the user-facing [suspendCancellableCoroutine].
 * Cancellability is not checked against this constant directly: the implementation goes through
 * the [Int.isCancellableMode] extension.
 *
 * **DO NOT CHANGE THE CONSTANT VALUE**. It is being inlined into the user code from [suspendCancellableCoroutine].
 */
@PublishedApi
internal const val MODE_CANCELLABLE: Int = 1

/**
 * Cancellable dispatch mode used by [suspendCancellableCoroutineReusable].
 * The implementation checks cancellability via the [Int.isCancellableMode] extension
 * and checks reuse via the [Int.isReusableMode] extension.
 */
internal const val MODE_CANCELLABLE_REUSABLE: Int = 2

/**
 * Undispatched mode used by [CancellableContinuation.resumeUndispatched].
 * It applies when the thread is already the right one, but it still has to be marked with the current coroutine.
 */
internal const val MODE_UNDISPATCHED: Int = 4

/**
 * Initial mode of the [DispatchedContinuation] implementation. It should never be used for dispatch,
 * because it is always overwritten with the actual resume mode when the continuation is resumed.
 */
internal const val MODE_UNINITIALIZED: Int = -1
42 
/** `true` when this mode is [MODE_CANCELLABLE] or [MODE_CANCELLABLE_REUSABLE]. */
internal val Int.isCancellableMode: Boolean
    get() = when (this) {
        MODE_CANCELLABLE, MODE_CANCELLABLE_REUSABLE -> true
        else -> false
    }
/** `true` only when this mode is [MODE_CANCELLABLE_REUSABLE]. */
internal val Int.isReusableMode: Boolean
    get() = this == MODE_CANCELLABLE_REUSABLE
45 
@PublishedApi
internal abstract class DispatchedTask<in T> internal constructor(
    // Used by the IDEA debugger via reflection and must be kept binary-compatible, see KTIJ-24102
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
    /** Continuation that receives the result of this task. */
    internal abstract val delegate: Continuation<T>

    /** Hands over the state stored in this task; [run] always calls it, even when the task is cancelled. */
    internal abstract fun takeState(): Any?

    /**
     * Invoked when this task got cancelled while it was in the process of being dispatched.
     * The default implementation does nothing.
     */
    internal open fun cancelCompletedResult(takenState: Any?, cause: Throwable) {}

    /**
     * Extracts the successful value from the taken [state].
     *
     * `DispatchedTask` has two implementations:
     * - [DispatchedContinuation] keeps only simple values as successful results.
     * - [CancellableContinuationImpl] keeps additional data alongside values and overrides this method to unwrap it.
     */
    @Suppress("UNCHECKED_CAST")
    internal open fun <T> getSuccessfulResult(state: Any?): T = state as T

    /**
     * Extracts the failure from the taken [state], or returns `null` when the state is not exceptional.
     *
     * `DispatchedTask` has two implementations:
     * - [DispatchedContinuation] is just an intermediate storage holding an exception whose stack-trace
     *   is already properly recovered, so it is ready to be passed to the [delegate] continuation directly.
     * - [CancellableContinuationImpl] stores the raw cause of the failure in its state; its stack-trace has to be
     *   recovered when it needs to be dispatched, so it overrides this method for that purpose.
     */
    internal open fun getExceptionalResult(state: Any?): Throwable? =
        if (state is CompletedExceptionally) state.cause else null

    final override fun run() {
        assert { resumeMode != MODE_UNINITIALIZED } // the mode should have been set before dispatching
        val taskContext = this.taskContext
        var fatalException: Throwable? = null
        try {
            val dispatched = delegate as DispatchedContinuation<T>
            val continuation = dispatched.continuation
            withContinuationContext(continuation, dispatched.countOrElement) {
                val context = continuation.context
                // NOTE: the state must be taken in any case, even if cancelled
                val state = takeState()
                val exception = getExceptionalResult(state)
                /*
                 * A continuation that was originally resumed with an exception is not checked for
                 * cancellation at all: the exception dominates cancellation, otherwise the original
                 * exception would be silently lost.
                 */
                val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
                when {
                    job != null && !job.isActive -> {
                        val cause = job.getCancellationException()
                        cancelCompletedResult(state, cause)
                        continuation.resumeWithStackTrace(cause)
                    }
                    exception != null -> continuation.resumeWithException(exception)
                    else -> continuation.resume(getSuccessfulResult(state))
                }
            }
        } catch (e: Throwable) {
            // Explicit try/catch rather than runCatching: gives a nicer stacktrace and debug experience
            fatalException = e
        } finally {
            val afterTaskFailure = runCatching { taskContext.afterTask() }.exceptionOrNull()
            handleFatalException(fatalException, afterTaskFailure)
        }
    }

    /**
     * Machinery that handles fatal exceptions in kotlinx.coroutines.
     * Fatal exceptions come in two kinds:
     *
     * 1) Exceptions from kotlinx.coroutines code. They indicate that either the library or the compiler
     *    has a bug that breaks internal invariants. They usually have specific workarounds, but require
     *    careful study of the cause and should be reported to the maintainers and fixed on the library's
     *    side anyway.
     *
     * 2) Exceptions from [ThreadContextElement.updateThreadContext] and [ThreadContextElement.restoreThreadContext].
     *    User code can trigger such an exception by providing an improper implementation of [ThreadContextElement],
     *    yet it can't be ignored because it may leave the coroutine in an inconsistent state.
     *    If you encounter such an exception, you can either disable this context element or wrap it into
     *    another context element that catches all exceptions and handles them in an application-specific manner.
     *
     * Fatal exception handling can be intercepted with a [CoroutineExceptionHandler] element in the context of
     * a failed coroutine, but such exceptions should be reported anyway.
     *
     * When both arguments are `null` this method does nothing. When both are non-null, [finallyException]
     * is attached to [exception] as a suppressed exception and [exception] is the one reported.
     */
    internal fun handleFatalException(exception: Throwable?, finallyException: Throwable?) {
        // The primary exception wins; with neither present there is nothing to report
        val cause = exception ?: finallyException ?: return
        if (exception != null && finallyException != null) {
            exception.addSuppressed(finallyException)
        }
        val reason = CoroutinesInternalError("Fatal exception in coroutines machinery for $this. " +
                "Please read KDoc to 'handleFatalException' method and report this incident to maintainers", cause)
        handleCoroutineException(this.delegate.context, reason)
    }
}
147 
/**
 * Delivers the result of this task to its delegate according to [mode].
 *
 * The task is submitted through the delegate's dispatcher (or run via the unconfined path when
 * no dispatch is needed) only when all of the following hold: [mode] is not [MODE_UNDISPATCHED],
 * the delegate is a [DispatchedContinuation], and the cancellability of [mode] matches the
 * cancellability of the task's current `resumeMode`. Otherwise the delegate is resumed directly.
 */
internal fun <T> DispatchedTask<T>.dispatch(mode: Int) {
    assert { mode != MODE_UNINITIALIZED } // invalid mode value for this method
    val target = this.delegate
    val undispatched = mode == MODE_UNDISPATCHED
    if (undispatched || target !is DispatchedContinuation<*> || mode.isCancellableMode != resumeMode.isCancellableMode) {
        // the delegate comes from a 3rd-party interceptor implementation (and does not support cancellation)
        // or undispatched mode was requested
        resume(target, undispatched)
        return
    }
    // dispatch directly, using this instance's Runnable implementation
    val dispatcher = target.dispatcher
    val context = target.context
    when {
        dispatcher.isDispatchNeeded(context) -> dispatcher.dispatch(context, this)
        else -> resumeUnconfined()
    }
}
167 
/**
 * Takes the state of this task and delivers it to [delegate] as a [Result].
 *
 * This resume is never cancellable: the result is always delivered to the delegate continuation.
 * When [undispatched] is `true`, [delegate] is cast to [DispatchedContinuation] and resumed
 * through its `resumeUndispatchedWith`; otherwise plain `resumeWith` is used.
 */
internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, undispatched: Boolean) {
    val state = takeState()
    val failure = getExceptionalResult(state)
    val result: Result<T> =
        if (failure == null) Result.success(getSuccessfulResult<T>(state)) else Result.failure(failure)
    if (undispatched) {
        (delegate as DispatchedContinuation).resumeUndispatchedWith(result)
    } else {
        delegate.resumeWith(result)
    }
}
178 
/**
 * Resumes this task without going through a dispatcher, using the thread-local event loop.
 */
private fun DispatchedTask<*>.resumeUnconfined() {
    val loop = ThreadLocalEventLoop.eventLoop
    if (loop.isUnconfinedLoopActive) {
        // An unconfined loop is already active -- enqueue the continuation for execution to avoid stack overflow
        loop.dispatchUnconfined(this)
        return
    }
    // No active loop -- run the event loop until all unconfined tasks are executed
    runUnconfinedEventLoop(loop) {
        resume(delegate, undispatched = true)
    }
}
191 
/**
 * Runs [block] and then keeps processing unconfined events of [eventLoop] until none are left.
 *
 * The unconfined use count of [eventLoop] is incremented before [block] runs and decremented in
 * `finally`. Any [Throwable] thrown in between is passed to `handleFatalException` instead of
 * propagating to the caller.
 */
internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
    eventLoop: EventLoop,
    block: () -> Unit
) {
    eventLoop.incrementUseCount(unconfined = true)
    try {
        block()
        // keep draining until all unconfined continuations were executed
        while (eventLoop.processUnconfinedEvent()) {
            // nothing else to do per event
        }
    } catch (e: Throwable) {
        /*
         * Not expected to happen normally, only if there is a bug in the implementation.
         * Report it as a fatal exception.
         */
        handleFatalException(e, null)
    } finally {
        eventLoop.decrementUseCount(unconfined = true)
    }
}
213 
/**
 * Resumes this continuation with a failure holding [exception] after it was passed
 * through `recoverStackTrace` together with this continuation.
 */
@Suppress("NOTHING_TO_INLINE")
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
    val recovered = recoverStackTrace(exception, this)
    resumeWith(Result.failure(recovered))
}
218