1 package kotlinx.coroutines
2 
3 import kotlinx.coroutines.internal.*
4 import kotlin.coroutines.*
5 
/**
 * Platform-provided window type; every target supplies its own `actual` class.
 * [WindowDispatcher] and [WindowMessageQueue] are each bound to one instance of it.
 */
public expect abstract class W3CWindow

/**
 * Platform timer without an explicit window: registers [handler] with a [timeout]
 * (callers in this file pass a millisecond delay clamped to the `Int` range) and returns
 * a handle that the single-argument [w3cClearTimeout] accepts.
 */
internal expect fun w3cSetTimeout(handler: () -> Unit, timeout: Int): Int

/** Cancels the timer identified by [handle], as returned by the window-less [w3cSetTimeout]. */
internal expect fun w3cClearTimeout(handle: Int)

/**
 * Window-bound counterpart of the timer function: registers [handler] on [window] with the
 * given [timeout] and returns a handle for the window-bound [w3cClearTimeout].
 */
internal expect fun w3cSetTimeout(window: W3CWindow, handler: () -> Unit, timeout: Int): Int

/** Cancels the timer identified by [handle] that was registered on [window]. */
internal expect fun w3cClearTimeout(window: W3CWindow, handle: Int)
11 
/**
 * Platform-specific [MessageQueue] owned by a [SetTimeoutBasedDispatcher]; the dispatcher
 * passes itself as `dispatcher` when it creates the queue.
 */
internal expect class ScheduledMessageQueue(dispatcher: SetTimeoutBasedDispatcher) : MessageQueue {
    /**
     * Arms a platform timer with the given [timeout]; [SetTimeoutDispatcher] calls this with `0`.
     * NOTE(review): presumably the timer callback runs [process] - confirm against the actuals.
     */
    internal fun setTimeout(timeout: Int)

    /** [MessageQueue] hook used when a processing pass ends with messages still queued. */
    override fun reschedule()

    /** [MessageQueue] hook used when the first message is enqueued into an idle queue. */
    override fun schedule()
}
17 
/**
 * Platform-specific [MessageQueue] bound to a [W3CWindow]; [WindowDispatcher] creates one
 * for the window it was constructed with.
 */
internal expect class WindowMessageQueue(window: W3CWindow) : MessageQueue {
    /** [MessageQueue] hook used when a processing pass ends with messages still queued. */
    override fun reschedule()

    /** [MessageQueue] hook used when the first message is enqueued into an idle queue. */
    override fun schedule()
}
22 
/** Largest delay, in milliseconds, that still fits the `Int` timeout parameter of the timer functions. */
private const val MAX_DELAY = Int.MAX_VALUE.toLong()

/**
 * Clamps a millisecond delay to the `[0, Int.MAX_VALUE]` range and narrows it to `Int`.
 *
 * Negative delays become `0`; delays above [MAX_DELAY] saturate at `Int.MAX_VALUE`.
 */
private fun delayToInt(timeMillis: Long): Int = when {
    timeMillis <= 0L -> 0
    timeMillis >= MAX_DELAY -> MAX_DELAY.toInt()
    else -> timeMillis.toInt()
}
27 
/**
 * Base dispatcher that funnels dispatched blocks through a [ScheduledMessageQueue] and
 * implements [Delay] on top of the window-less [w3cSetTimeout] / [w3cClearTimeout] pair.
 */
internal abstract class SetTimeoutBasedDispatcher : CoroutineDispatcher(), Delay {
    /** Queue of dispatched blocks; it is handed this dispatcher at construction. */
    internal val messageQueue = ScheduledMessageQueue(this)

    /**
     * Subclass hook that arranges a processing pass of [messageQueue].
     * NOTE(review): the caller is not visible in this file - presumably the platform queue.
     */
    abstract fun scheduleQueueProcessing()

    /**
     * Validates [parallelism] and returns this very dispatcher: no limiting wrapper is created.
     */
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher =
        also { parallelism.checkParallelism() }

    /** Appends [block] to [messageQueue]; [context] is not consulted. */
    override fun dispatch(context: CoroutineContext, block: Runnable) = messageQueue.enqueue(block)

    /**
     * Runs [block] from a platform timer after [timeMillis] (clamped by [delayToInt]).
     *
     * @return a handle whose disposal clears the timer.
     */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        val delayMs = delayToInt(timeMillis)
        val timerId = w3cSetTimeout({ block.run() }, delayMs)
        return ClearTimeout(timerId)
    }

    /**
     * Resumes [continuation] undispatched from a platform timer after [timeMillis]
     * (clamped by [delayToInt]); the timer is cleared if the continuation is cancelled first.
     */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        // `resumeUndispatched` needs the continuation as receiver and this dispatcher in scope.
        val resume: () -> Unit = { with(continuation) { resumeUndispatched(Unit) } }
        val timerId = w3cSetTimeout(resume, delayToInt(timeMillis))
        continuation.invokeOnCancellation(handler = ClearTimeout(timerId))
    }
}
52 
/**
 * Dispatcher tied to a particular [window]: blocks are queued in a [WindowMessageQueue] for that
 * window, and [Delay] is implemented with the window-bound [w3cSetTimeout] / [w3cClearTimeout].
 */
