<lambda>null1 package kotlinx.coroutines
2
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.internal.*
5 import kotlin.concurrent.Volatile
6 import kotlin.coroutines.*
7 import kotlin.jvm.*
8
/**
 * Base class for [CoroutineDispatcher] implementations that own an event loop and can be
 * asked to process the next event from their event queue.
 *
 * An implementation may additionally implement the [Delay] interface to support time-scheduled tasks.
 * Instances are created by, or piggybacked onto (see [ThreadLocalEventLoop]),
 * `runBlocking` and [Dispatchers.Unconfined].
 *
 * @suppress **This is an internal API and should not be used from general code.**
 */
internal abstract class EventLoop : CoroutineDispatcher() {
    /**
     * Number of nested `runBlocking` and [Dispatchers.Unconfined] usages of this event loop.
     * The two kinds of usage are counted with different increments, see [delta].
     */
    private var useCount = 0L

    /**
     * Becomes `true` on any use by `runBlocking`, because such a use potentially leaks this loop to
     * other threads and so the instance must be properly shut down. A loop that was used solely by
     * [Dispatchers.Unconfined] does not need a shutdown -- it can stay in [ThreadLocalEventLoop]
     * and be reused next time.
     */
    private var shared = false

    /**
     * Tasks dispatched by [Dispatchers.Unconfined].
     * They are thread-local for performance and take precedence over the rest of the queue.
     */
    private var unconfinedQueue: ArrayDeque<DispatchedTask<*>>? = null

    /**
     * Processes the next event in this event loop.
     *
     * The returned value is to be interpreted like this:
     * - `<= 0` -- there are potentially more events for immediate processing;
     * - `> 0` -- a number of nanoseconds to wait for the next scheduled event;
     * - [Long.MAX_VALUE] -- no more events.
     *
     * **NOTE**: Must be invoked only from the event loop's thread
     * (no check for performance reasons, may be added in the future).
     */
    open fun processNextEvent(): Long =
        if (processUnconfinedEvent()) 0L else Long.MAX_VALUE

    protected open val isEmpty: Boolean
        get() {
            return isUnconfinedQueueEmpty
        }

    protected open val nextTime: Long
        get() {
            val pending = unconfinedQueue
            // no queue yet or nothing in it -- there is nothing to wait for
            if (pending == null || pending.isEmpty()) return Long.MAX_VALUE
            return 0L
        }

    /**
     * Runs the first task from the unconfined queue, if any.
     * Returns `true` when a task was run and `false` when there was nothing to run.
     */
    fun processUnconfinedEvent(): Boolean {
        val next = unconfinedQueue?.removeFirstOrNull() ?: return false
        next.run()
        return true
    }

    /**
     * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its
     * context parameter should call [processNextEvent] for this event loop (otherwise, it will process the
     * thread-local one). By default, an event loop implementation is thread-local and should not be processed
     * in the context (the current thread's event loop should be processed instead).
     */
    open fun shouldBeProcessedFromContext(): Boolean {
        return false
    }

    /**
     * Dispatches a task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
     * into the current event loop.
     */
    fun dispatchUnconfined(task: DispatchedTask<*>) {
        var pending = unconfinedQueue
        if (pending == null) {
            // the queue is allocated lazily on the first unconfined dispatch
            pending = ArrayDeque<DispatchedTask<*>>()
            unconfinedQueue = pending
        }
        pending.addLast(task)
    }

    val isActive: Boolean
        get() {
            return useCount > 0
        }

    val isUnconfinedLoopActive: Boolean
        get() {
            return useCount >= delta(unconfined = true)
        }

    // May only be used from the event loop's thread
    val isUnconfinedQueueEmpty: Boolean
        get() {
            val pending = unconfinedQueue
            return pending == null || pending.isEmpty()
        }

    /**
     * Amount by which [useCount] changes for a single usage:
     * `1L shl 32` for an unconfined usage and `1` otherwise.
     */
    private fun delta(unconfined: Boolean): Long = when {
        unconfined -> 1L shl 32
        else -> 1L
    }

