xref: /aosp_15_r20/external/kotlinx.coroutines/kotlinx-coroutines-core/common/src/EventLoop.common.kt (revision 7a7160fed73afa6648ef8aa100d4a336fe921d9a)

<lambda>null1 package kotlinx.coroutines
2 
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.internal.*
5 import kotlin.concurrent.Volatile
6 import kotlin.coroutines.*
7 import kotlin.jvm.*
8 
/**
 * Base class for [CoroutineDispatcher] implementations that contain an event loop and can be
 * asked to process the next event from their event queue.
 *
 * An implementation may additionally implement the [Delay] interface to support time-scheduled tasks.
 * An event loop is created, or piggybacked onto (see [ThreadLocalEventLoop]),
 * by `runBlocking` and by [Dispatchers.Unconfined].
 *
 * @suppress **This is an internal API and should not be used from general code.**
 */
internal abstract class EventLoop : CoroutineDispatcher() {
    /**
     * Number of nested `runBlocking` and [Dispatchers.Unconfined] usages of this event loop.
     * Both kinds share this single counter: an unconfined use adds `1L shl 32`, any other use adds `1`
     * (see [delta]).
     */
    private var useCount = 0L

    /**
     * Becomes `true` on the first non-unconfined use (i.e. by `runBlocking`). Such a use may leak this loop
     * to other threads, so the loop has to be shut down once it is no longer used. A loop that was used
     * only by [Dispatchers.Unconfined] is never shut down here -- it can stay in [ThreadLocalEventLoop]
     * and be reused.
     */
    private var shared = false

    /**
     * Tasks of [Dispatchers.Unconfined]; allocated on the first [dispatchUnconfined] call.
     * These tasks are thread-local for performance and take precedence over the rest of the queue.
     */
    private var unconfinedQueue: ArrayDeque<DispatchedTask<*>>? = null

    /**
     * Processes the next event of this event loop.
     *
     * How to read the result:
     * - `<= 0` -- there may be more events ready for immediate processing;
     * - `> 0` -- number of nanoseconds to wait for the next scheduled event;
     * - [Long.MAX_VALUE] -- no more events.
     *
     * **NOTE**: Must be invoked only from the event loop's thread
     *          (there is no check for performance reasons, one may be added in the future).
     */
    open fun processNextEvent(): Long =
        if (processUnconfinedEvent()) 0L else Long.MAX_VALUE

    /** `true` when there is nothing to process; the base implementation looks only at the unconfined queue. */
    protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty

    /**
     * `0` when an unconfined task is waiting, [Long.MAX_VALUE] when the unconfined queue
     * is missing or empty.
     */
    protected open val nextTime: Long
        get() {
            val pending = unconfinedQueue
            return if (pending == null || pending.isEmpty()) Long.MAX_VALUE else 0L
        }

    /**
     * Runs the first task of the unconfined queue, if there is one.
     *
     * @return `true` if a task was run, `false` if the queue is not allocated or is empty.
     */
    fun processUnconfinedEvent(): Boolean {
        val next = unconfinedQueue?.removeFirstOrNull() ?: return false
        next.run()
        return true
    }

    /**
     * Tells the invoking `runBlocking(context) { ... }`, which received this event loop in its context
     * parameter, whether it should call [processNextEvent] on this loop (`true`) or process the
     * thread-local one instead (`false`).
     * By default an event loop implementation is thread-local and should not be processed in the context
     * (the current thread's event loop should be processed instead).
     */
    open fun shouldBeProcessedFromContext(): Boolean = false

    /**
     * Appends a task, whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded],
     * to the unconfined queue of this event loop, allocating the queue on first use.
     */
    fun dispatchUnconfined(task: DispatchedTask<*>) {
        var pending = unconfinedQueue
        if (pending == null) {
            pending = ArrayDeque()
            unconfinedQueue = pending
        }
        pending.addLast(task)
    }