internal class WindowDispatcher(private val window: W3CWindow) : CoroutineDispatcher(), Delay {
    private val queue = WindowMessageQueue(window)

    /** Appends [block] to the window queue; [context] is not consulted. */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        queue.enqueue(block)
    }

    /**
     * Resumes [continuation] undispatched from a timer on [window] after [timeMillis]
     * (clamped by [delayToInt]); the timer is cleared if the continuation is cancelled first.
     */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        // `resumeUndispatched` needs the continuation as receiver and this dispatcher in scope.
        val resume: () -> Unit = { with(continuation) { resumeUndispatched(Unit) } }
        val timerId = w3cSetTimeout(window, resume, delayToInt(timeMillis))
        continuation.invokeOnCancellation(handler = WindowClearTimeout(timerId))
    }

    /**
     * Runs [block] from a timer on [window] after [timeMillis] (clamped by [delayToInt]).
     *
     * @return a handle whose disposal clears the timer on the same window.
     */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
        WindowClearTimeout(w3cSetTimeout(window, block::run, delayToInt(timeMillis)))

    /** [ClearTimeout] variant that clears the timer on the owning dispatcher's [window]. */
    private inner class WindowClearTimeout(handle: Int) : ClearTimeout(handle) {
        override fun dispose() = w3cClearTimeout(window, handle)
    }
}
74 
/**
 * Singleton [SetTimeoutBasedDispatcher] that requests queue processing by arming
 * the message queue's timer with a zero timeout.
 */
internal object SetTimeoutDispatcher : SetTimeoutBasedDispatcher() {
    override fun scheduleQueueProcessing() = messageQueue.setTimeout(0)
}
80 
/**
 * Handle for a pending platform timer, usable both as a [DisposableHandle] and as a
 * cancellation handler. Disposing it clears the timer identified by [handle].
 */
private open class ClearTimeout(protected val handle: Int) : CancelHandler, DisposableHandle {
    /** Clears the timer through the window-less [w3cClearTimeout]; subclasses may redirect this. */
    override fun dispose() = w3cClearTimeout(handle)

    /** Cancellation callback: ignores [cause] and delegates to [dispose]. */
    override fun invoke(cause: Throwable?) = dispose()

    override fun toString(): String {
        return "ClearTimeout[$handle]"
    }
}
92 
93 
/**
 * A queue of dispatched blocks that sits on top of the JS scheduling primitives and processes
 * blocks in small batches, so that a JS callback does not have to be scheduled for every dispatch.
 *
 * Two scheduling hooks are used:
 * 1) [schedule] kicks off the initial processing of the queue.
 *    It relies on a JS engine-specific microtask mechanism to speed up short runs and dispatch batches.
 * 2) [reschedule] continues processing after yielding to the JS event loop.
 *    It relies on a JS engine-specific macrotask mechanism so that animations and non-coroutine
 *    macrotasks are not starved.
 *
 * A long tail of "slow" reschedules is still possible, but its cost should be amortized by the queue size.
 */
internal abstract class MessageQueue : MutableList<Runnable> by ArrayDeque() {
    val yieldEvery = 16 // yield to JS macrotask event loop after this many processed messages
    private var scheduled = false

    /** Requests the first processing pass for a queue that was idle. */
    abstract fun schedule()

    /** Requests another processing pass when the previous one left messages behind. */
    abstract fun reschedule()

    /** Appends [element] and calls [schedule] unless a processing pass is already pending. */
    fun enqueue(element: Runnable) {
        add(element)
        if (scheduled) return
        scheduled = true
        schedule()
    }

    /**
     * Runs at most [yieldEvery] queued messages in FIFO order.
     *
     * Afterwards - even when a message throws - the queue is either marked idle (when empty)
     * or [reschedule] is called to continue with the remaining messages.
     */
    fun process() {
        try {
            // Cap the batch so that control returns to the event loop regularly.
            var remaining = yieldEvery
            while (remaining > 0) {
                val next = removeFirstOrNull() ?: break
                next.run()
                remaining--
            }
        } finally {
            if (isNotEmpty()) {
                reschedule()
            } else {
                scheduled = false
            }
        }
    }
}
138