    /**
     * Registers one more usage of this event loop.
     * Any usage that is not unconfined marks the loop as [shared].
     */
    fun incrementUseCount(unconfined: Boolean = false) {
        useCount += delta(unconfined)
        if (unconfined) return
        shared = true
    }

    /**
     * Unregisters one usage of this event loop. When the last usage is gone and
     * the loop was marked as [shared], the loop is shut down.
     */
    fun decrementUseCount(unconfined: Boolean = false) {
        useCount -= delta(unconfined)
        if (useCount <= 0) {
            assert { useCount == 0L } // "Extra decrementUseCount"
            // shut it down and remove from ThreadLocalEventLoop
            if (shared) shutdown()
        }
    }

    /**
     * Validates [parallelism] and returns this same dispatcher.
     */
    final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        return this
    }

    /**
     * Shuts this event loop down. Does nothing by default.
     */
    open fun shutdown() {}
}
121
/**
 * Holder of the current thread's [EventLoop].
 */
internal object ThreadLocalEventLoop {
    private val threadLocalLoop = commonThreadLocal<EventLoop?>(Symbol("ThreadLocalEventLoop"))

    /**
     * The event loop of the current thread. If none is stored yet, a new one is
     * created via [createEventLoop], stored, and returned.
     */
    internal val eventLoop: EventLoop
        get() {
            val existing = threadLocalLoop.get()
            if (existing != null) return existing
            val created = createEventLoop()
            threadLocalLoop.set(created)
            return created
        }

    /**
     * Returns the event loop stored for the current thread, or `null` if there is none.
     */
    internal fun currentOrNull(): EventLoop? {
        return threadLocalLoop.get()
    }

    /**
     * Clears the event loop stored for the current thread.
     */
    internal fun resetEventLoop() {
        threadLocalLoop.set(null)
    }

    /**
     * Stores [eventLoop] as the event loop of the current thread.
     */
    internal fun setEventLoop(eventLoop: EventLoop) {
        threadLocalLoop.set(eventLoop)
    }
}
139
// Marker stored in `DelayedTask._heap` by `dispose()`; a task carrying it is never added to a heap again
private val DISPOSED_TASK = Symbol("REMOVED_TASK")

// results for scheduleImpl
private const val SCHEDULE_OK = 0 // the task was handed to the delayed queue
private const val SCHEDULE_COMPLETED = 1 // the event loop is already completed, `schedule` passes the task to `reschedule`
private const val SCHEDULE_DISPOSED = 2 // the task was already disposed, `schedule` does nothing

// Multiplier/divisor used to convert between milliseconds and nanoseconds
private const val MS_TO_NS = 1_000_000L
// Millisecond delays at or above this value are mapped to Long.MAX_VALUE by `delayToNanos` instead of being multiplied
private const val MAX_MS = Long.MAX_VALUE / MS_TO_NS

/**
 * First-line overflow protection -- limit maximal delay.
 * Delays longer than this one (~146 years) are considered to be delayed "forever".
 */
private const val MAX_DELAY_NS = Long.MAX_VALUE / 2
155
/**
 * Converts a delay given in milliseconds to nanoseconds.
 *
 * Non-positive delays become `0`, and delays of [MAX_MS] or more become [Long.MAX_VALUE]
 * so that the multiplication never overflows.
 */
internal fun delayToNanos(timeMillis: Long): Long {
    if (timeMillis <= 0) return 0L
    if (timeMillis >= MAX_MS) return Long.MAX_VALUE
    return timeMillis * MS_TO_NS
}
161
/**
 * Converts a delay given in nanoseconds to whole milliseconds using integer division by [MS_TO_NS].
 */
internal fun delayNanosToMillis(timeNanos: Long): Long {
    return timeNanos / MS_TO_NS
}
164
// Value of `EventLoopImplBase._queue` after the queue was closed while it held no tasks; nothing more is accepted
private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY")

// Short local name for the queue implementation used by `EventLoopImplBase._queue`
private typealias Queue<T> = LockFreeTaskQueueCore<T>
168
/**
 * Platform-specific part of the event loop implementation; [EventLoopImplBase] extends it
 * and relies on [unpark] and [reschedule].
 */
