<lambda>null1 package kotlinx.coroutines.scheduling
2 
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.internal.*
6 import java.io.*
7 import java.util.concurrent.*
8 import java.util.concurrent.locks.*
9 import kotlin.jvm.internal.Ref.ObjectRef
10 import kotlin.math.*
11 
12 /**
 * Coroutine scheduler (pool of shared threads) whose primary target is to distribute dispatched coroutines
14  * over worker threads, including both CPU-intensive and blocking tasks, in the most efficient manner.
15  *
16  * Current scheduler implementation has two optimization targets:
17  * - Efficiency in the face of communication patterns (e.g. actors communicating via channel)
18  * - Dynamic resizing to support blocking calls without re-dispatching coroutine to separate "blocking" thread pool.
19  *
20  * ### Structural overview
21  *
22  * Scheduler consists of [corePoolSize] worker threads to execute CPU-bound tasks and up to
 * [maxPoolSize] lazily created threads to execute blocking tasks.
24  * Every worker has a local queue in addition to a global scheduler queue
25  * and the global queue has priority over local queue to avoid starvation of externally-submitted
26  * (e.g. from Android UI thread) tasks.
 * Work-stealing is implemented on top of these queues to provide
28  * even load distribution and illusion of centralized run queue.
29  *
30  * ### Scheduling policy
31  *
32  * When a coroutine is dispatched from within a scheduler worker, it's placed into the head of worker run queue.
33  * If the head is not empty, the task from the head is moved to the tail. Though it is an unfair scheduling policy,
34  * it effectively couples communicating coroutines into one and eliminates scheduling latency
35  * that arises from placing tasks to the end of the queue.
36  * Placing former head to the tail is necessary to provide semi-FIFO order, otherwise, queue degenerates to stack.
37  * When a coroutine is dispatched from an external thread, it's put into the global queue.
38  * The original idea with a single-slot LIFO buffer comes from Golang runtime scheduler by D. Vyukov.
39  * It was proven to be "fair enough", performant and generally well accepted and initially was a significant inspiration
40  * source for the coroutine scheduler.
41  *
42  * ### Work stealing and affinity
43  *
 * To provide even task distribution, a worker tries to steal tasks from other workers' queues
 * before parking when its local queue is empty.
46  * A non-standard solution is implemented to provide tasks affinity: a task from FIFO buffer may be stolen
47  * only if it is stale enough based on the value of [WORK_STEALING_TIME_RESOLUTION_NS].
 * For this purpose, a monotonic global clock is used, and every task is associated with its submission time.
49  * This approach shows outstanding results when coroutines are cooperative,
50  * but as downside scheduler now depends on a high-resolution global clock,
51  * which may limit scalability on NUMA machines. Tasks from LIFO buffer can be stolen on a regular basis.
52  *
53  * ### Thread management
54  * One of the hardest parts of the scheduler is decentralized management of the threads with the progress guarantees
55  * similar to the regular centralized executors.
56  * The state of the threads consists of [controlState] and [parkedWorkersStack] fields.
57  * The former field incorporates the amount of created threads, CPU-tokens and blocking tasks
58  * that require a thread compensation,
59  * while the latter represents intrusive versioned Treiber stack of idle workers.
 * When a worker cannot find any work, it first adds itself to the stack,
 * then re-scans the queue to avoid missing signals and then attempts to park
62  * with additional rendezvous against unnecessary parking.
63  * If a worker finds a task that it cannot yet steal due to time constraints, it stores this fact in its state
64  * (to be uncounted when additional work is signalled) and parks for such duration.
65  *
66  * When a new task arrives in the scheduler (whether it is local or global queue),
67  * either an idle worker is being signalled, or a new worker is attempted to be created.
68  * (Only [corePoolSize] workers can be created for regular CPU tasks)
69  *
70  * ### Support for blocking tasks
71  * The scheduler also supports the notion of [blocking][TASK_PROBABLY_BLOCKING] tasks.
72  * When executing or enqueuing blocking tasks, the scheduler notifies or creates one more worker in
73  * addition to core pool size, so at any given moment, it has [corePoolSize] threads (potentially not yet created)
74  * to serve CPU-bound tasks. To properly guarantee liveness, the scheduler maintains
75  * "CPU permits" -- [corePoolSize] special tokens that permit an arbitrary worker to execute and steal CPU-bound tasks.
76  * When worker encounters blocking tasks, it basically hands off its permit to another thread (not directly though) to
77  * keep invariant "scheduler always has at least min(pending CPU tasks, core pool size)
78  * and at most core pool size threads to execute CPU tasks".
79  * To avoid overprovision, workers without CPU permit are allowed to scan [globalBlockingQueue]
80  * and steal **only** blocking tasks from other workers.
81  *
82  * The scheduler does not limit the count of pending blocking tasks, potentially creating up to [maxPoolSize] threads.
83  * End users do not have access to the scheduler directly and can dispatch blocking tasks only with
84  * [LimitingDispatcher] that does control concurrency level by its own mechanism.
85  */
86 @Suppress("NOTHING_TO_INLINE")
87 internal class CoroutineScheduler(
88     @JvmField val corePoolSize: Int,
89     @JvmField val maxPoolSize: Int,
90     @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
91     @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
92 ) : Executor, Closeable {
93     init {
94         require(corePoolSize >= MIN_SUPPORTED_POOL_SIZE) {
95             "Core pool size $corePoolSize should be at least $MIN_SUPPORTED_POOL_SIZE"
96         }
97         require(maxPoolSize >= corePoolSize) {
98             "Max pool size $maxPoolSize should be greater than or equals to core pool size $corePoolSize"
99         }
100         require(maxPoolSize <= MAX_SUPPORTED_POOL_SIZE) {
101             "Max pool size $maxPoolSize should not exceed maximal supported number of threads $MAX_SUPPORTED_POOL_SIZE"
102         }
103         require(idleWorkerKeepAliveNs > 0) {
104             "Idle worker keep alive time $idleWorkerKeepAliveNs must be positive"
105         }
106     }
107 
108     @JvmField
109     val globalCpuQueue = GlobalQueue()
110 
111     @JvmField
112     val globalBlockingQueue = GlobalQueue()
113 
114     private fun addToGlobalQueue(task: Task): Boolean {
115         return if (task.isBlocking) {
116             globalBlockingQueue.addLast(task)
117         } else {
118             globalCpuQueue.addLast(task)
119         }
120     }
121 
122     /**
123      * The stack of parker workers.
124      * Every worker registers itself in a stack before parking (if it was not previously registered),
125      * so it can be signalled when new tasks arrive.
126      * This is a form of intrusive garbage-free Treiber stack where [Worker] also is a stack node.
127      *
128      * The stack is better than a queue (even with the contention on top) because it unparks threads
129      * in most-recently used order, improving both performance and locality.
130      * Moreover, it decreases threads thrashing, if the pool has n threads when only n / 2 is required,
131      * the latter half will never be unparked and will terminate itself after [IDLE_WORKER_KEEP_ALIVE_NS].
132      *
133      * This long version consist of version bits with [PARKED_VERSION_MASK]
134      * and top worker thread index bits with [PARKED_INDEX_MASK].
135      */
136     private val parkedWorkersStack = atomic(0L)
137 
138     /**
139      * Updates index of the worker at the top of [parkedWorkersStack].
140      * It always updates version to ensure interference with [parkedWorkersStackPop] operation
141      * that might have already decided to put this index to the top.
142      *
143      * Note, [newIndex] can be zero for the worker that is being terminated (removed from [workers]).
144      */
145     fun parkedWorkersStackTopUpdate(worker: Worker, oldIndex: Int, newIndex: Int) {
146         parkedWorkersStack.loop { top ->
147             val index = (top and PARKED_INDEX_MASK).toInt()
148             val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
149             val updIndex = if (index == oldIndex) {
150                 if (newIndex == 0) {
151                     parkedWorkersStackNextIndex(worker)
152                 } else {
153                     newIndex
154                 }
155             } else {
156                 index // no change to index, but update version
157             }
158             if (updIndex < 0) return@loop // retry
159             if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return
160         }
161     }
162 
    /**
     * Pushes worker into [parkedWorkersStack].
     * It does nothing if this worker is already physically linked to the stack.
     * This method is invoked only from the worker thread itself.
     * This invocation always precedes [LockSupport.parkNanos].
     * See [Worker.tryPark].
     *
     * Returns `true` if worker was added to the stack by this invocation, `false` if it was already
     * registered in the stack.
     */
    fun parkedWorkersStackPush(worker: Worker): Boolean {
        if (worker.nextParkedWorker !== NOT_IN_STACK) return false // already in stack, bail out
        /*
         * The below loop can be entered only if this worker was not in the stack and, since no other thread
         * can add it to the stack (only the worker itself), this invariant holds while this loop executes.
         */
        parkedWorkersStack.loop { top ->
            // Low bits of the top hold the index of the current top worker, high bits hold the version.
            val index = (top and PARKED_INDEX_MASK).toInt()
            // The version is bumped on every successful CAS, so a CAS based on a stale read of `top` fails.
            val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
            val updIndex = worker.indexInArray
            assert { updIndex != 0 } // only this worker can push itself, cannot be terminated
            // Link to the current top before publishing; for an empty stack workers[0] is null, so this links to null.
            worker.nextParkedWorker = workers[index]
            /*
             * Other thread can be changing this worker's index at this point, but it
             * also invokes parkedWorkersStackTopUpdate which updates version to make next CAS fail.
             * Successful CAS of the stack top completes successful push.
             */
            if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return true
        }
    }
193 
    /**
     * Pops worker from [parkedWorkersStack].
     * It can be invoked concurrently from any thread that is looking for help and needs to unpark some worker.
     * This invocation is always followed by an attempt to [LockSupport.unpark] resulting worker.
     * See [tryUnpark].
     *
     * Returns the popped worker, or `null` if the stack is empty.
     */
    private fun parkedWorkersStackPop(): Worker? {
        parkedWorkersStack.loop { top ->
            val index = (top and PARKED_INDEX_MASK).toInt()
            val worker = workers[index] ?: return null // stack is empty
            val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
            // Index of the next usable worker below the top; negative when a worker on the chain
            // was concurrently taken out of the stack.
            val updIndex = parkedWorkersStackNextIndex(worker)
            if (updIndex < 0) return@loop // retry
            /*
             * Other thread can be changing this worker's index at this point, but it
             * also invokes parkedWorkersStackTopUpdate which updates version to make next CAS fail.
             * Successful CAS of the stack top completes successful pop.
             */
            if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) {
                /*
                 * We've just taken the worker out of the stack, but nextParkedWorker is not reset yet, so if the worker
                 * is currently invoking parkedWorkersStackPush it would think it is in the stack and bail out without
                 * adding itself again. It does not matter, since we are going to invoke unpark on the thread
                 * that was popped out of parkedWorkersStack anyway.
                 */
                worker.nextParkedWorker = NOT_IN_STACK
                return worker
            }
        }
    }
