1 package kotlinx.coroutines 2 3 import kotlinx.coroutines.flow.* 4 import kotlinx.coroutines.internal.* 5 import java.io.* 6 import java.util.concurrent.* 7 import kotlin.coroutines.* 8 9 /** 10 * [CoroutineDispatcher] that has underlying [Executor] for dispatching tasks. 11 * Instances of [ExecutorCoroutineDispatcher] should be closed by the owner of the dispatcher. 12 * 13 * This class is generally used as a bridge between coroutine-based API and 14 * asynchronous API that requires an instance of the [Executor]. 15 */ 16 public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable { 17 /** @suppress */ 18 @ExperimentalStdlibApi 19 public companion object Key : AbstractCoroutineContextKey<CoroutineDispatcher, ExecutorCoroutineDispatcher>( 20 CoroutineDispatcher, <lambda>null21 { it as? ExecutorCoroutineDispatcher }) 22 23 /** 24 * Underlying executor of current [CoroutineDispatcher]. 25 */ 26 public abstract val executor: Executor 27 28 /** 29 * Closes this coroutine dispatcher and shuts down its executor. 30 * 31 * It may throw an exception if this dispatcher is global and cannot be closed. 32 */ closenull33 public abstract override fun close() 34 } 35 36 @ExperimentalCoroutinesApi 37 public actual typealias CloseableCoroutineDispatcher = ExecutorCoroutineDispatcher 38 39 /** 40 * Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher]. 41 * 42 * ## Interaction with [delay] and time-based coroutines. 43 * 44 * If the given [ExecutorService] is an instance of [ScheduledExecutorService], then all time-related 45 * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled 46 * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding 47 * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future. 48 * 49 * If the given [ExecutorService] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling, 50 * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order 51 * to reduce the memory pressure of cancelled coroutines. 52 * 53 * If the executor service is neither of this types, the separate internal thread will be used to 54 * _track_ the delay and time-related executions, but the coroutine itself will still be executed 55 * on top of the given executor. 56 * 57 * ## Rejected execution 58 * If the underlying executor throws [RejectedExecutionException] on 59 * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the 60 * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues), 61 * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the 62 * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete. 63 */ 64 @JvmName("from") // this is for a nice Java API, see issue #255 65 public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher = 66 ExecutorCoroutineDispatcherImpl(this) 67 68 /** 69 * Converts an instance of [Executor] to an implementation of [CoroutineDispatcher]. 70 * 71 * ## Interaction with [delay] and time-based coroutines. 72 * 73 * If the given [Executor] is an instance of [ScheduledExecutorService], then all time-related 74 * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled 75 * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding 76 * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future. 77 * 78 * If the given [Executor] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling, 79 * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order 80 * to reduce the memory pressure of cancelled coroutines. 81 * 82 * If the executor is neither of this types, the separate internal thread will be used to 83 * _track_ the delay and time-related executions, but the coroutine itself will still be executed 84 * on top of the given executor. 85 * 86 * ## Rejected execution 87 * 88 * If the underlying executor throws [RejectedExecutionException] on 89 * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the 90 * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues), 91 * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the 92 * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete. 93 */ 94 @JvmName("from") // this is for a nice Java API, see issue #255 95 public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher = 96 (this as? DispatcherExecutor)?.dispatcher ?: ExecutorCoroutineDispatcherImpl(this) 97 98 /** 99 * Converts an instance of [CoroutineDispatcher] to an implementation of [Executor]. 100 * 101 * It returns the original executor when used on the result of [Executor.asCoroutineDispatcher] extensions. 102 */ 103 public fun CoroutineDispatcher.asExecutor(): Executor = 104 (this as? ExecutorCoroutineDispatcher)?.executor ?: DispatcherExecutor(this) 105 106 private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher) : Executor { 107 override fun execute(block: Runnable) { 108 if (dispatcher.isDispatchNeeded(EmptyCoroutineContext)) { 109 dispatcher.dispatch(EmptyCoroutineContext, block) 110 } else { 111 block.run() 112 } 113 } 114 115 override fun toString(): String = dispatcher.toString() 116 } 117 118 internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay { 119 120 /* 121 * Attempts to reflectively (to be Java 6 compatible) invoke 122 * ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to cleanup 123 * internal scheduler queue on cancellation. 124 */ 125 init { 126 removeFutureOnCancel(executor) 127 } 128 dispatchnull129 override fun dispatch(context: CoroutineContext, block: Runnable) { 130 try { 131 executor.execute(wrapTask(block)) 132 } catch (e: RejectedExecutionException) { 133 unTrackTask() 134 cancelJobOnRejection(context, e) 135 Dispatchers.IO.dispatch(context, block) 136 } 137 } 138 scheduleResumeAfterDelaynull139 override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { 140 val future = (executor as? ScheduledExecutorService)?.scheduleBlock( 141 ResumeUndispatchedRunnable(this, continuation), 142 continuation.context, 143 timeMillis 144 ) 145 // If everything went fine and the scheduling attempt was not rejected -- use it 146 if (future != null) { 147 continuation.cancelFutureOnCancellation(future) 148 return 149 } 150 // Otherwise fallback to default executor 151 DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation) 152 } 153 invokeOnTimeoutnull154 override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { 155 val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis) 156 return when { 157 future != null -> DisposableFutureHandle(future) 158 else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context) 159 } 160 } 161 ScheduledExecutorServicenull162 private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? { 163 return try { 164 schedule(block, timeMillis, TimeUnit.MILLISECONDS) 165 } catch (e: RejectedExecutionException) { 166 cancelJobOnRejection(context, e) 167 null 168 } 169 } 170 cancelJobOnRejectionnull171 private fun cancelJobOnRejection(context: CoroutineContext, exception: RejectedExecutionException) { 172 context.cancel(CancellationException("The task was rejected", exception)) 173 } 174 closenull175 override fun close() { 176 (executor as? ExecutorService)?.shutdown() 177 } 178 toStringnull179 override fun toString(): String = executor.toString() 180 override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherImpl && other.executor === executor 181 override fun hashCode(): Int = System.identityHashCode(executor) 182 } 183 184 private class ResumeUndispatchedRunnable( 185 private val dispatcher: CoroutineDispatcher, 186 private val continuation: CancellableContinuation<Unit> 187 ) : Runnable { 188 override fun run() { 189 with(continuation) { dispatcher.resumeUndispatched(Unit) } 190 } 191 } 192 193 /** 194 * An implementation of [DisposableHandle] that cancels the specified future on dispose. 195 * @suppress **This is unstable API and it is subject to change.** 196 */ 197 private class DisposableFutureHandle(private val future: Future<*>) : DisposableHandle { disposenull198 override fun dispose() { 199 future.cancel(false) 200 } toStringnull201 override fun toString(): String = "DisposableFutureHandle[$future]" 202 } 203