internal expect abstract class EventLoopImplPlatform() : EventLoop {
    // Called to unpark this event loop's thread
    protected fun unpark()

    // Called to reschedule to DefaultExecutor when this event loop is complete
    protected fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask)
}
176
/**
 * Event loop implementation that keeps a queue of runnable tasks (`_queue`) and a heap of
 * time-delayed tasks (`_delayed`), and implements [Delay] on top of them.
 *
 * After [shutdown] marks the loop as completed, [enqueue] forwards tasks to [DefaultExecutor]
 * and [schedule] passes delayed tasks to [reschedule].
 */
internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
    // null | CLOSED_EMPTY | task | Queue<Runnable>
    private val _queue = atomic<Any?>(null)

    // Allocated only once
    private val _delayed = atomic<DelayedTaskQueue?>(null)

    // Set to true by shutdown(); checked before accepting new tasks or delayed tasks
    private val _isCompleted = atomic(false)
    private var isCompleted
        get() = _isCompleted.value
        set(value) { _isCompleted.value = value }

    /**
     * `true` when the unconfined queue, the delayed queue and the task queue hold no tasks.
     */
    override val isEmpty: Boolean get() {
        if (!isUnconfinedQueueEmpty) return false
        val delayed = _delayed.value
        if (delayed != null && !delayed.isEmpty) return false
        return when (val queue = _queue.value) {
            null -> true
            is Queue<*> -> queue.isEmpty
            else -> queue === CLOSED_EMPTY
        }
    }

    /**
     * `0` when there is a task ready for immediate processing, [Long.MAX_VALUE] when there is
     * nothing to wait for, otherwise the non-negative number of nanoseconds until the first delayed task.
     */
    override val nextTime: Long
        get() {
            if (super.nextTime == 0L) return 0L
            val queue = _queue.value
            when {
                queue === null -> {} // empty queue -- proceed
                queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
                queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
                else -> return 0 // non-empty queue
            }
            val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE
            return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)
        }

    /**
     * Shuts this event loop down: marks it completed, closes the task queue, processes the
     * tasks that are still queued and reschedules the remaining delayed tasks.
     */
    override fun shutdown() {
        // Clean up thread-local reference here -- this event loop is shutting down
        ThreadLocalEventLoop.resetEventLoop()
        // We should signal that this event loop should not accept any more tasks
        // and process queued events (that could have been added after last processNextEvent)
        isCompleted = true
        closeQueue()
        // complete processing of all queued tasks
        while (processNextEvent() <= 0) { /* spin */ }
        // reschedule the rest of delayed tasks
        rescheduleAllDelayed()
    }

    /**
     * Schedules resumption of [continuation] after [timeMillis] milliseconds.
     * Nothing is scheduled when the delay converts to [MAX_DELAY_NS] nanoseconds or more.
     */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val timeNanos = delayToNanos(timeMillis)
        if (timeNanos < MAX_DELAY_NS) {
            val now = nanoTime()
            DelayedResumeTask(now + timeNanos, continuation).also { task ->
                /*
                 * Order is important here: first we schedule the heap and only then
                 * publish it to continuation. Otherwise, `DelayedResumeTask` would
                 * have to know how to be disposed of even when it wasn't scheduled yet.
                 */
                schedule(now, task)
                continuation.disposeOnCancellation(task)
            }
        }
    }

    /**
     * Schedules [block] to run after [timeMillis] milliseconds and returns the handle of the scheduled task.
     * When the delay converts to [MAX_DELAY_NS] nanoseconds or more, nothing is scheduled and
     * [NonDisposableHandle] is returned.
     */
    protected fun scheduleInvokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
        val timeNanos = delayToNanos(timeMillis)
        return if (timeNanos < MAX_DELAY_NS) {
            val now = nanoTime()
            DelayedRunnableTask(now + timeNanos, block).also { task ->
                schedule(now, task)
            }
        } else {
            NonDisposableHandle
        }
    }

    /**
     * Processes one event: an unconfined task if there is one, otherwise moves all due delayed
     * tasks into the task queue and runs the first queued task.
     * Returns `0` when a task was run and [nextTime] otherwise.
     */
    override fun processNextEvent(): Long {
        // unconfined events take priority
        if (processUnconfinedEvent()) return 0
        // queue all delayed tasks that are due to be executed
        val delayed = _delayed.value
        if (delayed != null && !delayed.isEmpty) {
            val now = nanoTime()
            while (true) {
                // make sure that moving from delayed to queue removes from delayed only after it is added to queue
                // to make sure that 'isEmpty' and `nextTime` that check both of them
                // do not transiently report that both delayed and queue are empty during move
                delayed.removeFirstIf {
                    if (it.timeToExecute(now)) {
                        enqueueImpl(it)
                    } else
                        false
                } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
            }
        }
        // then process one event from queue
        val task = dequeue()
        if (task != null) {
            platformAutoreleasePool { task.run() }
            return 0
        }
        return nextTime
    }

    // The context is not used: every dispatched block simply goes through enqueue
    final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)

    /**
     * Adds [task] to this loop's queue and unparks the loop's thread.
     * If the queue does not accept the task, the task is forwarded to [DefaultExecutor].
     */
    open fun enqueue(task: Runnable) {
        if (enqueueImpl(task)) {
            // todo: we should unpark only when this delayed task became first in the queue
            unpark()
        } else {
            DefaultExecutor.enqueue(task)
        }
    }

    /**
     * Tries to add [task] to `_queue`, retrying until the state update succeeds.
     * Returns `false` when the loop is completed or the queue is closed, `true` when the task was added.
     */
    @Suppress("UNCHECKED_CAST")
    private fun enqueueImpl(task: Runnable): Boolean {
        _queue.loop { queue ->
            if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
            when (queue) {
                null -> if (_queue.compareAndSet(null, task)) return true
                is Queue<*> -> {
                    when ((queue as Queue<Runnable>).addLast(task)) {
                        Queue.ADD_SUCCESS -> return true
                        Queue.ADD_CLOSED -> return false
                        Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
                    }
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return false
                    else -> {
                        // update to full-blown queue to add one more
                        val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
                        newQueue.addLast(queue as Runnable)
                        newQueue.addLast(task)
                        if (_queue.compareAndSet(queue, newQueue)) return true
                    }
                }
            }
        }
    }

    /**
     * Removes and returns the first task from `_queue`, or returns `null` when there is none.
     */
    @Suppress("UNCHECKED_CAST")
    private fun dequeue(): Runnable? {
        _queue.loop { queue ->
            when (queue) {
                null -> return null
                is Queue<*> -> {
                    val result = (queue as Queue<Runnable>).removeFirstOrNull()
                    if (result !== Queue.REMOVE_FROZEN) return result as Runnable?
                    // the queue instance is frozen -- switch to its successor and retry
                    _queue.compareAndSet(queue, queue.next())
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return null
                    // a single task is stored directly -- take it and leave the queue empty
                    else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable
                }
            }
        }
    }

    /**
     * Transitions `_queue` for shutdown; must be called only after the loop is marked completed.
     */
    private fun closeQueue() {
        assert { isCompleted }
        _queue.loop { queue ->
            when (queue) {
                null -> if (_queue.compareAndSet(null, CLOSED_EMPTY)) return
                is Queue<*> -> {
                    queue.close()
                    return
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return
                    else -> {
                        // update to full-blown queue to close
                        val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
                        newQueue.addLast(queue as Runnable)
                        if (_queue.compareAndSet(queue, newQueue)) return
                    }
                }
            }
        }

    }

    /**
     * Schedules [delayedTask] in this loop. Depending on the outcome it unparks the loop's thread,
     * passes the task to [reschedule] (the loop is completed), or does nothing (the task is disposed).
     */
    fun schedule(now: Long, delayedTask: DelayedTask) {
        when (scheduleImpl(now, delayedTask)) {
            SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
            SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
            SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
            else -> error("unexpected result")
        }
    }

    // True when the given task is the one currently returned by peek() on the delayed queue
    private fun shouldUnpark(task: DelayedTask): Boolean = _delayed.value?.peek() === task

    /**
     * Adds [delayedTask] to the delayed queue, creating the queue on first use.
     * Returns one of the `SCHEDULE_*` result codes.
     */
    private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
        if (isCompleted) return SCHEDULE_COMPLETED
        val delayedQueue = _delayed.value ?: run {
            // the result of compareAndSet is ignored: whichever queue got installed is re-read below
            _delayed.compareAndSet(null, DelayedTaskQueue(now))
            _delayed.value!!
        }
        return delayedTask.scheduleTask(now, delayedQueue, this)
    }

    // It performs "hard" shutdown for test cleanup purposes
    protected fun resetAll() {
        _queue.value = null
        _delayed.value = null
    }

    // This is a "soft" (normal) shutdown
    private fun rescheduleAllDelayed() {
        val now = nanoTime()
        while (true) {
            /*
             * `removeFirstOrNull` below is the only operation on DelayedTask & ThreadSafeHeap that is not
             * synchronized on DelayedTask itself. All other operation are synchronized both on
             * DelayedTask & ThreadSafeHeap instances (in this order). It is still safe, because `dispose`
             * first removes DelayedTask from the heap (under synchronization) then
             * assign "_heap = DISPOSED_TASK", so there cannot be ever a race to _heap reference update.
             */
            val delayedTask = _delayed.value?.removeFirstOrNull() ?: break
            reschedule(now, delayedTask)
        }
    }

    /**
     * A task to be run at [nanoTime]. It is a node of [DelayedTaskQueue] and a [DisposableHandle]
     * that removes the task from its queue when disposed.
     */
    internal abstract class DelayedTask(
        /**
         * This field can be only modified in [scheduleTask] before putting this DelayedTask
         * into heap to avoid overflow and corruption of heap data structure.
         */
        @JvmField var nanoTime: Long
    ) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode, SynchronizedObject() {
        @Volatile
        private var _heap: Any? = null // null | ThreadSafeHeap | DISPOSED_TASK

        override var heap: ThreadSafeHeap<*>?
            get() = _heap as? ThreadSafeHeap<*>
            set(value) {
                require(_heap !== DISPOSED_TASK) // this can never happen, it is always checked before adding/removing
                _heap = value
            }

        override var index: Int = -1

        // Compares via the sign of the difference rather than the values themselves (wrap-safe, see DelayedTaskQueue)
        override fun compareTo(other: DelayedTask): Int {
            val dTime = nanoTime - other.nanoTime
            return when {
                dTime > 0 -> 1
                dTime < 0 -> -1
                else -> 0
            }
        }

        // True when this task's time is not after `now`
        fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L

        /**
         * Adds this task to [delayed] under this task's lock.
         * Returns [SCHEDULE_DISPOSED] when the task was already disposed, [SCHEDULE_COMPLETED] when
         * [eventLoop] is completed, and [SCHEDULE_OK] otherwise.
         */
        fun scheduleTask(now: Long, delayed: DelayedTaskQueue, eventLoop: EventLoopImplBase): Int = synchronized<Int>(this) {
            if (_heap === DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- was already disposed
            delayed.addLastIf(this) { firstTask ->
                if (eventLoop.isCompleted) return SCHEDULE_COMPLETED // non-local return from scheduleTask
                /**
                 * We are about to add new task and we have to make sure that [DelayedTaskQueue]
                 * invariant is maintained. The code in this lambda is additionally executed under
                 * the lock of [DelayedTaskQueue] and working with [DelayedTaskQueue.timeNow] here is thread-safe.
                 */
                if (firstTask == null) {
                    /**
                     * When adding the first delayed task we simply update queue's [DelayedTaskQueue.timeNow] to
                     * the current now time even if that means "going backwards in time". This makes the structure
                     * self-correcting in spite of wild jumps in `nanoTime()` measurements once all delayed tasks
                     * are removed from the delayed queue for execution.
                     */
                    delayed.timeNow = now
                } else {
                    /**
                     * Carefully update [DelayedTaskQueue.timeNow] so that it does not sweep past first's tasks time
                     * and only goes forward in time. We cannot let it go backwards in time or invariant can be
                     * violated for tasks that were already scheduled.
                     */
                    val firstTime = firstTask.nanoTime
                    // compute min(now, firstTime) using a wrap-safe check
                    val minTime = if (firstTime - now >= 0) now else firstTime
                    // update timeNow only when going forward in time
                    if (minTime - delayed.timeNow > 0) delayed.timeNow = minTime
                }
                /**
                 * Here [DelayedTaskQueue.timeNow] was already modified and we have to double-check that newly added
                 * task does not violate [DelayedTaskQueue] invariant because of that. Note also that this scheduleTask
                 * function can be called to reschedule from one queue to another and this might be another reason
                 * where new task's time might now violate invariant.
                 * We correct invariant violation (if any) by simply changing this task's time to now.
                 */
                if (nanoTime - delayed.timeNow < 0) nanoTime = delayed.timeNow
                true
            }
            return SCHEDULE_OK
        }

        /**
         * Removes this task from the queue it is in (if any) and marks it as disposed,
         * so that it is never added to any queue again. Disposing more than once has no further effect.
         */
        final override fun dispose(): Unit = synchronized(this) {
            val heap = _heap
            if (heap === DISPOSED_TASK) return // already disposed
            (heap as? DelayedTaskQueue)?.remove(this) // remove if it is in heap (first)
            _heap = DISPOSED_TASK // never add again to any heap
        }

        override fun toString(): String = "Delayed[nanos=$nanoTime]"
    }

    // Delayed task that resumes the given continuation with Unit via resumeUndispatched
    private inner class DelayedResumeTask(
        nanoTime: Long,
        private val cont: CancellableContinuation<Unit>
    ) : DelayedTask(nanoTime) {
        override fun run() { with(cont) { resumeUndispatched(Unit) } }
        override fun toString(): String = super.toString() + cont.toString()
    }

    // Delayed task that runs the given block
    private class DelayedRunnableTask(
        nanoTime: Long,
        private val block: Runnable
    ) : DelayedTask(nanoTime) {
        override fun run() { block.run() }
        override fun toString(): String = super.toString() + block.toString()
    }

    /**
     * Delayed task queue maintains stable time-comparison invariant despite potential wraparounds in
     * long nano time measurements by maintaining last observed [timeNow]. It protects the integrity of the
     * heap data structure in spite of potential non-monotonicity of `nanoTime()` source.
     * The invariant is that for every scheduled [DelayedTask]:
     *
     * ```
     * delayedTask.nanoTime - timeNow >= 0
     * ```
     *
     * So the comparison of scheduled tasks via [DelayedTask.compareTo] is always stable as
     * scheduled [DelayedTask.nanoTime] can be at most [Long.MAX_VALUE] apart. This invariant is maintained when
     * new tasks are added by [DelayedTask.scheduleTask] function and it cannot be violated when tasks are removed
     * (so there is nothing special to do there).
     */
    internal class DelayedTaskQueue(
        @JvmField var timeNow: Long
    ) : ThreadSafeHeap<DelayedTask>()
}
521
// Platform-specific factory of an event loop; used by ThreadLocalEventLoop.eventLoop when the current thread has none
internal expect fun createEventLoop(): EventLoop
523
// Platform-specific time source; its values are compared with and added to nanosecond delays of delayed tasks
internal expect fun nanoTime(): Long
525
// Platform-specific executor; EventLoopImplBase.enqueue forwards a task here when its own queue does not accept it
internal expect object DefaultExecutor {
    fun enqueue(task: Runnable)
}
529
/**
 * Used by Darwin targets to wrap a [Runnable.run] call in an Objective-C Autorelease Pool. It is a no-op on JVM, JS and
 * non-Darwin native targets.
 *
 * Coroutines on Darwin targets can call into the Objective-C world, where a callee may push a to-be-returned object to
 * the Autorelease Pool, so as to avoid a premature ARC release before it reaches the caller. This means the pool must
 * be eventually drained to avoid leaks. Since Kotlin Coroutines does not use [NSRunLoop], which provides automatic
 * pool management, it must manage the pool creation and pool drainage manually.
 *
 * @param block the code to run inside the pool.
 */
internal expect inline fun platformAutoreleasePool(crossinline block: () -> Unit)
540