package kotlinx.coroutines.scheduling

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import java.io.*
import java.util.concurrent.*
import java.util.concurrent.locks.*
import kotlin.jvm.internal.Ref.ObjectRef
import kotlin.math.*

/**
 * Coroutine scheduler (pool of shared threads) whose primary goal is to distribute dispatched coroutines
 * over worker threads, including both CPU-intensive and blocking tasks, in the most efficient manner.
 *
 * The current scheduler implementation has two optimization targets:
 * - Efficiency in the face of communication patterns (e.g. actors communicating via channel).
 * - Dynamic resizing to support blocking calls without re-dispatching coroutines to a separate "blocking" thread pool.
 *
 * ### Structural overview
 *
 * The scheduler consists of [corePoolSize] worker threads to execute CPU-bound tasks and up to
 * [maxPoolSize] lazily created threads to execute blocking tasks.
 * Every worker has a local queue in addition to a global scheduler queue,
 * and the global queue has priority over the local queue to avoid starvation of externally-submitted
 * (e.g. from Android UI thread) tasks.
 * Work-stealing is implemented on top of these queues to provide
 * even load distribution and the illusion of a centralized run queue.
 *
 * ### Scheduling policy
 *
 * When a coroutine is dispatched from within a scheduler worker, it's placed into the head of the worker's run queue.
 * If the head is not empty, the task from the head is moved to the tail. Though it is an unfair scheduling policy,
 * it effectively couples communicating coroutines together and eliminates the scheduling latency
 * that arises from placing tasks at the end of the queue.
 * Moving the former head to the tail is necessary to provide semi-FIFO order; otherwise, the queue degenerates to a stack.
 * When a coroutine is dispatched from an external thread, it's put into the global queue.
 * The original idea with a single-slot LIFO buffer comes from the Golang runtime scheduler by D. Vyukov.
 * It was proven to be "fair enough", performant and generally well accepted, and initially was a significant
 * source of inspiration for the coroutine scheduler.
 *
 * ### Work stealing and affinity
 *
 * To provide an even distribution of tasks, a worker tries to steal tasks from other workers' queues
 * before parking when its local queue is empty.
 * A non-standard solution is implemented to provide task affinity: a task from the FIFO buffer may be stolen
 * only if it is stale enough based on the value of [WORK_STEALING_TIME_RESOLUTION_NS].
 * For this purpose, a monotonic global clock is used, and every task has its submission time associated with it.
 * This approach shows outstanding results when coroutines are cooperative,
 * but as a downside, the scheduler now depends on a high-resolution global clock,
 * which may limit scalability on NUMA machines. Tasks from the LIFO buffer can be stolen on a regular basis.
 *
 * ### Thread management
 * One of the hardest parts of the scheduler is decentralized management of the threads with progress guarantees
 * similar to the regular centralized executors.
 * The state of the threads consists of the [controlState] and [parkedWorkersStack] fields.
 * The former field incorporates the number of created threads, CPU-tokens and blocking tasks
 * that require thread compensation,
 * while the latter represents an intrusive versioned Treiber stack of idle workers.
 * When a worker cannot find any work, it first adds itself to the stack,
 * then re-scans the queue to avoid missing signals, and then attempts to park
 * with an additional rendezvous against unnecessary parking.
 * If a worker finds a task that it cannot yet steal due to time constraints, it stores this fact in its state
 * (to be uncounted when additional work is signalled) and parks for that duration.
 *
 * When a new task arrives in the scheduler (whether to a local or the global queue),
 * either an idle worker is signalled, or an attempt is made to create a new worker.
 * (Only [corePoolSize] workers can be created for regular CPU tasks.)
 *
 * ### Support for blocking tasks
 * The scheduler also supports the notion of [blocking][TASK_PROBABLY_BLOCKING] tasks.
 * When executing or enqueuing blocking tasks, the scheduler notifies or creates one more worker in
 * addition to the core pool size, so at any given moment, it has [corePoolSize] threads (potentially not yet created)
 * to serve CPU-bound tasks. To properly guarantee liveness, the scheduler maintains
 * "CPU permits" -- [corePoolSize] special tokens that permit an arbitrary worker to execute and steal CPU-bound tasks.
 * When a worker encounters a blocking task, it basically hands off its permit to another thread (not directly though) to
 * keep the invariant "the scheduler always has at least min(pending CPU tasks, core pool size)
 * and at most core pool size threads to execute CPU tasks".
 * To avoid overprovision, workers without a CPU permit are allowed to scan [globalBlockingQueue]
 * and steal **only** blocking tasks from other workers.
 *
 * The scheduler does not limit the count of pending blocking tasks, potentially creating up to [maxPoolSize] threads.
 * End users do not have access to the scheduler directly and can dispatch blocking tasks only with
 * [LimitingDispatcher] that controls the concurrency level with its own mechanism.
 */
@Suppress("NOTHING_TO_INLINE")
internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
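    /*
     * Usage sketch (illustrative only -- this class is internal and is normally reached through
     * Dispatchers.Default and Dispatchers.IO rather than instantiated directly; the pool sizes
     * below are arbitrary example values, not recommendations):
     *
     *   val scheduler = CoroutineScheduler(corePoolSize = 4, maxPoolSize = 128)
     *   scheduler.dispatch(Runnable { println("CPU-bound work") }) // goes to a local or the global CPU queue
     *   scheduler.close() // drains the queues and joins the workers
     */
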
    init {
        require(corePoolSize >= MIN_SUPPORTED_POOL_SIZE) {
            "Core pool size $corePoolSize should be at least $MIN_SUPPORTED_POOL_SIZE"
        }
        require(maxPoolSize >= corePoolSize) {
            "Max pool size $maxPoolSize should be greater than or equal to core pool size $corePoolSize"
        }
        require(maxPoolSize <= MAX_SUPPORTED_POOL_SIZE) {
            "Max pool size $maxPoolSize should not exceed maximal supported number of threads $MAX_SUPPORTED_POOL_SIZE"
        }
        require(idleWorkerKeepAliveNs > 0) {
            "Idle worker keep alive time $idleWorkerKeepAliveNs must be positive"
        }
    }

    @JvmField
    val globalCpuQueue = GlobalQueue()

    @JvmField
    val globalBlockingQueue = GlobalQueue()

    private fun addToGlobalQueue(task: Task): Boolean {
        return if (task.isBlocking) {
            globalBlockingQueue.addLast(task)
        } else {
            globalCpuQueue.addLast(task)
        }
    }

    /**
     * The stack of parked workers.
     * Every worker registers itself in the stack before parking (if it was not previously registered),
     * so it can be signalled when new tasks arrive.
     * This is a form of intrusive garbage-free Treiber stack where [Worker] is also a stack node.
     *
     * The stack is better than a queue (even with contention on top) because it unparks threads
     * in most-recently used order, improving both performance and locality.
     * Moreover, it decreases thread thrashing: if the pool has n threads when only n / 2 are required,
     * the latter half will never be unparked and will terminate itself after [IDLE_WORKER_KEEP_ALIVE_NS].
     *
     * This long value consists of version bits with [PARKED_VERSION_MASK]
     * and top worker thread index bits with [PARKED_INDEX_MASK].
     */
    private val parkedWorkersStack = atomic(0L)
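
    /*
     * Encoding sketch (illustrative): with BLOCKING_SHIFT = 21, the lower 21 bits of
     * [parkedWorkersStack] hold the index of the top worker and the remaining bits hold a version:
     *
     *   val index   = (top and PARKED_INDEX_MASK).toInt()   // worker index in [workers], 0 == empty stack
     *   val version =  top and PARKED_VERSION_MASK          // bumped by PARKED_VERSION_INC on every update
     *
     * Every push/pop/top-update increments the version, so a CAS taken against a stale `top`
     * fails -- the classic versioned-pointer defense against the ABA problem.
     */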

    /**
     * Updates the index of the worker at the top of [parkedWorkersStack].
     * It always updates the version, so that a concurrent [parkedWorkersStackPop] operation
     * that might have already decided to put this index to the top will fail its CAS and retry.
     *
     * Note, [newIndex] can be zero for the worker that is being terminated (removed from [workers]).
     */
    fun parkedWorkersStackTopUpdate(worker: Worker, oldIndex: Int, newIndex: Int) {
        parkedWorkersStack.loop { top ->
            val index = (top and PARKED_INDEX_MASK).toInt()
            val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
            val updIndex = if (index == oldIndex) {
                if (newIndex == 0) {
                    parkedWorkersStackNextIndex(worker)
                } else {
                    newIndex
                }
            } else {
                index // no change to index, but update the version
            }
            if (updIndex < 0) return@loop // retry
            if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return
        }
    }

    /**
     * Pushes a worker into [parkedWorkersStack].
     * It does nothing if this worker is already physically linked to the stack.
     * This method is invoked only from the worker thread itself.
     * This invocation always precedes [LockSupport.parkNanos].
     * See [Worker.tryPark].
     *
     * Returns `true` if the worker was added to the stack by this invocation, `false` if it was already
     * registered in the stack.
     */
    fun parkedWorkersStackPush(worker: Worker): Boolean {
        if (worker.nextParkedWorker !== NOT_IN_STACK) return false // already in stack, bail out
        /*
         * The below loop can be entered only if this worker was not in the stack and, since no other thread
         * can add it to the stack (only the worker itself), this invariant holds while this loop executes.
         */
        parkedWorkersStack.loop { top ->
            val index = (top and PARKED_INDEX_MASK).toInt()
            val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
            val updIndex = worker.indexInArray
            assert { updIndex != 0 } // only this worker can push itself, cannot be terminated
            worker.nextParkedWorker = workers[index]
            /*
             * Another thread can be changing this worker's index at this point, but it
             * also invokes parkedWorkersStackTopUpdate which updates the version to make the next CAS fail.
             * A successful CAS of the stack top completes a successful push.
             */
            if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return true
        }
    }

    /**
     * Pops a worker from [parkedWorkersStack].
     * It can be invoked concurrently from any thread that is looking for help and needs to unpark some worker.
     * This invocation is always followed by an attempt to [LockSupport.unpark] the resulting worker.
     * See [tryUnpark].
     */
    private fun parkedWorkersStackPop(): Worker? {
        parkedWorkersStack.loop { top ->
            val index = (top and PARKED_INDEX_MASK).toInt()
            val worker = workers[index] ?: return null // stack is empty
            val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
            val updIndex = parkedWorkersStackNextIndex(worker)
            if (updIndex < 0) return@loop // retry
            /*
             * Another thread can be changing this worker's index at this point, but it
             * also invokes parkedWorkersStackTopUpdate which updates the version to make the next CAS fail.
             * A successful CAS of the stack top completes a successful pop.
             */
            if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) {
                /*
                 * We've just taken a worker out of the stack, but nextParkedWorker is not reset yet, so if a worker is
                 * currently invoking parkedWorkersStackPush it would think it is in the stack and bail out without
                 * adding itself again. It does not matter, since we are going to invoke unpark on the thread
                 * that was popped out of parkedWorkersStack anyway.
                 */
                worker.nextParkedWorker = NOT_IN_STACK
                return worker
            }
        }
    }

    /**
     * Finds the next usable index for [parkedWorkersStack]. The problem is that when a worker
     * is terminated, its [Worker.indexInArray] becomes zero, so it cannot be
     * put at the top of the stack. In that case we are looking for the next one.
     *
     * Returns `index >= 0` or `-1` for retry.
     */
    private fun parkedWorkersStackNextIndex(worker: Worker): Int {
        var next = worker.nextParkedWorker
        findNext@ while (true) {
            when {
                next === NOT_IN_STACK -> return -1 // we are too late -- another thread popped this element, retry
                next === null -> return 0 // stack becomes empty
                else -> {
                    val nextWorker = next as Worker
                    val updIndex = nextWorker.indexInArray
                    if (updIndex != 0) return updIndex // found a good index for the next worker
                    // Otherwise, this worker was terminated and we cannot put it to the top anymore, check the next one
                    next = nextWorker.nextParkedWorker
                }
            }
        }
    }

    /**
     * State of the worker threads.
     * [workers] is a dynamic array of lazily created workers up to [maxPoolSize] workers.
     * [createdWorkers] is the count of already created workers (a worker with an index less than [createdWorkers] exists).
     * [blockingTasks] is the count of pending (either in the queue or being executed) blocking tasks.
     *
     * The workers array is also used as a lock for the workers' creation and termination sequence.
     *
     * **NOTE**: `workers[0]` is always `null` (never used, works as a sentinel value), so
     * workers are 1-indexed, the code path in [Worker.trySteal] is a bit faster and the index swap during termination
     * works properly.
     *
     * The initial size is `Dispatchers.Default` size * 2 to prevent unnecessary resizes for slightly or steadily loaded
     * applications.
     */
    @JvmField
    val workers = ResizableAtomicArray<Worker>((corePoolSize + 1) * 2)

    /**
     * The `Long` value describing the state of workers in this pool.
     * Currently includes the number of created threads, the number of blocking tasks, and the number of
     * available CPU permits, each occupying [BLOCKING_SHIFT] bits.
     *
     * State layout (highest to lowest bits):
     * | --- number of cpu permits, 22 bits --- | --- number of blocking tasks, 21 bits --- | --- number of created threads, 21 bits --- |
     */
    private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT)
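
    /*
     * Worked example (illustrative): with BLOCKING_SHIFT = 21 and CPU_PERMITS_SHIFT = 42,
     * a state with 2 created workers, 1 blocking task and 3 available CPU permits is
     *
     *   val state = 2L or (1L shl BLOCKING_SHIFT) or (3L shl CPU_PERMITS_SHIFT)
     *   createdWorkers(state)      // == 2, lower 21 bits
     *   blockingTasks(state)       // == 1, middle 21 bits
     *   availableCpuPermits(state) // == 3, upper bits
     *
     * Packing all three counters into one atomic Long lets them be read and updated
     * together in a single CAS, which is what the helpers below rely on.
     */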

    private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt()
    private val availableCpuPermits: Int inline get() = availableCpuPermits(controlState.value)

    private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()
    private inline fun blockingTasks(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt()
    inline fun availableCpuPermits(state: Long): Int = (state and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt()

    // Guarded by synchronization
    private inline fun incrementCreatedWorkers(): Int = createdWorkers(controlState.incrementAndGet())
    private inline fun decrementCreatedWorkers(): Int = createdWorkers(controlState.getAndDecrement())

    private inline fun incrementBlockingTasks() = controlState.addAndGet(1L shl BLOCKING_SHIFT)

    private inline fun decrementBlockingTasks() {
        controlState.addAndGet(-(1L shl BLOCKING_SHIFT))
    }

    private inline fun tryAcquireCpuPermit(): Boolean = controlState.loop { state ->
        val available = availableCpuPermits(state)
        if (available == 0) return false
        val update = state - (1L shl CPU_PERMITS_SHIFT)
        if (controlState.compareAndSet(state, update)) return true
    }

    private inline fun releaseCpuPermit() = controlState.addAndGet(1L shl CPU_PERMITS_SHIFT)

    // This is used as a "stop signal" for the close and shutdown functions
    private val _isTerminated = atomic(false)
    val isTerminated: Boolean get() = _isTerminated.value

    companion object {
        // A symbol to mark workers that are not in parkedWorkersStack
        @JvmField
        val NOT_IN_STACK = Symbol("NOT_IN_STACK")

        // Worker ctl states
        private const val PARKED = -1
        private const val CLAIMED = 0
        private const val TERMINATED = 1

        // Masks of control state
        private const val BLOCKING_SHIFT = 21 // 2M threads max
        private const val CREATED_MASK: Long = (1L shl BLOCKING_SHIFT) - 1
        private const val BLOCKING_MASK: Long = CREATED_MASK shl BLOCKING_SHIFT
        private const val CPU_PERMITS_SHIFT = BLOCKING_SHIFT * 2
        private const val CPU_PERMITS_MASK = CREATED_MASK shl CPU_PERMITS_SHIFT

        internal const val MIN_SUPPORTED_POOL_SIZE = 1 // we support 1 for test purposes, but it is not usually used
        internal const val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2

        // Masks of parkedWorkersStack
        private const val PARKED_INDEX_MASK = CREATED_MASK
        private const val PARKED_VERSION_MASK = CREATED_MASK.inv()
        private const val PARKED_VERSION_INC = 1L shl BLOCKING_SHIFT
    }

    override fun execute(command: Runnable) = dispatch(command)

    override fun close() = shutdown(10_000L)

    // Shuts down the current scheduler and waits until all work is done and all threads are stopped.
    fun shutdown(timeout: Long) {
        // atomically set the termination flag which is checked when workers are added or removed
        if (!_isTerminated.compareAndSet(false, true)) return
        // make sure we are not waiting for the current thread
        val currentWorker = currentWorker()
        // Capture the number of created workers that cannot change anymore (mind the synchronized block!)
        val created = synchronized(workers) { createdWorkers }
        // Shut down all workers with the only exception of the current thread
        for (i in 1..created) {
            val worker = workers[i]!!
            if (worker !== currentWorker) {
                // Note: this is java.lang.Thread.getState() of type java.lang.Thread.State
                while (worker.getState() != Thread.State.TERMINATED) {
                    LockSupport.unpark(worker)
                    worker.join(timeout)
                }
                // Note: this is CoroutineScheduler.Worker.state of type CoroutineScheduler.WorkerState
                assert { worker.state === WorkerState.TERMINATED } // Expected TERMINATED state
                worker.localQueue.offloadAllWorkTo(globalBlockingQueue) // Doesn't actually matter which queue to use
            }
        }
        // Make sure no more work is added to the GlobalQueue from anywhere
        globalBlockingQueue.close()
        globalCpuQueue.close()
        // Finish processing tasks from the global queue and/or from this worker's local queue
        while (true) {
            val task = currentWorker?.findTask(true)
                ?: globalCpuQueue.removeFirstOrNull()
                ?: globalBlockingQueue.removeFirstOrNull()
                ?: break
            runSafely(task)
        }
        // Shut down the current thread
        currentWorker?.tryReleaseCpu(WorkerState.TERMINATED)
        // check & cleanup state
        assert { availableCpuPermits == corePoolSize }
        parkedWorkersStack.value = 0L
        controlState.value = 0L
    }

    /**
     * Dispatches the execution of a runnable [block] with a hint to the scheduler about whether
     * this [block] may execute blocking operations (IO, system calls, locking primitives etc.)
     *
     * [taskContext] -- the concurrency context of the given [block].
     * [tailDispatch] -- whether this [dispatch] call is the last action the (presumably) worker thread does in its current task.
     * If `true`, then the task will be dispatched in a FIFO manner and no additional workers will be requested,
     * but only if the current thread is a corresponding worker thread.
     * Note that the caller cannot be sure that it is executing on a worker thread for the following reasons:
     * - [CoroutineStart.UNDISPATCHED]
     * - A concurrent [close] that effectively shuts down the worker thread
     */
    fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        trackTask() // this is needed for virtual time support
        val task = createTask(block, taskContext)
        val isBlockingTask = task.isBlocking
        // Invariant: we increment the counter **before** publishing the task,
        // so the executing thread can safely decrement the number of blocking tasks
        val stateSnapshot = if (isBlockingTask) incrementBlockingTasks() else 0
        // try to submit the task to the local queue and act depending on the result
        val currentWorker = currentWorker()
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
        if (notAdded != null) {
            if (!addToGlobalQueue(notAdded)) {
                // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                throw RejectedExecutionException("$schedulerName was terminated")
            }
        }
        val skipUnpark = tailDispatch && currentWorker != null
        // Checking 'task' instead of 'notAdded' is completely okay
        if (isBlockingTask) {
            // Use the state snapshot to better estimate the number of running threads
            signalBlockingWork(stateSnapshot, skipUnpark = skipUnpark)
        } else {
            if (skipUnpark) return
            signalCpuWork()
        }
    }
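
    /*
     * Dispatch decision sketch (a paraphrase of the method above, not extra behavior):
     * a task submitted from a worker thread lands in that worker's local queue
     * (LIFO head unless tailDispatch requests FIFO); any other task goes to the global
     * CPU or blocking queue. Afterwards, unless tailDispatch suppresses it, an idle
     * worker is unparked or a new worker is created via signalBlockingWork/signalCpuWork.
     */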

    fun createTask(block: Runnable, taskContext: TaskContext): Task {
        val nanoTime = schedulerTimeSource.nanoTime()
        if (block is Task) {
            block.submissionTime = nanoTime
            block.taskContext = taskContext
            return block
        }
        return TaskImpl(block, nanoTime, taskContext)
    }

    // NB: should only be called from the 'dispatch' method due to the blocking tasks increment
    private fun signalBlockingWork(stateSnapshot: Long, skipUnpark: Boolean) {
        if (skipUnpark) return
        if (tryUnpark()) return
        // Use the state snapshot to avoid accidental thread overprovision
        if (tryCreateWorker(stateSnapshot)) return
        tryUnpark() // Try to unpark again in case there was a race between permit release and parking
    }

    fun signalCpuWork() {
        if (tryUnpark()) return
        if (tryCreateWorker()) return
        tryUnpark()
    }

    private fun tryCreateWorker(state: Long = controlState.value): Boolean {
        val created = createdWorkers(state)
        val blocking = blockingTasks(state)
        val cpuWorkers = (created - blocking).coerceAtLeast(0)
        /*
         * We check how many threads are there to handle non-blocking work,
         * and create one more if there are not enough of them.
         */
        if (cpuWorkers < corePoolSize) {
            val newCpuWorkers = createNewWorker()
            // If we've created the first cpu worker and corePoolSize > 1 then create
            // one more (second) cpu worker, so that stealing between them is operational
            if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
            if (newCpuWorkers > 0) return true
        }
        return false
    }

    private fun tryUnpark(): Boolean {
        while (true) {
            val worker = parkedWorkersStackPop() ?: return false
            if (worker.workerCtl.compareAndSet(PARKED, CLAIMED)) {
                LockSupport.unpark(worker)
                return true
            }
        }
    }

    /**
     * Returns the number of CPU workers after this function (including the new worker),
     * `0` if no worker was created, or `-1` if the scheduler is terminated.
     */
    private fun createNewWorker(): Int {
        val worker: Worker
        return synchronized(workers) {
            // Make sure we're not trying to resurrect a terminated scheduler
            if (isTerminated) return -1
            val state = controlState.value
            val created = createdWorkers(state)
            val blocking = blockingTasks(state)
            val cpuWorkers = (created - blocking).coerceAtLeast(0)
            // Double check for overprovision
            if (cpuWorkers >= corePoolSize) return 0
            if (created >= maxPoolSize) return 0
            // start & register the new worker, commit the index only after successful creation
            val newIndex = createdWorkers + 1
            require(newIndex > 0 && workers[newIndex] == null)
            /*
             * 1) Claim the slot (under a lock) by the newly created worker
             * 2) Make it observable by incrementing the created workers count
             * 3) Only then start the worker, otherwise it may miss its own creation
             */
            worker = Worker(newIndex)
            workers.setSynchronized(newIndex, worker)
            require(newIndex == incrementCreatedWorkers())
            cpuWorkers + 1
        }.also { worker.start() } // Start the worker when the lock is released to reduce contention, see #3652
    }

    /**
     * Returns `null` if the task was successfully added, or an instance of the
     * task that was not added or was replaced (and thus should be added to the global queue).
     */
    private fun Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
        if (this == null) return task
        /*
         * This worker could have been already terminated from this thread by close/shutdown and it should not
         * accept any more tasks into its local queue.
         */
        if (state === WorkerState.TERMINATED) return task
        // Do not add CPU tasks to the local queue if we are not able to execute them
        if (task.mode == TASK_NON_BLOCKING && state === WorkerState.BLOCKING) {
            return task
        }
        mayHaveLocalTasks = true
        return localQueue.add(task, fair = tailDispatch)
    }

    private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }

    /**
     * Returns a string identifying the state of this scheduler for nicer debugging.
     * Note that this method is not atomic and represents a rough state of the pool.
     *
     * State of the queues:
     * b for blocking, c for CPU, d for dormant.
     * E.g. [1b, 1b, 2c, 1d] means that the pool has
     * two blocking workers with queue size 1, one worker with a CPU permit and queue size 2,
     * and one dormant (executing its local queue before parking) worker with queue size 1.
     */
    override fun toString(): String {
        var parkedWorkers = 0
        var blockingWorkers = 0
        var cpuWorkers = 0
        var dormant = 0
        var terminated = 0
        val queueSizes = arrayListOf<String>()
        for (index in 1 until workers.currentLength()) {
            val worker = workers[index] ?: continue
            val queueSize = worker.localQueue.size
            when (worker.state) {
                WorkerState.PARKING -> ++parkedWorkers
                WorkerState.BLOCKING -> {
                    ++blockingWorkers
                    queueSizes += queueSize.toString() + "b" // Blocking
                }
                WorkerState.CPU_ACQUIRED -> {
                    ++cpuWorkers
                    queueSizes += queueSize.toString() + "c" // CPU
                }
                WorkerState.DORMANT -> {
                    ++dormant
                    if (queueSize > 0) queueSizes += queueSize.toString() + "d" // Dormant
                }
                WorkerState.TERMINATED -> ++terminated
            }
        }
        val state = controlState.value
        return "$schedulerName@$hexAddress[" +
            "Pool Size {" +
            "core = $corePoolSize, " +
            "max = $maxPoolSize}, " +
            "Worker States {" +
            "CPU = $cpuWorkers, " +
            "blocking = $blockingWorkers, " +
            "parked = $parkedWorkers, " +
            "dormant = $dormant, " +
            "terminated = $terminated}, " +
            "running workers queues = $queueSizes, " +
            "global CPU queue size = ${globalCpuQueue.size}, " +
            "global blocking queue size = ${globalBlockingQueue.size}, " +
            "Control State {" +
            "created workers = ${createdWorkers(state)}, " +
            "blocking tasks = ${blockingTasks(state)}, " +
            "CPUs acquired = ${corePoolSize - availableCpuPermits(state)}" +
            "}]"
    }

    fun runSafely(task: Task) {
        try {
            task.run()
        } catch (e: Throwable) {
            val thread = Thread.currentThread()
            thread.uncaughtExceptionHandler.uncaughtException(thread, e)
        } finally {
            unTrackTask()
        }
    }

    internal inner class Worker private constructor() : Thread() {
        init {
            isDaemon = true
            /*
             * `Dispatchers.Default` is used as *the* dispatcher in containerized environments,
             * isolated by their own classloaders. Workers are populated lazily, thus we are inheriting
             * the `Dispatchers.Default` context class loader here instead of using the parent thread's one
             * in order not to accidentally capture a temporary application classloader.
             */
            contextClassLoader = this@CoroutineScheduler.javaClass.classLoader
        }

        // guarded by scheduler lock, index in workers array, 0 when not in array (terminated)
        @Volatile // volatile for push/pop operations on parkedWorkersStack
        var indexInArray = 0
            set(index) {
                name = "$schedulerName-worker-${if (index == 0) "TERMINATED" else index.toString()}"
                field = index
            }

        constructor(index: Int) : this() {
            indexInArray = index
        }

        inline val scheduler get() = this@CoroutineScheduler

        @JvmField
        val localQueue: WorkQueue = WorkQueue()

        /**
         * A slot to steal tasks into, to avoid re-adding them
         * to the local queue. See [trySteal].
         */
        private val stolenTask: ObjectRef<Task?> = ObjectRef()

        /**
         * Worker state. **Updated only by this worker thread**.
         * By default, a worker is in the DORMANT state: it was created, but all CPU tokens or tasks were taken.
         * It is used locally by the worker to maintain its own invariants.
         */
        @JvmField
        var state = WorkerState.DORMANT

        /**
         * Worker control state responsible for worker claiming, parking and termination.
         * List of states:
         * [PARKED] -- the worker is parked and can self-terminate after a termination deadline.
         * [CLAIMED] -- the worker is claimed by an external submitter.
         * [TERMINATED] -- the worker is terminated and no longer usable.
         */
        val workerCtl = atomic(CLAIMED)

        /**
         * It is set to the termination deadline when the worker starts to [park] and is reset
         * when a task is found. It serves as protection against spurious wakeups of parkNanos.
         */
        private var terminationDeadline = 0L

        /**
         * Reference to the next worker in the [parkedWorkersStack].
         * It may be `null` if there is no next parked worker.
         * This reference is set to [NOT_IN_STACK] when the worker is physically not in the stack.
         */
        @Volatile
        var nextParkedWorker: Any? = NOT_IN_STACK

        /*
         * The delay until at least one task in other worker queues will become stealable.
         */
        private var minDelayUntilStealableTaskNs = 0L

        /**
         * The state of the embedded Marsaglia xorshift random number generator, used for work-stealing purposes.
         * It is initialized with a seed.
         */
        private var rngState: Int = run {
            // This could've been Random.nextInt(), but we are shaving an extra initialization cost, see #4051
            val seed = System.nanoTime().toInt()
            // rngState shouldn't be zero, as required by the xorshift algorithm
            if (seed != 0) return@run seed
            42
        }

        /**
         * Tries to acquire a CPU token if the worker doesn't have one.
         * @return whether the worker acquired (or already had) a CPU token
         */
        private fun tryAcquireCpuPermit(): Boolean = when {
            state == WorkerState.CPU_ACQUIRED -> true
            this@CoroutineScheduler.tryAcquireCpuPermit() -> {
                state = WorkerState.CPU_ACQUIRED
                true
            }
            else -> false
        }

        /**
         * Releases the CPU token if the worker has any and changes state to [newState].
         * Returns `true` if the CPU permit was returned to the pool.
         */
        fun tryReleaseCpu(newState: WorkerState): Boolean {
            val previousState = state
            val hadCpu = previousState == WorkerState.CPU_ACQUIRED
            if (hadCpu) releaseCpuPermit()
            if (previousState != newState) state = newState
            return hadCpu
        }

        override fun run() = runWorker()

        @JvmField
        var mayHaveLocalTasks = false

        private fun runWorker() {
            var rescanned = false
            while (!isTerminated && state != WorkerState.TERMINATED) {
                val task = findTask(mayHaveLocalTasks)
                // Task found. Execute and repeat
                if (task != null) {
                    rescanned = false
                    minDelayUntilStealableTaskNs = 0L
                    executeTask(task)
                    continue
                } else {
                    mayHaveLocalTasks = false
                }
                /*
                 * No tasks were found:
                 * 1) Either at least one of the workers has a stealable task in its FIFO-buffer with a stealing deadline.
                 *    Then its deadline is stored in [minDelayUntilStealableTask]
                 * // '2)' can be found below
                 *
                 * Then just park for that duration (ditto re-scanning).
                 * While it could potentially lead to short (up to WORK_STEALING_TIME_RESOLUTION_NS ns) starvations,
                 * excess unparks and managing the "one unpark per signalling" invariant become infeasible; instead we are going
                 * to resolve it with a "spinning via scans" mechanism.
                 * NB: this short potential parking does not interfere with `tryUnpark`
                 */
                if (minDelayUntilStealableTaskNs != 0L) {
                    if (!rescanned) {
                        rescanned = true
                    } else {
                        rescanned = false
                        tryReleaseCpu(WorkerState.PARKING)
                        interrupted()
                        LockSupport.parkNanos(minDelayUntilStealableTaskNs)
                        minDelayUntilStealableTaskNs = 0L
                    }
                    continue
                }
                /*
                 * 2) Or no tasks are available: it is time to park and, potentially, shut down the thread.
                 * The worker adds itself to the stack of parked workers, re-scans all the queues
                 * to avoid missing wake-ups (requestCpuWorker) and either starts executing discovered tasks or parks itself awaiting new tasks.
                 */
                tryPark()
            }
            tryReleaseCpu(WorkerState.TERMINATED)
        }

        /**
         * See [runSingleTaskFromCurrentSystemDispatcher] for rationale and details.
         * This is a fine-tailored method for a specific use-case not expected to be used widely.
         */
        fun runSingleTask(): Long {
            val stateSnapshot = state
            val isCpuThread = state == WorkerState.CPU_ACQUIRED
            val task = if (isCpuThread) {
                findCpuTask()
            } else {
                findBlockingTask()
            }
            if (task == null) {
                if (minDelayUntilStealableTaskNs == 0L) return -1L
                return minDelayUntilStealableTaskNs
            }
            runSafely(task)
            if (!isCpuThread) decrementBlockingTasks()
            assert { state == stateSnapshot }
            return 0L
        }

        fun isIo() = state == WorkerState.BLOCKING

        // Counterpart to "tryUnpark"
        private fun tryPark() {
            if (!inStack()) {
                parkedWorkersStackPush(this)
                return
            }
            workerCtl.value = PARKED // Update value once
            /*
             * inStack() prevents spurious wakeups, while workerCtl.value == PARKED
             * prevents the following race:
             *
             * - T2 scans the queue, adds itself to the stack, goes to rescan
             * - T2 suspends in the 'workerCtl.value = PARKED' line
             * - T1 pops T2 from the stack, claims workerCtl, suspends
             * - T2 fails the 'while (inStack())' check, goes to full rescan
             * - T2 adds itself to the stack, parks
             * - T1 unparks T2, bails out with success
             * - T2 unparks and loops in 'while (inStack())'
             */
            while (inStack() && workerCtl.value == PARKED) { // Prevent spurious wakeups
                if (isTerminated || state == WorkerState.TERMINATED) break
                tryReleaseCpu(WorkerState.PARKING)
                interrupted() // Cleanup interruptions
                park()
            }
        }

        private fun inStack(): Boolean = nextParkedWorker !== NOT_IN_STACK

        private fun executeTask(task: Task) {
            val taskMode = task.mode
            idleReset(taskMode)
            beforeTask(taskMode)
            runSafely(task)
            afterTask(taskMode)
        }

        private fun beforeTask(taskMode: Int) {
            if (taskMode == TASK_NON_BLOCKING) return
            // Always notify about new work when releasing the CPU permit to execute some blocking task
            if (tryReleaseCpu(WorkerState.BLOCKING)) {
                signalCpuWork()
            }
        }

        private fun afterTask(taskMode: Int) {
            if (taskMode == TASK_NON_BLOCKING) return
            decrementBlockingTasks()
            val currentState = state
            // Shutdown sequence of the blocking dispatcher
            if (currentState !== WorkerState.TERMINATED) {
                assert { currentState == WorkerState.BLOCKING } // "Expected BLOCKING state, but has $currentState"
                state = WorkerState.DORMANT
            }
        }

        /*
         * Marsaglia xorshift RNG with period 2^32-1 for work stealing purposes.
         * ThreadLocalRandom cannot be used to support Android and ThreadLocal<Random> is up to 15% slower on Ktor benchmarks
         */
        fun nextInt(upperBound: Int): Int {
            var r = rngState
            r = r xor (r shl 13)
            r = r xor (r shr 17)
            r = r xor (r shl 5)
            rngState = r
            val mask = upperBound - 1
            // Fast path for power of two bound
            if (mask and upperBound == 0) {
                return r and mask
            }
            return (r and Int.MAX_VALUE) % upperBound
        }
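
        /*
         * Example (illustrative): for upperBound = 8, mask = 7 and `mask and upperBound == 0`,
         * so the fast path returns `r and 7` -- a value in 0..7 computed with a single AND.
         * For upperBound = 6, mask = 5 and `5 and 6 == 4 != 0`, so the modulo path
         * `(r and Int.MAX_VALUE) % 6` is taken (the AND clears the sign bit first).
         */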

        private fun park() {
            // set the termination deadline the first time we are here (it is reset in idleReset)
            if (terminationDeadline == 0L) terminationDeadline = System.nanoTime() + idleWorkerKeepAliveNs
            // actually park
            LockSupport.parkNanos(idleWorkerKeepAliveNs)
            // try to terminate when we are idle past the termination deadline
            // note that the comparison is written like this to protect against potential nanoTime wraparound
            if (System.nanoTime() - terminationDeadline >= 0) {
                terminationDeadline = 0L // if the attempt to terminate the worker fails, we'd extend the deadline again
                tryTerminateWorker()
            }
        }

        /**
         * Stops execution of the current thread and removes it from [createdWorkers].
         */
        private fun tryTerminateWorker() {
            synchronized(workers) {
                // Make sure we're not racing with the termination of the scheduler
                if (isTerminated) return
                // Someone else terminated, bail out
                if (createdWorkers <= corePoolSize) return
                /*
                 * See tryUnpark for state reasoning.
                 * If this CAS fails, then we were successfully unparked by another worker and cannot terminate.
                 */
                if (!workerCtl.compareAndSet(PARKED, TERMINATED)) return
                /*
                 * At this point this thread is no longer considered as usable for scheduling.
                 * We need a multi-step choreography to reindex workers.
                 *
                 * 1) Read the current worker's index and reset it to zero.
                 */
                val oldIndex = indexInArray
                indexInArray = 0
                /*
                 * Now this worker cannot become the top of parkedWorkersStack, but it can
                 * still be at the stack top via oldIndex.
                 *
                 * 2) Update the top of the stack if it was pointing to oldIndex and make sure no
                 *    pending push/pop operation that might have already retrieved oldIndex could complete.
                 */
                parkedWorkersStackTopUpdate(this, oldIndex, 0)
                /*
                 * 3) Move the last worker into the index in the array that was previously occupied by this worker,
                 *    if the last worker was a different one (sic!).
                 */
                val lastIndex = decrementCreatedWorkers()
                if (lastIndex != oldIndex) {
                    val lastWorker = workers[lastIndex]!!
                    workers.setSynchronized(oldIndex, lastWorker)
                    lastWorker.indexInArray = oldIndex
                    /*
                     * Now lastWorker is available at both indices in the array, but it can
                     * still be at the stack top via its lastIndex
                     *
                     * 4) Update the top of the stack lastIndex -> oldIndex and make sure no
                     *    pending push/pop operation that might have already retrieved lastIndex could complete.
                     */
                    parkedWorkersStackTopUpdate(lastWorker, lastIndex, oldIndex)
                }
                /*
                 * 5) It is safe to clear the reference from the workers array now.
                 */
                workers.setSynchronized(lastIndex, null)
            }
            state = WorkerState.TERMINATED
        }

        // It is invoked by this worker when it finds a task
        private fun idleReset(mode: Int) {
            terminationDeadline = 0L // reset deadline for termination
            if (state == WorkerState.PARKING) {
                assert { mode == TASK_PROBABLY_BLOCKING }
                state = WorkerState.BLOCKING
            }
        }

        fun findTask(mayHaveLocalTasks: Boolean): Task? {
            if (tryAcquireCpuPermit()) return findAnyTask(mayHaveLocalTasks)
            /*
             * If we can't acquire a CPU permit, attempt to find a blocking task:
             * - Check if our queue has one (maybe mixed in with CPU tasks)
             * - Poll the global queue and try to steal
             */
            return findBlockingTask()
        }

        private fun findBlockingTask(): Task? {
            return localQueue.pollBlocking()
                ?: globalBlockingQueue.removeFirstOrNull()
                ?: trySteal(STEAL_BLOCKING_ONLY)
        }

        // NB: ONLY for the runSingleTask method
        private fun findCpuTask(): Task? {
            return localQueue.pollCpu()
                ?: globalCpuQueue.removeFirstOrNull()
                ?: trySteal(STEAL_CPU_ONLY)
        }

        private fun findAnyTask(scanLocalQueue: Boolean): Task? {
            /*
             * Anti-starvation mechanism: probabilistically poll either the local
             * or the global queue to ensure progress for both external and internal tasks.
             */
            if (scanLocalQueue) {
                val globalFirst = nextInt(2 * corePoolSize) == 0
                if (globalFirst) pollGlobalQueues()?.let { return it }
                localQueue.poll()?.let { return it }
                if (!globalFirst) pollGlobalQueues()?.let { return it }
            } else {
                pollGlobalQueues()?.let { return it }
            }
            return trySteal(STEAL_ANY)
        }

        private fun pollGlobalQueues(): Task? {
            if (nextInt(2) == 0) {
                globalCpuQueue.removeFirstOrNull()?.let { return it }
                return globalBlockingQueue.removeFirstOrNull()
            } else {
                globalBlockingQueue.removeFirstOrNull()?.let { return it }
                return globalCpuQueue.removeFirstOrNull()
            }
        }

        private fun trySteal(stealingMode: StealingMode): Task? {
            val created = createdWorkers
            // 0 to await initialization and 1 to avoid excess stealing on single-core machines
            if (created < 2) {
                return null
            }

            var currentIndex = nextInt(created)
            var minDelay = Long.MAX_VALUE
            repeat(created) {
                ++currentIndex
                if (currentIndex > created) currentIndex = 1
                val worker = workers[currentIndex]
                if (worker !== null && worker !== this) {
                    val stealResult = worker.localQueue.trySteal(stealingMode, stolenTask)
                    if (stealResult == TASK_STOLEN) {
                        val result = stolenTask.element
                        stolenTask.element = null
                        return result
                    } else if (stealResult > 0) {
                        minDelay = min(minDelay, stealResult)
                    }
                }
            }
            minDelayUntilStealableTaskNs = if (minDelay != Long.MAX_VALUE) minDelay else 0
            return null
        }
    }

    enum class WorkerState {
        /**
         * Has a CPU token and either executes a [TASK_NON_BLOCKING] task or tries to find one.
         */
        CPU_ACQUIRED,

        /**
         * Executing a task with [TASK_PROBABLY_BLOCKING] mode.
         */
        BLOCKING,

        /**
         * Currently parked.
         */
        PARKING,

        /**
         * Tries to execute its local work and then goes to an indefinite sleep as a worker that is no longer needed.
         */
        DORMANT,

        /**
         * Terminal state, will no longer be used.
         */
        TERMINATED
    }
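
    /*
     * Typical lifecycle sketch (as implemented in this file): a worker starts DORMANT;
     * tryAcquireCpuPermit() promotes it to CPU_ACQUIRED, while picking up a blocking task
     * (beforeTask) moves it to BLOCKING, and afterTask returns it to DORMANT;
     * tryPark() switches it to PARKING, and runWorker()/tryTerminateWorker() finish it
     * as TERMINATED.
     */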
}

/**
 * Checks if the thread is part of a thread pool that supports coroutines.
 * This function is needed for integration with BlockHound.
 */
@JvmName("isSchedulerWorker")
internal fun isSchedulerWorker(thread: Thread) = thread is CoroutineScheduler.Worker

/**
 * Checks if the thread is running a CPU-bound task.
 * This function is needed for integration with BlockHound.
 */
@JvmName("mayNotBlock")
internal fun mayNotBlock(thread: Thread) = thread is CoroutineScheduler.Worker &&
    thread.state == CoroutineScheduler.WorkerState.CPU_ACQUIRED