xref: /aosp_15_r20/frameworks/base/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Scheduler.kt (revision d57664e9bc4670b3ecf6748a746a57c557b6bc9e)
/*
 * Copyright (C) 2024 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 @file:OptIn(ExperimentalCoroutinesApi::class)
18 
19 package com.android.systemui.kairos.internal
20 
21 import java.util.concurrent.ConcurrentHashMap
22 import java.util.concurrent.PriorityBlockingQueue
23 import kotlinx.coroutines.ExperimentalCoroutinesApi
24 import kotlinx.coroutines.coroutineScope
25 import kotlinx.coroutines.launch
26 
/**
 * Accepts [MuxNode]s that need to be visited, together with the depth that determines the order
 * in which they are processed.
 *
 * The only implementation visible in this file is [SchedulerImpl], which processes entries in
 * ascending depth order.
 */
internal interface Scheduler {
    /**
     * Requests that [node] be visited, using [depth] as its ordering key.
     *
     * In [SchedulerImpl], a node that is already waiting to be visited is not enqueued a second
     * time.
     */
    fun schedule(depth: Int, node: MuxNode<*, *, *>)

    /**
     * Requests that [node] be visited, using [indirectDepth] as its ordering key among other
     * indirectly-scheduled nodes.
     *
     * In [SchedulerImpl] this is translated to a direct depth of `Int.MIN_VALUE + indirectDepth`.
     */
    fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *>)
}
32 
/**
 * [Scheduler] backed by a priority queue ordered by depth (smallest depth first).
 *
 * Scheduling only records work; nothing is visited until one of the `drain*` functions is called.
 * Both backing collections are concurrent JDK types, since [schedule] may be invoked from the
 * coroutines launched while draining.
 */
internal class SchedulerImpl : Scheduler {
    /**
     * Nodes that are currently waiting to be visited. Used as a set (each node is mapped to
     * itself) so that [schedule] can skip nodes that are already queued.
     */
    val enqueued = ConcurrentHashMap<MuxNode<*, *, *>, Any>()

    /** Pending `(depth, node)` entries; the entry with the smallest depth is at the head. */
    val scheduledQ = PriorityBlockingQueue<Pair<Int, MuxNode<*, *, *>>>(16, compareBy { it.first })

    /**
     * Enqueues [node] at [depth], unless it is already present in [enqueued], in which case the
     * call is a no-op and the previously queued depth is kept.
     */
    override fun schedule(depth: Int, node: MuxNode<*, *, *>) {
        // putIfAbsent returns null only to the caller that actually inserted the node, so at most
        // one queue entry is added per node until it is removed from `enqueued` again in runStep.
        if (enqueued.putIfAbsent(node, node) == null) {
            scheduledQ.add(Pair(depth, node))
        }
    }

    /**
     * Enqueues [node] at depth `Int.MIN_VALUE + indirectDepth`.
     *
     * For non-negative [indirectDepth] values this places the node ahead of every entry that was
     * scheduled with a non-negative direct depth.
     */
    override fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *>) {
        schedule(Int.MIN_VALUE + indirectDepth, node)
    }

    /**
     * Drains the queue by calling `visit` on every scheduled node inside [Network.evalScope].
     *
     * After each step, `network.compactor.drainCompact()` is run before the next step begins.
     */
    internal suspend fun drainEval(network: Network) {
        drain { runStep ->
            // NOTE(review): `this` is whatever receiver `evalScope` supplies to its lambda —
            // presumably the evaluation scope; confirm against Network.evalScope.
            runStep { muxNode -> network.evalScope { muxNode.visit(this) } }
            // If any visited MuxPromptNodes had their depths increased, eagerly propagate those
            // depth changes now before performing further network evaluation.
            network.compactor.drainCompact()
        }
    }

    /** Drains the queue by calling `visitCompact` on every scheduled node, passing this scheduler. */
    internal suspend fun drainCompact() {
        drain { runStep -> runStep { muxNode -> muxNode.visitCompact(scheduler = this) } }
    }

    /**
     * Repeatedly processes the queue until it is empty.
     *
     * On every iteration the depth at the head of the queue is captured and [onStep] is invoked
     * with a `runStep` function. Calling `runStep(visit)` processes every entry whose depth is at
     * most the captured depth (see the private `runStep` member). [onStep] may perform additional
     * work before or after calling it.
     *
     * @param onStep callback that drives a single step; it is expected to invoke the supplied
     *   `runStep`, otherwise the queue is never consumed.
     */
    private suspend inline fun drain(
        crossinline onStep:
            suspend (runStep: suspend (visit: suspend (MuxNode<*, *, *>) -> Unit) -> Unit) -> Unit
    ): Unit = coroutineScope {
        while (scheduledQ.isNotEmpty()) {
            // Head of the queue == smallest depth currently scheduled.
            val maxDepth = scheduledQ.peek()?.first ?: error("Unexpected empty scheduler")
            onStep { visit -> runStep(maxDepth, visit) }
        }
    }

    /**
     * Processes queue entries for as long as the head entry's depth is `<= maxDepth`.
     *
     * Each removed entry is handled in one of two ways:
     * - If the node reports a direct upstream and its current direct depth is greater than the
     *   depth it was queued at, the entry is stale: the node is put back into the queue at its
     *   current direct depth and is not visited now.
     * - Otherwise a child coroutine is launched that removes the node from [enqueued] and then
     *   calls [visit] on it.
     *
     * Because the body runs in [coroutineScope], this function returns only after every launched
     * visit has completed.
     *
     * @param maxDepth upper bound (inclusive) on the depth of entries handled in this step.
     * @param visit action to perform on each node that is ready to be processed.
     */
    private suspend inline fun runStep(
        maxDepth: Int,
        crossinline visit: suspend (MuxNode<*, *, *>) -> Unit,
    ): Unit = coroutineScope {
        while (scheduledQ.peek()?.first?.let { it <= maxDepth } == true) {
            val (d, node) = scheduledQ.remove()
            if (
                node.depthTracker.dirty_hasDirectUpstream() &&
                    d < node.depthTracker.dirty_directDepth
            ) {
                // Re-queue at the node's current depth. The node stays in `enqueued`, so
                // concurrent schedule() calls for it remain no-ops.
                scheduledQ.add(node.depthTracker.dirty_directDepth to node)
            } else {
                launch {
                    // Clear the de-duplication marker before visiting, so the node can be
                    // scheduled again while (or after) it is being visited.
                    enqueued.remove(node)
                    visit(node)
                }
            }
        }
    }
}
90