/*
 * Copyright (C) 2024 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

@file:OptIn(ExperimentalCoroutinesApi::class)

package com.android.systemui.kairos.internal

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.PriorityBlockingQueue
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

/** Accepts [MuxNode]s that need to be visited, keyed by a depth used to order the visits. */
internal interface Scheduler {
    /** Requests that [node] be visited, ordered by [depth] relative to other scheduled nodes. */
    fun schedule(depth: Int, node: MuxNode<*, *, *>)

    /**
     * Requests that [node] be visited, ordered by [indirectDepth].
     *
     * How this ordering relates to nodes passed to [schedule] is up to the implementation.
     */
    fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *>)
}

/**
 * [Scheduler] backed by a priority queue ordered by ascending depth.
 *
 * A node is queued at most once at a time: [schedule] ignores a node that is still recorded in
 * [enqueued], and the record is cleared right before the node is visited.
 */
internal class SchedulerImpl : Scheduler {
    /**
     * Nodes that have been scheduled and not yet handed to a visitor. Used as a set; the value
     * stored for each key is the node itself.
     */
    val enqueued = ConcurrentHashMap<MuxNode<*, *, *>, Any>()

    /** Pending `(depth, node)` entries; the head of the queue is the entry with the least depth. */
    val scheduledQ = PriorityBlockingQueue<Pair<Int, MuxNode<*, *, *>>>(16, compareBy { it.first })

    /** Queues [node] at [depth] unless it is already recorded in [enqueued]. */
    override fun schedule(depth: Int, node: MuxNode<*, *, *>) {
        // putIfAbsent returns null only for the caller that actually inserted the node, so a node
        // is added to the queue at most once until it is removed from `enqueued` again.
        if (enqueued.putIfAbsent(node, node) == null) {
            scheduledQ.add(Pair(depth, node))
        }
    }

    /**
     * Queues [node] at `Int.MIN_VALUE + indirectDepth`.
     *
     * NOTE(review): presumably [indirectDepth] is non-negative, in which case these entries sort
     * ahead of every entry queued with a non-negative depth via [schedule] -- confirm with callers.
     */
    override fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *>) {
        schedule(Int.MIN_VALUE + indirectDepth, node)
    }

    /**
     * Drains the queue, visiting each node with [MuxNode.visit] inside [Network.evalScope].
     *
     * After every step, `network.compactor.drainCompact()` is run before the next step begins.
     */
    internal suspend fun drainEval(network: Network) {
        drain { runStep ->
            runStep { muxNode -> network.evalScope { muxNode.visit(this) } }
            // If any visited MuxPromptNodes had their depths increased, eagerly propagate those
            // depth changes now before performing further network evaluation.
            network.compactor.drainCompact()
        }
    }

    /** Drains the queue, visiting each node with `visitCompact`, passing this scheduler to it. */
    internal suspend fun drainCompact() {
        drain { runStep -> runStep { muxNode -> muxNode.visitCompact(scheduler = this) } }
    }

    /**
     * Repeatedly invokes [onStep] until [scheduledQ] is empty.
     *
     * Before each invocation the depth at the head of the queue is captured. The `runStep`
     * callback handed to [onStep] processes queued entries up to and including that depth, using
     * the `visit` function that [onStep] supplies.
     */
    private suspend inline fun drain(
        crossinline onStep:
            suspend (runStep: suspend (visit: suspend (MuxNode<*, *, *>) -> Unit) -> Unit) -> Unit
    ): Unit = coroutineScope {
        while (scheduledQ.isNotEmpty()) {
            // The head holds the least depth currently queued; it is the upper bound for this step.
            val maxDepth = scheduledQ.peek()?.first ?: error("Unexpected empty scheduler")
            onStep { visit -> runStep(maxDepth, visit) }
        }
    }

    /**
     * Removes every queued entry whose depth is at most [maxDepth] and processes it.
     *
     * An entry whose node reports a direct upstream and a direct depth greater than the depth it
     * was queued at is put back into the queue at that newer depth instead of being visited. Every
     * other node is visited by [visit] in its own child coroutine; the enclosing `coroutineScope`
     * does not return until all of those children have completed.
     */
    private suspend inline fun runStep(
        maxDepth: Int,
        crossinline visit: suspend (MuxNode<*, *, *>) -> Unit,
    ) = coroutineScope {
        while (scheduledQ.peek()?.first?.let { it <= maxDepth } == true) {
            val (d, node) = scheduledQ.remove()
            if (
                node.depthTracker.dirty_hasDirectUpstream() &&
                    d < node.depthTracker.dirty_directDepth
            ) {
                // Stale entry: re-queue at the node's current direct depth. The node stays in
                // `enqueued`, so concurrent schedule() calls will not add a duplicate.
                scheduledQ.add(node.depthTracker.dirty_directDepth to node)
            } else {
                launch {
                    // Clear the record first so the node can be scheduled again while (or after)
                    // it is being visited.
                    enqueued.remove(node)
                    visit(node)
                }
            }
        }
    }
}