    /** `true` while at least one use of either kind is registered. */
    val isActive: Boolean
        get() = useCount > 0

    /** `true` while the counter holds at least one unconfined use. */
    val isUnconfinedLoopActive: Boolean
        get() = useCount >= delta(unconfined = true)

    // May only be used from the event loop's thread
    val isUnconfinedQueueEmpty: Boolean
        get() {
            val pending = unconfinedQueue
            return pending == null || pending.isEmpty()
        }

    /** Amount added to / subtracted from [useCount] for one use of the given kind. */
    private fun delta(unconfined: Boolean): Long = when {
        unconfined -> 1L shl 32
        else -> 1L
    }

    /** Registers one more use of this loop; a non-unconfined use also marks the loop as [shared]. */
    fun incrementUseCount(unconfined: Boolean = false) {
        useCount += delta(unconfined)
        if (!unconfined) {
            shared = true
        }
    }

    /**
     * Unregisters one use of this loop. When the counter drops to zero and the loop was ever
     * used in a shared way, the loop is shut down.
     */
    fun decrementUseCount(unconfined: Boolean = false) {
        useCount -= delta(unconfined)
        if (useCount > 0) return
        assert { useCount == 0L } // "Extra decrementUseCount"
        // shut it down and remove from ThreadLocalEventLoop
        if (shared) shutdown()
    }

    /** Validates [parallelism] via `checkParallelism` and returns this very dispatcher. */
    final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        return this
    }

    /** Shutdown hook invoked by [decrementUseCount]; does nothing by default. */
    open fun shutdown() {}
}
121 
/**
 * Holder of the event loop associated with the current thread, backed by a `commonThreadLocal` slot.
 */
internal object ThreadLocalEventLoop {
    private val ref = commonThreadLocal<EventLoop?>(Symbol("ThreadLocalEventLoop"))

    /**
     * The event loop stored in the slot. When the slot is empty, a new loop is obtained from
     * [createEventLoop], stored in the slot and returned.
     */
    internal val eventLoop: EventLoop
        get() {
            val existing = ref.get()
            if (existing != null) return existing
            val created = createEventLoop()
            ref.set(created)
            return created
        }

    /** Returns the stored event loop, or `null` when the slot is empty; never creates one. */
    internal fun currentOrNull(): EventLoop? {
        return ref.get()
    }

    /** Clears the slot. */
    internal fun resetEventLoop() {
        ref.set(null)
    }

    /** Stores [eventLoop] in the slot, replacing whatever was there. */
    internal fun setEventLoop(eventLoop: EventLoop) {
        ref.set(eventLoop)
    }
}
139 
// Marker written to DelayedTask._heap by DelayedTask.dispose(); scheduleTask() refuses to add a task
// that carries it. Note that the symbol's debug name is "REMOVED_TASK", not "DISPOSED_TASK".
140 private val DISPOSED_TASK = Symbol("REMOVED_TASK")
141 
142 // results for scheduleImpl
// Returned by DelayedTask.scheduleTask on its normal path; schedule() then unparks the loop
// if the task ended up first in the delayed queue.
143 private const val SCHEDULE_OK = 0
// Returned when the event loop is already completed; schedule() passes the task to reschedule().
144 private const val SCHEDULE_COMPLETED = 1
// Returned when the task was found to be disposed already; schedule() does nothing for it.
145 private const val SCHEDULE_DISPOSED = 2
146 
/** Number of nanoseconds in one millisecond. */
private const val MS_TO_NS = 1_000_000L

/** Smallest millisecond value at which [delayToNanos] saturates to [Long.MAX_VALUE] instead of multiplying. */
private const val MAX_MS = Long.MAX_VALUE / MS_TO_NS

/**
 * First-line overflow protection -- an upper limit for delays.
 * Delays that are not shorter than this one (~146 years) are considered to be delayed "forever".
 */
