xref: /aosp_15_r20/external/kotlinx.coroutines/kotlinx-coroutines-core/jvm/src/Executors.kt (revision 7a7160fed73afa6648ef8aa100d4a336fe921d9a)
1 package kotlinx.coroutines
2 
3 import kotlinx.coroutines.flow.*
4 import kotlinx.coroutines.internal.*
5 import java.io.*
6 import java.util.concurrent.*
7 import kotlin.coroutines.*
8 
9 /**
10  * [CoroutineDispatcher] that has underlying [Executor] for dispatching tasks.
11  * Instances of [ExecutorCoroutineDispatcher] should be closed by the owner of the dispatcher.
12  *
13  * This class is generally used as a bridge between coroutine-based API and
14  * asynchronous API that requires an instance of the [Executor].
15  */
16 public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable {
17     /** @suppress */
18     @ExperimentalStdlibApi
19     public companion object Key : AbstractCoroutineContextKey<CoroutineDispatcher, ExecutorCoroutineDispatcher>(
20         CoroutineDispatcher,
<lambda>null21         { it as? ExecutorCoroutineDispatcher })
22 
23     /**
24      * Underlying executor of current [CoroutineDispatcher].
25      */
26     public abstract val executor: Executor
27 
28     /**
29      * Closes this coroutine dispatcher and shuts down its executor.
30      *
31      * It may throw an exception if this dispatcher is global and cannot be closed.
32      */
closenull33     public abstract override fun close()
34 }
35 
36 @ExperimentalCoroutinesApi
37 public actual typealias CloseableCoroutineDispatcher = ExecutorCoroutineDispatcher
38 
39 /**
40  * Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher].
41  *
42  * ## Interaction with [delay] and time-based coroutines.
43  *
44  * If the given [ExecutorService] is an instance of [ScheduledExecutorService], then all time-related
45  * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled
46  * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding
47  * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future.
48  *
49  * If the given [ExecutorService] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling,
50  * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order
51  * to reduce the memory pressure of cancelled coroutines.
52  *
53  * If the executor service is neither of this types, the separate internal thread will be used to
54  * _track_ the delay and time-related executions, but the coroutine itself will still be executed
55  * on top of the given executor.
56  *
57  * ## Rejected execution
58  * If the underlying executor throws [RejectedExecutionException] on
59  * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
60  * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
61  * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
62  * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete.
63  */
64 @JvmName("from") // this is for a nice Java API, see issue #255
65 public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher =
66     ExecutorCoroutineDispatcherImpl(this)
67 
68 /**
69  * Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
70  *
71  * ## Interaction with [delay] and time-based coroutines.
72  *
73  * If the given [Executor] is an instance of [ScheduledExecutorService], then all time-related
74  * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled
75  * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding
76  * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future.
77  *
78  * If the given [Executor] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling,
79  * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order
80  * to reduce the memory pressure of cancelled coroutines.
81  *
82  * If the executor is neither of this types, the separate internal thread will be used to
83  * _track_ the delay and time-related executions, but the coroutine itself will still be executed
84  * on top of the given executor.
85  *
86  * ## Rejected execution
87  *
88  * If the underlying executor throws [RejectedExecutionException] on
89  * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
90  * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
91  * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
92  * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete.
93  */
94 @JvmName("from") // this is for a nice Java API, see issue #255
95 public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher =
96     (this as? DispatcherExecutor)?.dispatcher ?: ExecutorCoroutineDispatcherImpl(this)
97 
98 /**
99  * Converts an instance of [CoroutineDispatcher] to an implementation of [Executor].
100  *
101  * It returns the original executor when used on the result of [Executor.asCoroutineDispatcher] extensions.
102  */
103 public fun CoroutineDispatcher.asExecutor(): Executor =
104     (this as? ExecutorCoroutineDispatcher)?.executor ?: DispatcherExecutor(this)
105 
106 private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher) : Executor {
107     override fun execute(block: Runnable) {
108         if (dispatcher.isDispatchNeeded(EmptyCoroutineContext)) {
109             dispatcher.dispatch(EmptyCoroutineContext, block)
110         } else {
111             block.run()
112         }
113     }
114 
115     override fun toString(): String = dispatcher.toString()
116 }
117 
118 internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay {
119 
120     /*
121      * Attempts to reflectively (to be Java 6 compatible) invoke
122      * ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to cleanup
123      * internal scheduler queue on cancellation.
124      */
125     init {
126         removeFutureOnCancel(executor)
127     }
128 
dispatchnull129     override fun dispatch(context: CoroutineContext, block: Runnable) {
130         try {
131             executor.execute(wrapTask(block))
132         } catch (e: RejectedExecutionException) {
133             unTrackTask()
134             cancelJobOnRejection(context, e)
135             Dispatchers.IO.dispatch(context, block)
136         }
137     }
138 
scheduleResumeAfterDelaynull139     override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
140         val future = (executor as? ScheduledExecutorService)?.scheduleBlock(
141             ResumeUndispatchedRunnable(this, continuation),
142             continuation.context,
143             timeMillis
144         )
145         // If everything went fine and the scheduling attempt was not rejected -- use it
146         if (future != null) {
147             continuation.cancelFutureOnCancellation(future)
148             return
149         }
150         // Otherwise fallback to default executor
151         DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
152     }
153 
invokeOnTimeoutnull154     override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
155         val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis)
156         return when {
157             future != null -> DisposableFutureHandle(future)
158             else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context)
159         }
160     }
161 
ScheduledExecutorServicenull162     private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
163         return try {
164             schedule(block, timeMillis, TimeUnit.MILLISECONDS)
165         } catch (e: RejectedExecutionException) {
166             cancelJobOnRejection(context, e)
167             null
168         }
169     }
170 
cancelJobOnRejectionnull171     private fun cancelJobOnRejection(context: CoroutineContext, exception: RejectedExecutionException) {
172         context.cancel(CancellationException("The task was rejected", exception))
173     }
174 
closenull175     override fun close() {
176         (executor as? ExecutorService)?.shutdown()
177     }
178 
toStringnull179     override fun toString(): String = executor.toString()
180     override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherImpl && other.executor === executor
181     override fun hashCode(): Int = System.identityHashCode(executor)
182 }
183 
184 private class ResumeUndispatchedRunnable(
185     private val dispatcher: CoroutineDispatcher,
186     private val continuation: CancellableContinuation<Unit>
187 ) : Runnable {
188     override fun run() {
189         with(continuation) { dispatcher.resumeUndispatched(Unit) }
190     }
191 }
192 
193 /**
194  * An implementation of [DisposableHandle] that cancels the specified future on dispose.
195  * @suppress **This is unstable API and it is subject to change.**
196  */
197 private class DisposableFutureHandle(private val future: Future<*>) : DisposableHandle {
disposenull198     override fun dispose() {
199         future.cancel(false)
200     }
toStringnull201     override fun toString(): String = "DisposableFutureHandle[$future]"
202 }
203