/*
 * Copyright (C) 2024 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17 package com.android.systemui.kairos.internal
18
19 import com.android.systemui.kairos.FrpDeferredValue
20 import com.android.systemui.kairos.FrpStateScope
21 import com.android.systemui.kairos.FrpStateful
22 import com.android.systemui.kairos.FrpTransactionScope
23 import com.android.systemui.kairos.GroupedTFlow
24 import com.android.systemui.kairos.TFlow
25 import com.android.systemui.kairos.TFlowInit
26 import com.android.systemui.kairos.TFlowLoop
27 import com.android.systemui.kairos.TState
28 import com.android.systemui.kairos.TStateInit
29 import com.android.systemui.kairos.emptyTFlow
30 import com.android.systemui.kairos.groupByKey
31 import com.android.systemui.kairos.init
32 import com.android.systemui.kairos.internal.util.mapValuesParallel
33 import com.android.systemui.kairos.mapCheap
34 import com.android.systemui.kairos.merge
35 import com.android.systemui.kairos.switch
36 import com.android.systemui.kairos.util.Maybe
37 import com.android.systemui.kairos.util.map
38 import kotlin.coroutines.Continuation
39 import kotlin.coroutines.CoroutineContext
40 import kotlin.coroutines.EmptyCoroutineContext
41 import kotlin.coroutines.startCoroutine
42 import kotlinx.coroutines.CompletableDeferred
43 import kotlinx.coroutines.Deferred
44 import kotlinx.coroutines.completeWith
45 import kotlinx.coroutines.job
46
/**
 * [StateScope] implementation layered on top of an existing [EvalScope].
 *
 * Every [EvalScope] member is delegated to [evalScope]. [endSignal] is exposed through the
 * [StateScope] contract; internally the scope works with [endSignalOnce], which is derived from
 * [endSignal] via [nextOnlyInternal] and is used by [truncateToScope] to cut flows off.
 */
internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal: TFlow<Any>) :
    StateScope, EvalScope by evalScope {

    /** [endSignal] passed through [nextOnlyInternal]; drives [truncateToScope]. */
    private val endSignalOnce: TFlow<Any> = endSignal.nextOnlyInternal("StateScope.endSignal")

    /**
     * Returns a flow that starts out as this flow and is swapped for [emptyTFlow] when
     * [endSignalOnce] emits. When [endSignalOnce] is [emptyTFlow] the receiver is returned as-is.
     */
    private fun <A> TFlow<A>.truncateToScope(operatorName: String): TFlow<A> {
        if (endSignalOnce === emptyTFlow) return this
        return endSignalOnce.mapCheap { emptyTFlow }.toTStateInternal(operatorName, this).switch()
    }

    /**
     * Wraps this flow in a [TFlowLoop] whose loopback is a switch over a state that is initially
     * this flow and becomes [emptyTFlow] on an emission of the loop itself. [emptyTFlow] is
     * returned unchanged.
     */
    private fun <A> TFlow<A>.nextOnlyInternal(operatorName: String): TFlow<A> {
        if (this === emptyTFlow) return this
        val loop = TFlowLoop<A>()
        loop.loopback =
            loop.mapCheap { emptyTFlow }.toTStateInternal(operatorName, this).switch()
        return loop
    }

    /** Same as [toTStateInternalDeferred], with an initial value that is already available. */
    private fun <A> TFlow<A>.toTStateInternal(operatorName: String, init: A): TState<A> {
        val readyInit = CompletableDeferred(init)
        return toTStateInternalDeferred(operatorName, readyInit)
    }

    /**
     * Builds a [TState] via [mkState], fed by this flow and seeded with [init].
     *
     * [operatorName] is passed to [mkState] both as the name and as the operator name, and is
     * also the name given to [constInit].
     */
    private fun <A> TFlow<A>.toTStateInternalDeferred(
        operatorName: String,
        init: Deferred<A>,
    ): TState<A> {
        val upstream = this
        val stateImpl =
            mkState(
                operatorName,
                operatorName,
                evalScope,
                { upstream.init.connect(evalScope = this) },
                init,
            )
        return TStateInit(constInit(operatorName, stateImpl))
    }

    /** Runs [block] in this state scope via [deferAsync] and wraps the pending result. */
    private fun <R> deferredInternal(block: suspend FrpStateScope.() -> R): FrpDeferredValue<R> {
        val pending: Deferred<R> = deferAsync { runInStateScope(block) }
        return FrpDeferredValue(pending)
    }

    /**
     * Holds the values of this flow in a [TState] seeded with [initialValue]. The flow is first
     * passed through [truncateToScope].
     */
    private fun <A> TFlow<A>.toTStateDeferredInternal(
        initialValue: FrpDeferredValue<A>
    ): TState<A> {
        val opName = "toTStateDeferred"
        // Changes are only collected until this scope's end signal fires.
        val scopedChanges = truncateToScope(opName)
        return scopedChanges.toTStateInternalDeferred(opName, initialValue.unwrapped)
    }

    /**
     * Builds the "mergeIncrementally" flow on top of [switchDeferredImpl].
     *
     * The initial set of connected flows is read from the current value of [storage]; subsequent
     * patches come from this flow, with every flow in a patch connected as it arrives.
     */
    private fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyInternal(
        storage: TState<Map<K, TFlow<V>>>
    ): TFlow<Map<K, V>> {
        val merged: TFlowImpl<Map<K, V>> =
            switchDeferredImpl(
                getStorage = {
                    storage.init
                        .connect(this)
                        .getCurrentWithEpoch(this)
                        .first
                        .mapValuesParallel { (_, child) -> child.init.connect(this) }
                },
                getPatches = {
                    mapImpl({ init.connect(this) }) { delta ->
                        delta.mapValuesParallel { (_, maybeChild) ->
                            maybeChild.map { child -> child.init.connect(this) }
                        }
                    }
                },
            )
        return TFlowInit(constInit("mergeIncrementally", merged))
    }

    /**
     * Builds the "mergeIncrementallyPrompt" flow. Identical in shape to
     * [mergeIncrementallyInternal], but backed by [switchPromptImpl].
     */
    private fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptInternal(
        storage: TState<Map<K, TFlow<V>>>
    ): TFlow<Map<K, V>> {
        val merged: TFlowImpl<Map<K, V>> =
            switchPromptImpl(
                getStorage = {
                    storage.init
                        .connect(this)
                        .getCurrentWithEpoch(this)
                        .first
                        .mapValuesParallel { (_, child) -> child.init.connect(this) }
                },
                getPatches = {
                    mapImpl({ init.connect(this) }) { delta ->
                        delta.mapValuesParallel { (_, maybeChild) ->
                            maybeChild.map { child -> child.init.connect(this) }
                        }
                    }
                },
            )
        return TFlowInit(constInit("mergeIncrementallyPrompt", merged))
    }

    /**
     * Runs keyed [FrpStateful]s, each in its own child state scope.
     *
     * The child scope for a key is ended by the events for that key in this flow, after
     * `skipNext()` has been applied to them.
     *
     * @param init statefuls to run up front, one per key.
     * @param numKeys forwarded to [groupByKey].
     * @return the flow of per-key results for each incoming patch, paired with the deferred
     *   results of running [init].
     */
    private fun <K, A, B> TFlow<Map<K, Maybe<FrpStateful<A>>>>.applyLatestStatefulForKeyInternal(
        init: FrpDeferredValue<Map<K, FrpStateful<B>>>,
        numKeys: Int?,
    ): Pair<TFlow<Map<K, Maybe<A>>>, FrpDeferredValue<Map<K, B>>> {
        val eventsByKey: GroupedTFlow<K, Maybe<FrpStateful<A>>> = groupByKey(numKeys)

        // Results of the initial statefuls, each run in a child of this scope.
        val initialResults: Deferred<Map<K, B>> = deferAsync {
            init.unwrapped.await().mapValuesParallel { (key, stateful) ->
                val childEnd = with(frpScope) { eventsByKey[key].skipNext() }
                childStateScope(childEnd).runInStateScope(stateful)
            }
        }

        // Results of statefuls arriving later; these run in a scope re-entered from the
        // evaluation scope that is current when the patch is processed.
        val patchResults: TFlowImpl<Map<K, Maybe<A>>> =
            mapImpl(
                upstream = { this@applyLatestStatefulForKeyInternal.init.connect(evalScope = this) }
            ) { patch ->
                patch.mapValuesParallel { (key: K, update: Maybe<FrpStateful<A>>) ->
                    reenterStateScope(this@StateScopeImpl).run {
                        update.map { stateful ->
                            val childEnd = with(frpScope) { eventsByKey[key].skipNext() }
                            childStateScope(childEnd).runInStateScope(stateful)
                        }
                    }
                }
            }

        val changes: TFlow<Map<K, Maybe<A>>> =
            TFlowInit(constInit("applyLatestStatefulForKey", patchResults.cached()))
        return Pair(changes, FrpDeferredValue(initialResults))
    }

    /**
     * Runs every [FrpStateful] emitted by this flow in a state scope re-entered from this one,
     * and emits its result. The mapped node is `cached()`.
     */
    private fun <A> TFlow<FrpStateful<A>>.observeStatefulsInternal(): TFlow<A> {
        val results =
            mapImpl(upstream = { this@observeStatefulsInternal.init.connect(evalScope = this) }) {
                stateful ->
                reenterStateScope(outerScope = this@StateScopeImpl).runInStateScope(stateful)
            }
        return TFlowInit(constInit("observeStatefuls", results.cached()))
    }

    override val frpScope: FrpStateScope = FrpStateScopeImpl()

    /**
     * The [FrpStateScope] handed to user code. Transaction-level operations are delegated to the
     * [frpScope][EvalScope.frpScope] of [evalScope]; stateful operations forward to the private
     * helpers of the enclosing [StateScopeImpl].
     */
    private inner class FrpStateScopeImpl :
        FrpStateScope, FrpTransactionScope by evalScope.frpScope {

        override fun <A> deferredStateScope(
            block: suspend FrpStateScope.() -> A
        ): FrpDeferredValue<A> {
            return deferredInternal(block)
        }

        override fun <A> TFlow<A>.holdDeferred(initialValue: FrpDeferredValue<A>): TState<A> {
            return toTStateDeferredInternal(initialValue)
        }

        override fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally(
            initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>
        ): TFlow<Map<K, V>> {
            // Fold the patches over the initial flows, then merge over the folded state.
            val activeFlows: TState<Map<K, TFlow<V>>> = foldMapIncrementally(initialTFlows)
            return mergeIncrementallyInternal(activeFlows)
        }

        override fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly(
            initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>
        ): TFlow<Map<K, V>> {
            // Fold the patches over the initial flows, then merge over the folded state.
            val activeFlows: TState<Map<K, TFlow<V>>> = foldMapIncrementally(initialTFlows)
            return mergeIncrementallyPromptInternal(activeFlows)
        }

        override fun <K, A, B> TFlow<Map<K, Maybe<FrpStateful<A>>>>.applyLatestStatefulForKey(
            init: FrpDeferredValue<Map<K, FrpStateful<B>>>,
            numKeys: Int?,
        ): Pair<TFlow<Map<K, Maybe<A>>>, FrpDeferredValue<Map<K, B>>> {
            return applyLatestStatefulForKeyInternal(init, numKeys)
        }

        override fun <A> TFlow<FrpStateful<A>>.applyStatefuls(): TFlow<A> {
            return observeStatefulsInternal()
        }
    }

    /**
     * Starts [block] as a coroutine with [frpScope] as its receiver and suspends until it
     * finishes, returning its result or rethrowing its failure.
     *
     * The block is started with an [EmptyCoroutineContext]; its outcome is relayed through a
     * [CompletableDeferred] whose parent is the job of this scope's `coroutineContext`.
     */
    override suspend fun <R> runInStateScope(block: suspend FrpStateScope.() -> R): R {
        val outcome = CompletableDeferred<R>(parent = coroutineContext.job)
        val completion =
            object : Continuation<R> {
                override val context: CoroutineContext
                    get() = EmptyCoroutineContext

                override fun resumeWith(result: Result<R>) {
                    outcome.completeWith(result)
                }
            }
        block.startCoroutine(frpScope, completion)
        return outcome.await()
    }

    /**
     * Creates a scope over the same [evalScope] whose end signal is the merge of [newEnd] and
     * this scope's [endSignal].
     */
    override fun childStateScope(newEnd: TFlow<Any>) =
        StateScopeImpl(evalScope = evalScope, endSignal = merge(newEnd, endSignal))
}
255
/**
 * Creates a [StateScopeImpl] that evaluates in the receiver [EvalScope] while reusing the
 * [endSignal][StateScopeImpl.endSignal] of [outerScope].
 */
private fun EvalScope.reenterStateScope(outerScope: StateScopeImpl): StateScopeImpl {
    val inheritedEnd = outerScope.endSignal
    return StateScopeImpl(evalScope = this, endSignal = inheritedEnd)
}
258