224 
225     /**
226      * Finds next usable index for [parkedWorkersStack]. The problem is that workers can
227      * be terminated at their [Worker.indexInArray] becomes zero, so they cannot be
228      * put at the top of the stack. In which case we are looking for next.
229      *
230      * Returns `index >= 0` or `-1` for retry.
231      */
232     private fun parkedWorkersStackNextIndex(worker: Worker): Int {
233         var next = worker.nextParkedWorker
234         findNext@ while (true) {
235             when {
236                 next === NOT_IN_STACK -> return -1 // we are too late -- other thread popped this element, retry
237                 next === null -> return 0 // stack becomes empty
238                 else -> {
239                     val nextWorker = next as Worker
240                     val updIndex = nextWorker.indexInArray
241                     if (updIndex != 0) return updIndex // found good index for next worker
242                     // Otherwise, this worker was terminated and we cannot put it to top anymore, check next
243                     next = nextWorker.nextParkedWorker
244                 }
245             }
246         }
247     }
248 
249     /**
250      * State of worker threads.
251      * [workers] is a dynamic array of lazily created workers up to [maxPoolSize] workers.
252      * [createdWorkers] is count of already created workers (worker with index lesser than [createdWorkers] exists).
253      * [blockingTasks] is count of pending (either in the queue or being executed) blocking tasks.
254      *
255      * Workers array is also used as a lock for workers' creation and termination sequence.
256      *
257      * **NOTE**: `workers[0]` is always `null` (never used, works as sentinel value), so
258      * workers are 1-indexed, code path in [Worker.trySteal] is a bit faster and index swap during termination
259      * works properly.
260      *
261      * Initial size is `Dispatchers.Default` size * 2 to prevent unnecessary resizes for slightly or steadily loaded
262      * applications.
263      */
264     @JvmField
265     val workers = ResizableAtomicArray<Worker>((corePoolSize + 1) * 2)
266 
267     /**
268      * The `Long` value describing the state of workers in this pool.
269      * Currently, includes created, CPU-acquired, and blocking workers, each occupying [BLOCKING_SHIFT] bits.
270      *
271      * State layout (highest to lowest bits):
272      * | --- number of cpu permits, 22 bits ---  | --- number of blocking tasks, 21 bits ---  | --- number of created threads, 21 bits ---  |
273      */
274     private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT)
275 
276     private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt()
277     private val availableCpuPermits: Int inline get() = availableCpuPermits(controlState.value)
278 
279     private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()
280     private inline fun blockingTasks(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt()
281     inline fun availableCpuPermits(state: Long): Int = (state and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt()
282 
283     // Guarded by synchronization
284     private inline fun incrementCreatedWorkers(): Int = createdWorkers(controlState.incrementAndGet())
285     private inline fun decrementCreatedWorkers(): Int = createdWorkers(controlState.getAndDecrement())
286 
287     private inline fun incrementBlockingTasks() = controlState.addAndGet(1L shl BLOCKING_SHIFT)
288 
289     private inline fun decrementBlockingTasks() {
290         controlState.addAndGet(-(1L shl BLOCKING_SHIFT))
291     }
292 
293     private inline fun tryAcquireCpuPermit(): Boolean = controlState.loop { state ->
294         val available = availableCpuPermits(state)
295         if (available == 0) return false
296         val update = state - (1L shl CPU_PERMITS_SHIFT)
297         if (controlState.compareAndSet(state, update)) return true
298     }
299 
300     private inline fun releaseCpuPermit() = controlState.addAndGet(1L shl CPU_PERMITS_SHIFT)
301 
302     // This is used a "stop signal" for close and shutdown functions
303     private val _isTerminated = atomic(false)
304     val isTerminated: Boolean get() = _isTerminated.value
305 
306     companion object {
307         // A symbol to mark workers that are not in parkedWorkersStack
308         @JvmField
309         val NOT_IN_STACK = Symbol("NOT_IN_STACK")
310 
311         // Worker ctl states
312         private const val PARKED = -1
313         private const val CLAIMED = 0
314         private const val TERMINATED = 1
315 
316         // Masks of control state
317         private const val BLOCKING_SHIFT = 21 // 2M threads max
318         private const val CREATED_MASK: Long = (1L shl BLOCKING_SHIFT) - 1
319         private const val BLOCKING_MASK: Long = CREATED_MASK shl BLOCKING_SHIFT
320         private const val CPU_PERMITS_SHIFT = BLOCKING_SHIFT * 2
321         private const val CPU_PERMITS_MASK = CREATED_MASK shl CPU_PERMITS_SHIFT
322 
323         internal const val MIN_SUPPORTED_POOL_SIZE = 1 // we support 1 for test purposes, but it is not usually used
324         internal const val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2
325 
326         // Masks of parkedWorkersStack
327         private const val PARKED_INDEX_MASK = CREATED_MASK
328         private const val PARKED_VERSION_MASK = CREATED_MASK.inv()
329         private const val PARKED_VERSION_INC = 1L shl BLOCKING_SHIFT
330     }
331 
332     override fun execute(command: Runnable) = dispatch(command)
333 
334     override fun close() = shutdown(10_000L)
335 
    /**
     * Shuts down current scheduler and waits until all work is done and all threads are stopped.
     *
     * Only the first invocation has an effect; subsequent ones return immediately.
     * Every worker other than the calling thread is repeatedly unparked and joined until its thread is
     * terminated, and its local queue is then offloaded into [globalBlockingQueue]. After that both global
     * queues are closed (so [dispatch] starts rejecting tasks that do not fit into a local queue), and the
     * remaining tasks are executed on the calling thread. When the calling thread is itself a worker of this
     * scheduler, tasks are also taken via its `findTask(true)` and it finally calls
     * `tryReleaseCpu(WorkerState.TERMINATED)`.
     *
     * @param timeout timeout in milliseconds of each individual [Thread.join] attempt; since joining is retried
     * until the worker is terminated, it does not bound the total time spent in this method.
     */
    fun shutdown(timeout: Long) {
        // atomically set termination flag which is checked when workers are added or removed
        if (!_isTerminated.compareAndSet(false, true)) return
        // make sure we are not waiting for the current thread
        val currentWorker = currentWorker()
        // Capture # of created workers that cannot change anymore (mind the synchronized block!)
        val created = synchronized(workers) { createdWorkers }
        // Shutdown all workers with the only exception of the current thread
        for (i in 1..created) {
            val worker = workers[i]!!
            if (worker !== currentWorker) {
                // Note: this is java.lang.Thread.getState() of type java.lang.Thread.State
                while (worker.getState() != Thread.State.TERMINATED) {
                    LockSupport.unpark(worker)
                    worker.join(timeout)
                }
                // Note: this is CoroutineScheduler.Worker.state of type CoroutineScheduler.WorkerState
                assert { worker.state === WorkerState.TERMINATED } // Expected TERMINATED state
                worker.localQueue.offloadAllWorkTo(globalBlockingQueue) // Doesn't actually matter which queue to use
            }
        }
        // Make sure no more work is added to GlobalQueue from anywhere
        globalBlockingQueue.close()
        globalCpuQueue.close()
        // Finish processing tasks from globalQueue and/or from this worker's local queue
        while (true) {
            val task = currentWorker?.findTask(true)
                ?: globalCpuQueue.removeFirstOrNull()
                ?: globalBlockingQueue.removeFirstOrNull()
                ?: break
            runSafely(task)
        }
        // Shutdown current thread
        currentWorker?.tryReleaseCpu(WorkerState.TERMINATED)
        // check & cleanup state
        assert { availableCpuPermits == corePoolSize }
        parkedWorkersStack.value = 0L
        controlState.value = 0L
    }
376 
377     /**
378      * Dispatches execution of a runnable [block] with a hint to a scheduler whether
379      * this [block] may execute blocking operations (IO, system calls, locking primitives etc.)
380      *
381      * [taskContext] -- concurrency context of given [block].
382      * [tailDispatch] -- whether this [dispatch] call is the last action the (presumably) worker thread does in its current task.
383      * If `true`, then  the task will be dispatched in a FIFO manner and no additional workers will be requested,
384      * but only if the current thread is a corresponding worker thread.
385      * Note that caller cannot be ensured that it is being executed on worker thread for the following reasons:
386      *   - [CoroutineStart.UNDISPATCHED]
387      *   - Concurrent [close] that effectively shutdowns the worker thread
388      */
389     fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
390         trackTask() // this is needed for virtual time support
391         val task = createTask(block, taskContext)
392         val isBlockingTask = task.isBlocking
393         // Invariant: we increment counter **before** publishing the task
394         // so executing thread can safely decrement the number of blocking tasks
395         val stateSnapshot = if (isBlockingTask) incrementBlockingTasks() else 0
396         // try to submit the task to the local queue and act depending on the result
397         val currentWorker = currentWorker()
398         val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
399         if (notAdded != null) {
400             if (!addToGlobalQueue(notAdded)) {
401                 // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
402                 throw RejectedExecutionException("$schedulerName was terminated")
403             }
404         }
405         val skipUnpark = tailDispatch && currentWorker != null
406         // Checking 'task' instead of 'notAdded' is completely okay
407         if (isBlockingTask) {
408             // Use state snapshot to better estimate the number of running threads
409             signalBlockingWork(stateSnapshot, skipUnpark = skipUnpark)
410         } else {
411             if (skipUnpark) return
412             signalCpuWork()
413         }
414     }
415 
416     fun createTask(block: Runnable, taskContext: TaskContext): Task {
417         val nanoTime = schedulerTimeSource.nanoTime()
418         if (block is Task) {
419             block.submissionTime = nanoTime
420             block.taskContext = taskContext
421             return block
422         }
423         return TaskImpl(block, nanoTime, taskContext)
424     }
425 
426     // NB: should only be called from 'dispatch' method due to blocking tasks increment
427     private fun signalBlockingWork(stateSnapshot: Long, skipUnpark: Boolean) {
428         if (skipUnpark) return
429         if (tryUnpark()) return
430         // Use state snapshot to avoid accidental thread overprovision
431         if (tryCreateWorker(stateSnapshot)) return
432         tryUnpark() // Try unpark again in case there was race between permit release and parking
433     }
434 
435     fun signalCpuWork() {
436         if (tryUnpark()) return
437         if (tryCreateWorker()) return
438         tryUnpark()
439     }
440 
441     private fun tryCreateWorker(state: Long = controlState.value): Boolean {
442         val created = createdWorkers(state)
443         val blocking = blockingTasks(state)
444         val cpuWorkers = (created - blocking).coerceAtLeast(0)
445         /*
446          * We check how many threads are there to handle non-blocking work,
447          * and create one more if we have not enough of them.
448          */
449         if (cpuWorkers < corePoolSize) {
450             val newCpuWorkers = createNewWorker()
451             // If we've created the first cpu worker and corePoolSize > 1 then create
452             // one more (second) cpu worker, so that stealing between them is operational
453             if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
454             if (newCpuWorkers > 0) return true
455         }
456         return false
457     }
458 
459     private fun tryUnpark(): Boolean {
460         while (true) {
461             val worker = parkedWorkersStackPop() ?: return false
462             if (worker.workerCtl.compareAndSet(PARKED, CLAIMED)) {
463                 LockSupport.unpark(worker)
464                 return true
465             }
466         }
467     }
468 
469     /**
470      * Returns the number of CPU workers after this function (including new worker) or
471      * 0 if no worker was created.
472      */
473     private fun createNewWorker(): Int {
474         val worker: Worker
475         return synchronized(workers) {
476             // Make sure we're not trying to resurrect terminated scheduler
477             if (isTerminated) return -1
478             val state = controlState.value
479             val created = createdWorkers(state)
480             val blocking = blockingTasks(state)
481             val cpuWorkers = (created - blocking).coerceAtLeast(0)
482             // Double check for overprovision
483             if (cpuWorkers >= corePoolSize) return 0
484             if (created >= maxPoolSize) return 0
485             // start & register new worker, commit index only after successful creation
486             val newIndex = createdWorkers + 1
487             require(newIndex > 0 && workers[newIndex] == null)
488             /*
489              * 1) Claim the slot (under a lock) by the newly created worker
490              * 2) Make it observable by increment created workers count
491              * 3) Only then start the worker, otherwise it may miss its own creation
492              */
493             worker = Worker(newIndex)
494             workers.setSynchronized(newIndex, worker)
495             require(newIndex == incrementCreatedWorkers())
496             cpuWorkers + 1
497         }.also { worker.start() } // Start worker when the lock is released to reduce contention, see #3652
498     }
499 
500     /**
501      * Returns `null` if task was successfully added or an instance of the
502      * task that was not added or replaced (thus should be added to global queue).
503      */
504     private fun Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
505         if (this == null) return task
506         /*
507          * This worker could have been already terminated from this thread by close/shutdown and it should not
508          * accept any more tasks into its local queue.
509          */
510         if (state === WorkerState.TERMINATED) return task
511         // Do not add CPU tasks in local queue if we are not able to execute it
512         if (task.mode == TASK_NON_BLOCKING && state === WorkerState.BLOCKING) {
513             return task
514         }
515         mayHaveLocalTasks = true
516         return localQueue.add(task, fair = tailDispatch)
517     }
518 
519     private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }
520 
521     /**
522      * Returns a string identifying the state of this scheduler for nicer debugging.
523      * Note that this method is not atomic and represents rough state of pool.
524      *
525      * State of the queues:
526      * b for blocking, c for CPU, r for retiring.
527      * E.g. for [1b, 1b, 2c, 1d] means that pool has
528      * two blocking workers with queue size 1, one worker with CPU permit and queue size 1
529      * and one dormant (executing his local queue before parking) worker with queue size 1.
530      */
531     override fun toString(): String {
532         var parkedWorkers = 0
533         var blockingWorkers = 0
534         var cpuWorkers = 0
535         var dormant = 0
536         var terminated = 0
537         val queueSizes = arrayListOf<String>()
538         for (index in 1 until workers.currentLength()) {
539             val worker = workers[index] ?: continue
540             val queueSize = worker.localQueue.size
541             when (worker.state) {
542                 WorkerState.PARKING -> ++parkedWorkers
543                 WorkerState.BLOCKING -> {
544                     ++blockingWorkers
545                     queueSizes += queueSize.toString() + "b" // Blocking
546                 }
547 
548                 WorkerState.CPU_ACQUIRED -> {
549                     ++cpuWorkers
550                     queueSizes += queueSize.toString() + "c" // CPU
551                 }
552 
553                 WorkerState.DORMANT -> {
554                     ++dormant
555                     if (queueSize > 0) queueSizes += queueSize.toString() + "d" // Retiring
556                 }
557 
558                 WorkerState.TERMINATED -> ++terminated
559             }
560         }
561         val state = controlState.value
562         return "$schedulerName@$hexAddress[" +
563             "Pool Size {" +
564             "core = $corePoolSize, " +
565             "max = $maxPoolSize}, " +
566             "Worker States {" +
567             "CPU = $cpuWorkers, " +
568             "blocking = $blockingWorkers, " +
569             "parked = $parkedWorkers, " +
570             "dormant = $dormant, " +
571             "terminated = $terminated}, " +
572             "running workers queues = $queueSizes, " +
573             "global CPU queue size = ${globalCpuQueue.size}, " +
574             "global blocking queue size = ${globalBlockingQueue.size}, " +
575             "Control State {" +
576             "created workers= ${createdWorkers(state)}, " +
577             "blocking tasks = ${blockingTasks(state)}, " +
578             "CPUs acquired = ${corePoolSize - availableCpuPermits(state)}" +
579             "}]"
580     }
581 
582     fun runSafely(task: Task) {
583         try {
584             task.run()
585         } catch (e: Throwable) {
586             val thread = Thread.currentThread()
587             thread.uncaughtExceptionHandler.uncaughtException(thread, e)
588         } finally {
589             unTrackTask()
590         }
591     }
592 
593     internal inner class Worker private constructor() : Thread() {
594         init {
595             isDaemon = true
596             /*
597              * `Dispatchers.Default` is used as *the* dispatcher in the containerized environments,
598              * isolated by their own classloaders. Workers are populated lazily, thus we are inheriting
599              * `Dispatchers.Default` context class loader here instead of using parent' thread one
600              * in order not to accidentally capture temporary application classloader.
601              */
602             contextClassLoader = this@CoroutineScheduler.javaClass.classLoader
603         }
604 
605         // guarded by scheduler lock, index in workers array, 0 when not in array (terminated)
606         @Volatile // volatile for push/pop operation into parkedWorkersStack
607         var indexInArray = 0
608             set(index) {
609                 name = "$schedulerName-worker-${if (index == 0) "TERMINATED" else index.toString()}"
610                 field = index
611             }
612 
613         constructor(index: Int) : this() {
614             indexInArray = index
615         }
616 
617         inline val scheduler get() = this@CoroutineScheduler
618 
619         @JvmField
620         val localQueue: WorkQueue = WorkQueue()
621 
622         /**
623          * Slot that is used to steal tasks into to avoid re-adding them
624          * to the local queue. See [trySteal]
625          */
626         private val stolenTask: ObjectRef<Task?> = ObjectRef()
627 
628         /**
629          * Worker state. **Updated only by this worker thread**.
630          * By default, worker is in DORMANT state in the case when it was created, but all CPU tokens or tasks were taken.
631          * Is used locally by the worker to maintain its own invariants.
632          */
633         @JvmField
634         var state = WorkerState.DORMANT
635 
636         /**
637          * Worker control state responsible for worker claiming, parking and termination.
638          * List of states:
639          * [PARKED] -- worker is parked and can self-terminate after a termination deadline.
640          * [CLAIMED] -- worker is claimed by an external submitter.
641          * [TERMINATED] -- worker is terminated and no longer usable.
642          */
643         val workerCtl = atomic(CLAIMED)
644 
645         /**
646          * It is set to the termination deadline when started doing [park] and it reset
647          * when there is a task. It serves as protection against spurious wakeups of parkNanos.
648          */
649         private var terminationDeadline = 0L
650 
651         /**
652          * Reference to the next worker in the [parkedWorkersStack].
653          * It may be `null` if there is no next parked worker.
654          * This reference is set to [NOT_IN_STACK] when worker is physically not in stack.
655          */
656         @Volatile
657         var nextParkedWorker: Any? = NOT_IN_STACK
658 
659         /*
660          * The delay until at least one task in other worker queues will become stealable.
661          */
662         private var minDelayUntilStealableTaskNs = 0L
663 
664         /**
665          * The state of embedded Marsaglia xorshift random number generator, used for work-stealing purposes.
666          * It is initialized with a seed.
667          */
668         private var rngState: Int = run {
669             // This could've been Random.nextInt(), but we are shaving an extra initialization cost, see #4051
670             val seed = System.nanoTime().toInt()
671             // rngState shouldn't be zero, as required for the xorshift algorithm
672             if (seed != 0) return@run seed
673             42
674         }
675 
676         /**
677          * Tries to acquire CPU token if worker doesn't have one
678          * @return whether worker acquired (or already had) CPU token
679          */
680         private fun tryAcquireCpuPermit(): Boolean = when {
681             state == WorkerState.CPU_ACQUIRED -> true
682             this@CoroutineScheduler.tryAcquireCpuPermit() -> {
683                 state = WorkerState.CPU_ACQUIRED
684                 true
685             }
686 
687             else -> false
688         }
689 
690         /**
691          * Releases CPU token if worker has any and changes state to [newState].
692          * Returns `true` if CPU permit was returned to the pool
693          */
694         fun tryReleaseCpu(newState: WorkerState): Boolean {
695             val previousState = state
696             val hadCpu = previousState == WorkerState.CPU_ACQUIRED
697             if (hadCpu) releaseCpuPermit()
698             if (previousState != newState) state = newState
699             return hadCpu
700         }
701 
702         override fun run() = runWorker()
703 
704         @JvmField
705         var mayHaveLocalTasks = false
706 
707         private fun runWorker() {
708             var rescanned = false
709             while (!isTerminated && state != WorkerState.TERMINATED) {
710                 val task = findTask(mayHaveLocalTasks)
711                 // Task found. Execute and repeat
712                 if (task != null) {
713                     rescanned = false
714                     minDelayUntilStealableTaskNs = 0L
715                     executeTask(task)
716                     continue
717                 } else {
718                     mayHaveLocalTasks = false
719                 }
720                 /*
721                  * No tasks were found:
722                  * 1) Either at least one of the workers has stealable task in its FIFO-buffer with a stealing deadline.
723                  *    Then its deadline is stored in [minDelayUntilStealableTask]
724                  * // '2)' can be found below
725                  *
726                  * Then just park for that duration (ditto re-scanning).
727                  * While it could potentially lead to short (up to WORK_STEALING_TIME_RESOLUTION_NS ns) starvations,
728                  * excess unparks and managing "one unpark per signalling" invariant become unfeasible, instead we are going to resolve
729                  * it with "spinning via scans" mechanism.
730                  * NB: this short potential parking does not interfere with `tryUnpark`
731                  */
732                 if (minDelayUntilStealableTaskNs != 0L) {
733                     if (!rescanned) {
734                         rescanned = true
735                     } else {
736                         rescanned = false
737                         tryReleaseCpu(WorkerState.PARKING)
738                         interrupted()
739                         LockSupport.parkNanos(minDelayUntilStealableTaskNs)
740                         minDelayUntilStealableTaskNs = 0L
741                     }
742                     continue
743                 }
744                 /*
745                  * 2) Or no tasks available, time to park and, potentially, shut down the thread.
746                  * Add itself to the stack of parked workers, re-scans all the queues
747                  * to avoid missing wake-up (requestCpuWorker) and either starts executing discovered tasks or parks itself awaiting for new tasks.
748                  */
749                 tryPark()
750             }
751             tryReleaseCpu(WorkerState.TERMINATED)
752         }
753 
754         /**
755          * See [runSingleTaskFromCurrentSystemDispatcher] for rationale and details.
756          * This is a fine-tailored method for a specific use-case not expected to be used widely.
757          */
758         fun runSingleTask(): Long {
759             val stateSnapshot = state
760             val isCpuThread = state == WorkerState.CPU_ACQUIRED
761             val task = if (isCpuThread) {
762                 findCpuTask()
763             } else {
764                 findBlockingTask()
765             }
766             if (task == null) {
767                 if (minDelayUntilStealableTaskNs == 0L) return -1L
768                 return minDelayUntilStealableTaskNs
769             }
770             runSafely(task)
771             if (!isCpuThread) decrementBlockingTasks()
772             assert { state == stateSnapshot }
773             return 0L
774         }
775 
776         fun isIo() = state == WorkerState.BLOCKING
777 
778         // Counterpart to "tryUnpark"
779         private fun tryPark() {
780             if (!inStack()) {
781                 parkedWorkersStackPush(this)
782                 return
783             }
784             workerCtl.value = PARKED // Update value once
785             /*
786              * inStack() prevents spurious wakeups, while workerCtl.value == PARKED
787              * prevents the following race:
788              *
789              * - T2 scans the queue, adds itself to the stack, goes to rescan
790              * - T2 suspends in 'workerCtl.value = PARKED' line
791              * - T1 pops T2 from the stack, claims workerCtl, suspends
792              * - T2 fails 'while (inStack())' check, goes to full rescan
793              * - T2 adds itself to the stack, parks
794              * - T1 unparks T2, bails out with success
795              * - T2 unparks and loops in 'while (inStack())'
796              */
797             while (inStack() && workerCtl.value == PARKED) { // Prevent spurious wakeups
798                 if (isTerminated || state == WorkerState.TERMINATED) break
799                 tryReleaseCpu(WorkerState.PARKING)
800                 interrupted() // Cleanup interruptions
801                 park()
802             }
803         }
804 
805         private fun inStack(): Boolean = nextParkedWorker !== NOT_IN_STACK
806 
807         private fun executeTask(task: Task) {
808             val taskMode = task.mode
809             idleReset(taskMode)
810             beforeTask(taskMode)
811             runSafely(task)
812             afterTask(taskMode)
813         }
814 
815         private fun beforeTask(taskMode: Int) {
816             if (taskMode == TASK_NON_BLOCKING) return
817             // Always notify about new work when releasing CPU-permit to execute some blocking task
818             if (tryReleaseCpu(WorkerState.BLOCKING)) {
819                 signalCpuWork()
820             }
821         }
822 
823         private fun afterTask(taskMode: Int) {
824             if (taskMode == TASK_NON_BLOCKING) return
825             decrementBlockingTasks()
826             val currentState = state
827             // Shutdown sequence of blocking dispatcher
828             if (currentState !== WorkerState.TERMINATED) {
829                 assert { currentState == WorkerState.BLOCKING } // "Expected BLOCKING state, but has $currentState"
830                 state = WorkerState.DORMANT
831             }
832         }
833 
834         /*
835          * Marsaglia xorshift RNG with period 2^32-1 for work stealing purposes.
836          * ThreadLocalRandom cannot be used to support Android and ThreadLocal<Random> is up to 15% slower on Ktor benchmarks
837          */
838         fun nextInt(upperBound: Int): Int {
839             var r = rngState
840             r = r xor (r shl 13)
841             r = r xor (r shr 17)
842             r = r xor (r shl 5)
843             rngState = r
844             val mask = upperBound - 1
845             // Fast path for power of two bound
846             if (mask and upperBound == 0) {
847                 return r and mask
848             }
849             return (r and Int.MAX_VALUE) % upperBound
850         }
851 
852         private fun park() {
853             // set termination deadline the first time we are here (it is reset in idleReset)
854             if (terminationDeadline == 0L) terminationDeadline = System.nanoTime() + idleWorkerKeepAliveNs
855             // actually park
856             LockSupport.parkNanos(idleWorkerKeepAliveNs)
857             // try terminate when we are idle past termination deadline
858             // note that comparison is written like this to protect against potential nanoTime wraparound
859             if (System.nanoTime() - terminationDeadline >= 0) {
860                 terminationDeadline = 0L // if attempt to terminate worker fails we'd extend deadline again
861                 tryTerminateWorker()
862             }
863         }
864 
865         /**
866          * Stops execution of current thread and removes it from [createdWorkers].
867          */
868         private fun tryTerminateWorker() {
869             synchronized(workers) {
870                 // Make sure we're not trying race with termination of scheduler
871                 if (isTerminated) return
872                 // Someone else terminated, bail out
873                 if (createdWorkers <= corePoolSize) return
874                 /*
875                  * See tryUnpark for state reasoning.
876                  * If this CAS fails, then we were successfully unparked by other worker and cannot terminate.
877                  */
878                 if (!workerCtl.compareAndSet(PARKED, TERMINATED)) return
879                 /*
880                  * At this point this thread is no longer considered as usable for scheduling.
881                  * We need multi-step choreography to reindex workers.
882                  *
883                  * 1) Read current worker's index and reset it to zero.
884                  */
885                 val oldIndex = indexInArray
886                 indexInArray = 0
887                 /*
888                  * Now this worker cannot become the top of parkedWorkersStack, but it can
889                  * still be at the stack top via oldIndex.
890                  *
891                  * 2) Update top of stack if it was pointing to oldIndex and make sure no
892                  *    pending push/pop operation that might have already retrieved oldIndex could complete.
893                  */
894                 parkedWorkersStackTopUpdate(this, oldIndex, 0)
895                 /*
896                  * 3) Move last worker into an index in array that was previously occupied by this worker,
897                  *    if last worker was a different one (sic!).
898                  */
899                 val lastIndex = decrementCreatedWorkers()
900                 if (lastIndex != oldIndex) {
901                     val lastWorker = workers[lastIndex]!!
902                     workers.setSynchronized(oldIndex, lastWorker)
903                     lastWorker.indexInArray = oldIndex
904                     /*
905                      * Now lastWorker is available at both indices in the array, but it can
906                      * still be at the stack top on via its lastIndex
907                      *
908                      * 4) Update top of stack lastIndex -> oldIndex and make sure no
909                      *    pending push/pop operation that might have already retrieved lastIndex could complete.
910                      */
911                     parkedWorkersStackTopUpdate(lastWorker, lastIndex, oldIndex)
912                 }
913                 /*
914                  * 5) It is safe to clear reference from workers array now.
915                  */
916                 workers.setSynchronized(lastIndex, null)
917             }
918             state = WorkerState.TERMINATED
919         }
920 
921         // It is invoked by this worker when it finds a task
922         private fun idleReset(mode: Int) {
923             terminationDeadline = 0L // reset deadline for termination
924             if (state == WorkerState.PARKING) {
925                 assert { mode == TASK_PROBABLY_BLOCKING }
926                 state = WorkerState.BLOCKING
927             }
928         }
929 
930         fun findTask(mayHaveLocalTasks: Boolean): Task? {
931             if (tryAcquireCpuPermit()) return findAnyTask(mayHaveLocalTasks)
932             /*
933              * If we can't acquire a CPU permit, attempt to find blocking task:
934              * - Check if our queue has one (maybe mixed in with CPU tasks)
935              * - Poll global and try steal
936              */
937             return findBlockingTask()
938         }
939 
940         // NB: ONLY for runSingleTask method
941         private fun findBlockingTask(): Task? {
942             return localQueue.pollBlocking()
943                 ?: globalBlockingQueue.removeFirstOrNull()
944                 ?: trySteal(STEAL_BLOCKING_ONLY)
945         }
946 
947         // NB: ONLY for runSingleTask method
948         private fun findCpuTask(): Task? {
949             return localQueue.pollCpu()
950                 ?: globalBlockingQueue.removeFirstOrNull()
951                 ?: trySteal(STEAL_CPU_ONLY)
952         }
953 
954         private fun findAnyTask(scanLocalQueue: Boolean): Task? {
955             /*
956              * Anti-starvation mechanism: probabilistically poll either local
957              * or global queue to ensure progress for both external and internal tasks.
958              */
959             if (scanLocalQueue) {
960                 val globalFirst = nextInt(2 * corePoolSize) == 0
961                 if (globalFirst) pollGlobalQueues()?.let { return it }
962                 localQueue.poll()?.let { return it }
963                 if (!globalFirst) pollGlobalQueues()?.let { return it }
964             } else {
965                 pollGlobalQueues()?.let { return it }
966             }
967             return trySteal(STEAL_ANY)
968         }
969 
970         private fun pollGlobalQueues(): Task? {
971             if (nextInt(2) == 0) {
972                 globalCpuQueue.removeFirstOrNull()?.let { return it }
973                 return globalBlockingQueue.removeFirstOrNull()
974             } else {
975                 globalBlockingQueue.removeFirstOrNull()?.let { return it }
976                 return globalCpuQueue.removeFirstOrNull()
977             }
978         }
979 
980         private fun trySteal(stealingMode: StealingMode): Task? {
981             val created = createdWorkers
982             // 0 to await an initialization and 1 to avoid excess stealing on single-core machines
983             if (created < 2) {
984                 return null
985             }
986 
987             var currentIndex = nextInt(created)
988             var minDelay = Long.MAX_VALUE
989             repeat(created) {
990                 ++currentIndex
991                 if (currentIndex > created) currentIndex = 1
992                 val worker = workers[currentIndex]
993                 if (worker !== null && worker !== this) {
994                     val stealResult = worker.localQueue.trySteal(stealingMode, stolenTask)
995                     if (stealResult == TASK_STOLEN) {
996                         val result = stolenTask.element
997                         stolenTask.element = null
998                         return result
999                     } else if (stealResult > 0) {
1000                         minDelay = min(minDelay, stealResult)
1001                     }
1002                 }
1003             }
1004             minDelayUntilStealableTaskNs = if (minDelay != Long.MAX_VALUE) minDelay else 0
1005             return null
1006         }
1007     }
1008 
1009     enum class WorkerState {
1010         /**
1011          * Has CPU token and either executes [TASK_NON_BLOCKING] task or tries to find one.
1012          */
1013         CPU_ACQUIRED,
1014 
1015         /**
1016          * Executing task with [TASK_PROBABLY_BLOCKING].
1017          */
1018         BLOCKING,
1019 
1020         /**
1021          * Currently parked.
1022          */
1023         PARKING,
1024 
1025         /**
1026          * Tries to execute its local work and then goes to infinite sleep as no longer needed worker.
1027          */
1028         DORMANT,
1029 
1030         /**
1031          * Terminal state, will no longer be used
1032          */
1033         TERMINATED
1034     }
1035 }
1036 
1037 /**
1038  * Checks if the thread is part of a thread pool that supports coroutines.
1039  * This function is needed for integration with BlockHound.
1040  */
1041 @JvmName("isSchedulerWorker")
isSchedulerWorkernull1042 internal fun isSchedulerWorker(thread: Thread) = thread is CoroutineScheduler.Worker
1043 
1044 /**
1045  * Checks if the thread is running a CPU-bound task.
1046  * This function is needed for integration with BlockHound.
1047  */
1048 @JvmName("mayNotBlock")
1049 internal fun mayNotBlock(thread: Thread) = thread is CoroutineScheduler.Worker &&
1050     thread.state == CoroutineScheduler.WorkerState.CPU_ACQUIRED
1051