private const val MAX_DELAY_NS = Long.MAX_VALUE / 2

/**
 * Converts a delay in milliseconds to nanoseconds.
 *
 * Non-positive values give `0`; values of [MAX_MS] or more give [Long.MAX_VALUE],
 * so the multiplication can never overflow.
 */
internal fun delayToNanos(timeMillis: Long): Long {
    if (timeMillis <= 0) return 0L
    return if (timeMillis < MAX_MS) timeMillis * MS_TO_NS else Long.MAX_VALUE
}

/** Converts nanoseconds to whole milliseconds using integer division. */
internal fun delayNanosToMillis(timeNanos: Long): Long {
    return timeNanos / MS_TO_NS
}
164 
// Marker value for EventLoopImplBase._queue: closeQueue() installs it when the queue slot is still null;
// enqueueImpl() refuses to add to it and dequeue() returns null for it.
165 private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY")
166 
// File-local short name for LockFreeTaskQueueCore, used for the multi-element state of EventLoopImplBase._queue.
167 private typealias Queue<T> = LockFreeTaskQueueCore<T>
168 
/**
 * Platform-specific base of [EventLoopImplBase]. Each platform supplies the actual implementation of
 * the two hooks below; in this file they are called from `EventLoopImplBase.enqueue`,
 * `EventLoopImplBase.schedule` and `EventLoopImplBase.rescheduleAllDelayed`.
 */
169 internal expect abstract class EventLoopImplPlatform() : EventLoop {
170     // Called to unpark this event loop's thread
171     protected fun unpark()
172 
173     // Called to reschedule to DefaultExecutor when this event loop is complete
174     protected fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask)
175 }
176 
/**
 * Event loop implementation that, on top of the unconfined queue of [EventLoop], keeps a queue of
 * ordinary tasks and a heap of delayed tasks, and implements [Delay] by means of the latter.
 */
internal abstract class EventLoopImplBase : EventLoopImplPlatform(), Delay {
    // One of: null | CLOSED_EMPTY | a single task | Queue<Runnable>
    private val _queue = atomic<Any?>(null)

    // Allocated only once
    private val _delayed = atomic<DelayedTaskQueue?>(null)

    private val _isCompleted = atomic(false)
    private var isCompleted
        get() = _isCompleted.value
        set(value) { _isCompleted.value = value }

    /** `true` only when the unconfined queue, the delayed heap and the task queue are all empty. */
    override val isEmpty: Boolean
        get() {
            if (!isUnconfinedQueueEmpty) return false
            if (_delayed.value?.isEmpty == false) return false
            val current = _queue.value
            return when {
                current == null -> true
                current is Queue<*> -> current.isEmpty
                // a single task is "not empty"; the closed marker is "empty"
                else -> current === CLOSED_EMPTY
            }
        }

    /**
     * `0` when something can be processed right away, [Long.MAX_VALUE] when the queue is closed or
     * nothing is scheduled, otherwise the non-negative number of nanoseconds until the first delayed task.
     */
    override val nextTime: Long
        get() {
            if (super.nextTime == 0L) return 0L
            val current = _queue.value
            if (current != null) {
                if (current is Queue<*>) {
                    // a non-empty queue is ready now, an empty one falls through to the delayed heap
                    if (!current.isEmpty) return 0L
                } else {
                    // closed marker -- no more events; anything else is a single pending task
                    return if (current === CLOSED_EMPTY) Long.MAX_VALUE else 0L
                }
            }
            val head = _delayed.value?.peek() ?: return Long.MAX_VALUE
            return (head.nanoTime - nanoTime()).coerceAtLeast(0)
        }

