1 package kotlinx.coroutines
2
3 import kotlinx.coroutines.internal.*
4 import kotlin.coroutines.*
5
6 public expect abstract class W3CWindow
w3cSetTimeoutnull7 internal expect fun w3cSetTimeout(window: W3CWindow, handler: () -> Unit, timeout: Int): Int
8 internal expect fun w3cSetTimeout(handler: () -> Unit, timeout: Int): Int
9 internal expect fun w3cClearTimeout(handle: Int)
10 internal expect fun w3cClearTimeout(window: W3CWindow, handle: Int)
11
12 internal expect class ScheduledMessageQueue(dispatcher: SetTimeoutBasedDispatcher) : MessageQueue {
13 override fun schedule()
14 override fun reschedule()
15 internal fun setTimeout(timeout: Int)
16 }
17
18 internal expect class WindowMessageQueue(window: W3CWindow) : MessageQueue {
schedulenull19 override fun schedule()
20 override fun reschedule()
21 }
22
23 private const val MAX_DELAY = Int.MAX_VALUE.toLong()
24
25 private fun delayToInt(timeMillis: Long): Int =
26 timeMillis.coerceIn(0, MAX_DELAY).toInt()
27
28 internal abstract class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay {
29 internal val messageQueue = ScheduledMessageQueue(this)
30
31 abstract fun scheduleQueueProcessing()
32
33 override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
34 parallelism.checkParallelism()
35 return this
36 }
37
38 override fun dispatch(context: CoroutineContext, block: Runnable) {
39 messageQueue.enqueue(block)
40 }
41
42 override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
43 val handle = w3cSetTimeout({ block.run() }, delayToInt(timeMillis))
44 return ClearTimeout(handle)
45 }
46
47 override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
48 val handle = w3cSetTimeout({ with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis))
49 continuation.invokeOnCancellation(handler = ClearTimeout(handle))
50 }
51 }
52
53 internal class WindowDispatcher(private val window: W3CWindow) : CoroutineDispatcher(), Delay {
54 private val queue = WindowMessageQueue(window)
55
dispatchnull56 override fun dispatch(context: CoroutineContext, block: Runnable) = queue.enqueue(block)
57
58 override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
59 val handle = w3cSetTimeout(window, { with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis))
60 continuation.invokeOnCancellation(handler = WindowClearTimeout(handle))
61 }
62
invokeOnTimeoutnull63 override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
64 val handle = w3cSetTimeout(window, block::run, delayToInt(timeMillis))
65 return WindowClearTimeout(handle)
66 }
67
68 private inner class WindowClearTimeout(handle: Int) : ClearTimeout(handle) {
disposenull69 override fun dispose() {
70 w3cClearTimeout(window, handle)
71 }
72 }
73 }
74
75 internal object SetTimeoutDispatcher : SetTimeoutBasedDispatcher() {
scheduleQueueProcessingnull76 override fun scheduleQueueProcessing() {
77 messageQueue.setTimeout(0)
78 }
79 }
80
81 private open class ClearTimeout(protected val handle: Int) : CancelHandler, DisposableHandle {
disposenull82 override fun dispose() {
83 w3cClearTimeout(handle)
84 }
85
invokenull86 override fun invoke(cause: Throwable?) {
87 dispose()
88 }
89
toStringnull90 override fun toString(): String = "ClearTimeout[$handle]"
91 }
92
93
94 /**
95 * An abstraction over JS scheduling mechanism that leverages micro-batching of dispatched blocks without
96 * paying the cost of JS callbacks scheduling on every dispatch.
97 *
98 * Queue uses two scheduling mechanisms:
99 * 1) [schedule] is used to schedule the initial processing of the message queue.
100 * JS engine-specific microtask mechanism is used in order to boost performance on short runs and a dispatch batch
101 * 2) [reschedule] is used to schedule processing of the queue after yield to the JS event loop.
102 * JS engine-specific macrotask mechanism is used not to starve animations and non-coroutines macrotasks.
103 *
104 * Yet there could be a long tail of "slow" reschedules, but it should be amortized by the queue size.
105 */
106 internal abstract class MessageQueue : MutableList<Runnable> by ArrayDeque() {
107 val yieldEvery = 16 // yield to JS macrotask event loop after this many processed messages
108 private var scheduled = false
109
110 abstract fun schedule()
111
112 abstract fun reschedule()
113
114 fun enqueue(element: Runnable) {
115 add(element)
116 if (!scheduled) {
117 scheduled = true
118 schedule()
119 }
120 }
121
122 fun process() {
123 try {
124 // limit number of processed messages
125 repeat(yieldEvery) {
126 val element = removeFirstOrNull() ?: return@process
127 element.run()
128 }
129 } finally {
130 if (isEmpty()) {
131 scheduled = false
132 } else {
133 reschedule()
134 }
135 }
136 }
137 }
138