1 package kotlinx.coroutines
2
3 import kotlinx.coroutines.internal.*
4 import kotlin.coroutines.*
5 import kotlin.jvm.*
6
7 /**
8 * Non-cancellable dispatch mode.
9 *
10 * **DO NOT CHANGE THE CONSTANT VALUE**. It might be inlined into legacy user code that was calling
11 * inline `suspendAtomicCancellableCoroutine` function and did not support reuse.
12 */
13 internal const val MODE_ATOMIC = 0
14
15 /**
16 * Cancellable dispatch mode. It is used by user-facing [suspendCancellableCoroutine].
17 * Note, that implementation of cancellability checks mode via [Int.isCancellableMode] extension.
18 *
19 * **DO NOT CHANGE THE CONSTANT VALUE**. It is being into the user code from [suspendCancellableCoroutine].
20 */
21 @PublishedApi
22 internal const val MODE_CANCELLABLE: Int = 1
23
24 /**
25 * Cancellable dispatch mode for [suspendCancellableCoroutineReusable].
26 * Note, that implementation of cancellability checks mode via [Int.isCancellableMode] extension;
27 * implementation of reuse checks mode via [Int.isReusableMode] extension.
28 */
29 internal const val MODE_CANCELLABLE_REUSABLE = 2
30
31 /**
32 * Undispatched mode for [CancellableContinuation.resumeUndispatched].
33 * It is used when the thread is right, but it needs to be marked with the current coroutine.
34 */
35 internal const val MODE_UNDISPATCHED = 4
36
37 /**
38 * Initial mode for [DispatchedContinuation] implementation, should never be used for dispatch, because it is always
39 * overwritten when continuation is resumed with the actual resume mode.
40 */
41 internal const val MODE_UNINITIALIZED = -1
42
43 internal val Int.isCancellableMode get() = this == MODE_CANCELLABLE || this == MODE_CANCELLABLE_REUSABLE
44 internal val Int.isReusableMode get() = this == MODE_CANCELLABLE_REUSABLE
45
46 @PublishedApi
47 internal abstract class DispatchedTask<in T> internal constructor(
48 // Used by the IDEA debugger via reflection and must be kept binary-compatible, see KTIJ-24102
49 @JvmField public var resumeMode: Int
50 ) : SchedulerTask() {
51 internal abstract val delegate: Continuation<T>
52
takeStatenull53 internal abstract fun takeState(): Any?
54
55 /**
56 * Called when this task was cancelled while it was being dispatched.
57 */
58 internal open fun cancelCompletedResult(takenState: Any?, cause: Throwable) {}
59
60 /**
61 * There are two implementations of `DispatchedTask`:
62 * - [DispatchedContinuation] keeps only simple values as successfully results.
63 * - [CancellableContinuationImpl] keeps additional data with values and overrides this method to unwrap it.
64 */
65 @Suppress("UNCHECKED_CAST")
getSuccessfulResultnull66 internal open fun <T> getSuccessfulResult(state: Any?): T =
67 state as T
68
69 /**
70 * There are two implementations of `DispatchedTask`:
71 * - [DispatchedContinuation] is just an intermediate storage that stores the exception that has its stack-trace
72 * properly recovered and is ready to pass to the [delegate] continuation directly.
73 * - [CancellableContinuationImpl] stores raw cause of the failure in its state; when it needs to be dispatched
74 * its stack-trace has to be recovered, so it overrides this method for that purpose.
75 */
76 internal open fun getExceptionalResult(state: Any?): Throwable? =
77 (state as? CompletedExceptionally)?.cause
78
79 final override fun run() {
80 assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
81 val taskContext = this.taskContext
82 var fatalException: Throwable? = null
83 try {
84 val delegate = delegate as DispatchedContinuation<T>
85 val continuation = delegate.continuation
86 withContinuationContext(continuation, delegate.countOrElement) {
87 val context = continuation.context
88 val state = takeState() // NOTE: Must take state in any case, even if cancelled
89 val exception = getExceptionalResult(state)
90 /*
91 * Check whether continuation was originally resumed with an exception.
92 * If so, it dominates cancellation, otherwise the original exception
93 * will be silently lost.
94 */
95 val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
96 if (job != null && !job.isActive) {
97 val cause = job.getCancellationException()
98 cancelCompletedResult(state, cause)
99 continuation.resumeWithStackTrace(cause)
100 } else {
101 if (exception != null) {
102 continuation.resumeWithException(exception)
103 } else {
104 continuation.resume(getSuccessfulResult(state))
105 }
106 }
107 }
108 } catch (e: Throwable) {
109 // This instead of runCatching to have nicer stacktrace and debug experience
110 fatalException = e
111 } finally {
112 val result = runCatching { taskContext.afterTask() }
113 handleFatalException(fatalException, result.exceptionOrNull())
114 }
115 }
116
117 /**
118 * Machinery that handles fatal exceptions in kotlinx.coroutines.
119 * There are two kinds of fatal exceptions:
120 *
121 * 1) Exceptions from kotlinx.coroutines code. Such exceptions indicate that either
122 * the library or the compiler has a bug that breaks internal invariants.
123 * They usually have specific workarounds, but require careful study of the cause and should
124 * be reported to the maintainers and fixed on the library's side anyway.
125 *
126 * 2) Exceptions from [ThreadContextElement.updateThreadContext] and [ThreadContextElement.restoreThreadContext].
127 * While a user code can trigger such exception by providing an improper implementation of [ThreadContextElement],
128 * we can't ignore it because it may leave coroutine in the inconsistent state.
129 * If you encounter such exception, you can either disable this context element or wrap it into
130 * another context element that catches all exceptions and handles it in the application specific manner.
131 *
132 * Fatal exception handling can be intercepted with [CoroutineExceptionHandler] element in the context of
133 * a failed coroutine, but such exceptions should be reported anyway.
134 */
handleFatalExceptionnull135 internal fun handleFatalException(exception: Throwable?, finallyException: Throwable?) {
136 if (exception === null && finallyException === null) return
137 if (exception !== null && finallyException !== null) {
138 exception.addSuppressed(finallyException)
139 }
140
141 val cause = exception ?: finallyException
142 val reason = CoroutinesInternalError("Fatal exception in coroutines machinery for $this. " +
143 "Please read KDoc to 'handleFatalException' method and report this incident to maintainers", cause!!)
144 handleCoroutineException(this.delegate.context, reason)
145 }
146 }
147
dispatchnull148 internal fun <T> DispatchedTask<T>.dispatch(mode: Int) {
149 assert { mode != MODE_UNINITIALIZED } // invalid mode value for this method
150 val delegate = this.delegate
151 val undispatched = mode == MODE_UNDISPATCHED
152 if (!undispatched && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
153 // dispatch directly using this instance's Runnable implementation
154 val dispatcher = delegate.dispatcher
155 val context = delegate.context
156 if (dispatcher.isDispatchNeeded(context)) {
157 dispatcher.dispatch(context, this)
158 } else {
159 resumeUnconfined()
160 }
161 } else {
162 // delegate is coming from 3rd-party interceptor implementation (and does not support cancellation)
163 // or undispatched mode was requested
164 resume(delegate, undispatched)
165 }
166 }
167
resumenull168 internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, undispatched: Boolean) {
169 // This resume is never cancellable. The result is always delivered to delegate continuation.
170 val state = takeState()
171 val exception = getExceptionalResult(state)
172 val result = if (exception != null) Result.failure(exception) else Result.success(getSuccessfulResult<T>(state))
173 when {
174 undispatched -> (delegate as DispatchedContinuation).resumeUndispatchedWith(result)
175 else -> delegate.resumeWith(result)
176 }
177 }
178
resumeUnconfinednull179 private fun DispatchedTask<*>.resumeUnconfined() {
180 val eventLoop = ThreadLocalEventLoop.eventLoop
181 if (eventLoop.isUnconfinedLoopActive) {
182 // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
183 eventLoop.dispatchUnconfined(this)
184 } else {
185 // Was not active -- run event loop until all unconfined tasks are executed
186 runUnconfinedEventLoop(eventLoop) {
187 resume(delegate, undispatched = true)
188 }
189 }
190 }
191
runUnconfinedEventLoopnull192 internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
193 eventLoop: EventLoop,
194 block: () -> Unit
195 ) {
196 eventLoop.incrementUseCount(unconfined = true)
197 try {
198 block()
199 while (true) {
200 // break when all unconfined continuations where executed
201 if (!eventLoop.processUnconfinedEvent()) break
202 }
203 } catch (e: Throwable) {
204 /*
205 * This exception doesn't happen normally, only if we have a bug in implementation.
206 * Report it as a fatal exception.
207 */
208 handleFatalException(e, null)
209 } finally {
210 eventLoop.decrementUseCount(unconfined = true)
211 }
212 }
213
214 @Suppress("NOTHING_TO_INLINE")
resumeWithStackTracenull215 internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
216 resumeWith(Result.failure(recoverStackTrace(exception, this)))
217 }
218