    /**
     * Shuts this loop down: detaches it from [ThreadLocalEventLoop], marks it completed, closes the task
     * queue, processes what is still queued and hands the remaining delayed tasks over to `reschedule`.
     */
    override fun shutdown() {
        // The loop is going away -- drop the thread-local reference to it first
        ThreadLocalEventLoop.resetEventLoop()
        // Signal that no more tasks are accepted, then close the queue; events that could have been
        // added after the last processNextEvent are still processed below
        isCompleted = true
        closeQueue()
        // Keep processing until processNextEvent reports a positive value
        do {
            val result = processNextEvent()
        } while (result <= 0)
        // Whatever is left in the delayed heap gets rescheduled
        rescheduleAllDelayed()
    }

    /**
     * Schedules [continuation] to be resumed after [timeMillis]. Nothing is scheduled at all when the
     * delay converts to `MAX_DELAY_NS` nanoseconds or more.
     */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val delayNanos = delayToNanos(timeMillis)
        if (delayNanos >= MAX_DELAY_NS) return
        val now = nanoTime()
        val task = DelayedResumeTask(now + delayNanos, continuation)
        // The order matters: the task is put into the heap first and only then published to the
        // continuation. Otherwise DelayedResumeTask would have to support being disposed of
        // before it was ever scheduled.
        schedule(now, task)
        continuation.disposeOnCancellation(task)
    }

    /**
     * Schedules [block] to run after [timeMillis] and returns a handle that disposes the scheduled task.
     * For a delay that converts to `MAX_DELAY_NS` nanoseconds or more nothing is scheduled and
     * [NonDisposableHandle] is returned.
     */
    protected fun scheduleInvokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
        val delayNanos = delayToNanos(timeMillis)
        if (delayNanos >= MAX_DELAY_NS) return NonDisposableHandle
        val now = nanoTime()
        val task = DelayedRunnableTask(now + delayNanos, block)
        schedule(now, task)
        return task
    }

    /**
     * Runs at most one task: an unconfined one if present, otherwise one task from the queue after all
     * due delayed tasks were moved there. Returns `0` when a task was run and [nextTime] otherwise.
     */
    override fun processNextEvent(): Long {
        // unconfined events come first
        if (processUnconfinedEvent()) return 0L
        // move every delayed task that is due into the queue
        val delayedQueue = _delayed.value
        if (delayedQueue != null && !delayedQueue.isEmpty) {
            val now = nanoTime()
            do {
                // A task is removed from the delayed heap only after it was added to the queue, so that
                // `isEmpty` and `nextTime`, which look at both, never transiently see both of them empty.
                // The loop stops when the first task is not due yet, the heap is empty, or enqueueImpl
                // returned false.
                val moved = delayedQueue.removeFirstIf { due -> due.timeToExecute(now) && enqueueImpl(due) }
            } while (moved != null)
        }
        // then take a single task from the queue and run it
        val task = dequeue() ?: return nextTime
        platformAutoreleasePool { task.run() }
        return 0L
    }

    /** Dispatching is the same as [enqueue]; the context is not used. */
    final override fun dispatch(context: CoroutineContext, block: Runnable) {
        enqueue(block)
    }

    /**
     * Adds [task] to this loop's queue and unparks the loop; when the loop does not accept the task
     * (see [enqueueImpl]) the task is passed to [DefaultExecutor] instead.
     */
    open fun enqueue(task: Runnable) {
        if (!enqueueImpl(task)) {
            DefaultExecutor.enqueue(task)
            return
        }
        // todo: we should unpark only when the added task became first in the queue
        unpark()
    }

    /**
     * Tries to add [task] to `_queue`.
     *
     * @return `true` if the task was added, `false` if the loop is completed or the queue is closed.
     */
    @Suppress("UNCHECKED_CAST")
    private fun enqueueImpl(task: Runnable): Boolean {
        _queue.loop { current ->
            // fail fast if already completed; a racing add may still get through, but the queue will be closed
            if (isCompleted) return false
            when {
                // empty slot -- store the task itself
                current == null -> if (_queue.compareAndSet(null, task)) return true
                current is Queue<*> -> {
                    val tasks = current as Queue<Runnable>
                    when (tasks.addLast(task)) {
                        Queue.ADD_SUCCESS -> return true
                        Queue.ADD_CLOSED -> return false
                        // frozen -- switch to the next queue instance and retry
                        Queue.ADD_FROZEN -> _queue.compareAndSet(current, tasks.next())
                    }
                }
                current === CLOSED_EMPTY -> return false
                else -> {
                    // a single task is stored -- upgrade to a full-blown queue to add one more
                    val promoted = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
                    promoted.addLast(current as Runnable)
                    promoted.addLast(task)
                    if (_queue.compareAndSet(current, promoted)) return true
                }
            }
        }
    }

    /** Removes and returns the first task of `_queue`, or `null` when there is none. */
    @Suppress("UNCHECKED_CAST")
    private fun dequeue(): Runnable? {
        _queue.loop { current ->
            when {
                current == null -> return null
                current is Queue<*> -> {
                    val taken = (current as Queue<Runnable>).removeFirstOrNull()
                    if (taken !== Queue.REMOVE_FROZEN) return taken as Runnable?
                    // frozen -- switch to the next queue instance and retry
                    _queue.compareAndSet(current, current.next())
                }
                current === CLOSED_EMPTY -> return null
                // a single task is stored -- take it, leaving the slot empty
                else -> if (_queue.compareAndSet(current, null)) return current as Runnable
            }
        }
    }

    /**
     * Moves `_queue` out of its initial states during shutdown: an empty slot becomes [CLOSED_EMPTY],
     * an existing queue is closed, a single task is wrapped into a full-blown queue.
     */
    private fun closeQueue() {
        assert { isCompleted }
        _queue.loop { current ->
            when {
                current == null -> if (_queue.compareAndSet(null, CLOSED_EMPTY)) return
                current is Queue<*> -> {
                    current.close()
                    return
                }
                current === CLOSED_EMPTY -> return
                else -> {
                    // a single task is stored -- replace it with a full-blown queue holding that task.
                    // NOTE(review): the new queue itself is not closed on this path; later additions
                    // are turned away by the isCompleted check at the top of enqueueImpl -- confirm
                    // that this is the intent.
                    val promoted = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
                    promoted.addLast(current as Runnable)
                    if (_queue.compareAndSet(current, promoted)) return
                }
            }
        }
    }

    /**
     * Schedules [delayedTask] and reacts to the outcome: unparks the loop when the task became the first
     * one in the delayed heap, or hands the task to `reschedule` when this loop is already completed.
     */
    fun schedule(now: Long, delayedTask: DelayedTask) {
        val outcome = scheduleImpl(now, delayedTask)
        when (outcome) {
            SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
            SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
            SCHEDULE_DISPOSED -> {} // the task was already disposed -- nothing to do
            else -> error("unexpected result")
        }
    }

    /** `true` when [task] is currently the first task of the delayed heap. */
    private fun shouldUnpark(task: DelayedTask): Boolean {
        return _delayed.value?.peek() === task
    }

    /**
     * Adds [delayedTask] to the delayed heap, creating the heap on first use.
     * Returns one of the `SCHEDULE_*` codes.
     */
    private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
        if (isCompleted) return SCHEDULE_COMPLETED
        var heap = _delayed.value
        if (heap == null) {
            // whoever wins this CAS installs the heap; everybody reads the installed instance afterwards
            _delayed.compareAndSet(null, DelayedTaskQueue(now))
            heap = _delayed.value!!
        }
        return delayedTask.scheduleTask(now, heap, this)
    }

    // A "hard" shutdown for test cleanup purposes: simply forgets both queues
    protected fun resetAll() {
        _queue.value = null
        _delayed.value = null
    }

    // This is a "soft" (normal) shutdown: every remaining delayed task is handed to `reschedule`
    private fun rescheduleAllDelayed() {
        val now = nanoTime()
        /*
         * `removeFirstOrNull` is the only operation on DelayedTask & ThreadSafeHeap that is not
         * synchronized on the DelayedTask itself. All other operations are synchronized on both the
         * DelayedTask & ThreadSafeHeap instances (in this order). It is still safe, because `dispose`
         * first removes the DelayedTask from the heap (under synchronization) and only then
         * assigns "_heap = DISPOSED_TASK", so there can never be a race on the _heap reference update.
         */
        var pending = _delayed.value?.removeFirstOrNull()
        while (pending != null) {
            reschedule(now, pending)
            pending = _delayed.value?.removeFirstOrNull()
        }
    }

    /**
     * A task with a due time that lives in a [DelayedTaskQueue] and can be disposed.
     */
    internal abstract class DelayedTask(
        /**
         * This field may be modified only in [scheduleTask], before this DelayedTask is put
         * into the heap, to avoid overflow and corruption of the heap data structure.
         */
        @JvmField var nanoTime: Long
    ) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode, SynchronizedObject() {
        @Volatile
        private var _heap: Any? = null // null | ThreadSafeHeap | DISPOSED_TASK

        override var heap: ThreadSafeHeap<*>?
            get() = _heap as? ThreadSafeHeap<*>
            set(value) {
                // cannot happen: disposal is always checked before adding/removing
                require(_heap !== DISPOSED_TASK)
                _heap = value
            }

        override var index: Int = -1

        /** Orders tasks by the sign of the difference of their [nanoTime] values. */
        override fun compareTo(other: DelayedTask): Int {
            val diff = nanoTime - other.nanoTime
            return if (diff > 0) 1 else if (diff < 0) -1 else 0
        }

        /** `true` when this task is due at the moment [now]. */
        fun timeToExecute(now: Long): Boolean {
            return now - nanoTime >= 0L
        }

        /**
         * Adds this task to [delayed] unless the task is disposed or [eventLoop] is completed,
         * keeping the [DelayedTaskQueue] invariant intact. Returns one of the `SCHEDULE_*` codes.
         */
        fun scheduleTask(now: Long, delayed: DelayedTaskQueue, eventLoop: EventLoopImplBase): Int = synchronized<Int>(this) {
            if (_heap === DISPOSED_TASK) return SCHEDULE_DISPOSED // already disposed -- don't add
            delayed.addLastIf(this) { head ->
                if (eventLoop.isCompleted) return SCHEDULE_COMPLETED // non-local return from scheduleTask
                /*
                 * A new task is about to be added, so the [DelayedTaskQueue] invariant has to be kept.
                 * This lambda additionally runs under the lock of the [DelayedTaskQueue], which makes
                 * working with [DelayedTaskQueue.timeNow] here thread-safe.
                 */
                if (head == null) {
                    /*
                     * For the very first delayed task the queue's timeNow is simply set to the current
                     * time, even if that means "going backwards in time". This makes the structure
                     * self-correcting in spite of wild jumps in `nanoTime()` measurements once all
                     * delayed tasks have been removed from the queue for execution.
                     */
                    delayed.timeNow = now
                } else {
                    /*
                     * Update timeNow carefully: it must not sweep past the first task's time and it may
                     * only move forward. Moving it backwards could violate the invariant for tasks
                     * that are already scheduled.
                     */
                    val headTime = head.nanoTime
                    // min(now, headTime), computed with a wrap-safe check
                    val candidate = if (headTime - now >= 0) now else headTime
                    // only ever move forward in time
                    if (candidate - delayed.timeNow > 0) delayed.timeNow = candidate
                }
                /*
                 * timeNow may have just changed, so double-check that the task being added does not
                 * violate the invariant because of that. This function may also be called to
                 * reschedule a task from one queue to another, which is another way for the task's time
                 * to violate the invariant. A violation (if any) is corrected by moving the task's
                 * time to timeNow.
                 */
                if (nanoTime - delayed.timeNow < 0) nanoTime = delayed.timeNow
                true
            }
            return SCHEDULE_OK
        }

        /** Removes this task from its heap (if it is in one) and marks it so it is never added again. */
        final override fun dispose(): Unit = synchronized(this) {
            val owner = _heap
            if (owner === DISPOSED_TASK) return // already disposed
            (owner as? DelayedTaskQueue)?.remove(this) // first remove from the heap, if it is in one
            _heap = DISPOSED_TASK // never add again to any heap
        }

        override fun toString(): String {
            return "Delayed[nanos=$nanoTime]"
        }
    }

    /** Delayed task that resumes [cont] undispatched with `Unit` when run. */
    private inner class DelayedResumeTask(
        nanoTime: Long,
        private val cont: CancellableContinuation<Unit>
    ) : DelayedTask(nanoTime) {
        override fun run() {
            with(cont) { resumeUndispatched(Unit) }
        }

        override fun toString(): String {
            return super.toString() + cont.toString()
        }
    }

    /** Delayed task that runs the given [block]. */
    private class DelayedRunnableTask(
        nanoTime: Long,
        private val block: Runnable
    ) : DelayedTask(nanoTime) {
        override fun run() {
            block.run()
        }

        override fun toString(): String {
            return super.toString() + block.toString()
        }
    }

    /**
     * Delayed task queue keeps a stable time-comparison invariant despite potential wraparounds in
     * long nano time measurements by remembering the last observed [timeNow]. This protects the integrity
     * of the heap data structure in spite of a potentially non-monotonic `nanoTime()` source.
     * The invariant is that for every scheduled [DelayedTask]:
     *
     * ```
     * delayedTask.nanoTime - timeNow >= 0
     * ```
     *
     * Consequently the comparison of scheduled tasks via [DelayedTask.compareTo] is always stable, since
     * scheduled [DelayedTask.nanoTime] values can be at most [Long.MAX_VALUE] apart. The invariant is
     * maintained by [DelayedTask.scheduleTask] when new tasks are added, and removing tasks cannot
     * violate it (so nothing special is done there).
     */
    internal class DelayedTaskQueue(
        @JvmField var timeNow: Long
    ) : ThreadSafeHeap<DelayedTask>()
}
521 
// Platform-specific factory of event loops. In this file it is called by ThreadLocalEventLoop.eventLoop
// when the thread-local slot holds no event loop yet.
522 internal expect fun createEventLoop(): EventLoop
523 
// Platform-specific time source. In this file its value is used as "now" when scheduling delayed tasks,
// moving due tasks to the queue and computing EventLoopImplBase.nextTime; the sum `now + delayToNanos(...)`
// implies the unit is nanoseconds.
524 internal expect fun nanoTime(): Long
525 
// Platform-specific executor. EventLoopImplBase.enqueue falls back to it for a task that
// the event loop itself did not accept (enqueueImpl returned false).
526 internal expect object DefaultExecutor {
527     fun enqueue(task: Runnable)
528 }
529 
530 /**
 * Used by Darwin targets to wrap a [Runnable.run] call in an Objective-C Autorelease Pool. It is a no-op on JVM, JS and
 * non-Darwin native targets.
 *
 * Coroutines on Darwin targets can call into the Objective-C world, where a callee may push a to-be-returned object to
 * the Autorelease Pool, so as to avoid a premature ARC release before it reaches the caller. This means the pool must
 * be eventually drained to avoid leaks. Since Kotlin Coroutines does not use [NSRunLoop], which provides automatic
 * pool management, it must manage the pool creation and pool drainage manually.
 *
 * In this file the only call site is `EventLoopImplBase.processNextEvent`, which wraps the `run()` call of the
 * task it has just dequeued.
 *
 * @param block the code to execute inside the pool.
 */
platformAutoreleasePoolnull539 internal expect inline fun platformAutoreleasePool(crossinline block: () -> Unit)
540