1 /*
2  * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.systemui.kairos.internal
18 
19 import com.android.systemui.kairos.FrpDeferredValue
20 import com.android.systemui.kairos.FrpStateScope
21 import com.android.systemui.kairos.FrpStateful
22 import com.android.systemui.kairos.FrpTransactionScope
23 import com.android.systemui.kairos.GroupedTFlow
24 import com.android.systemui.kairos.TFlow
25 import com.android.systemui.kairos.TFlowInit
26 import com.android.systemui.kairos.TFlowLoop
27 import com.android.systemui.kairos.TState
28 import com.android.systemui.kairos.TStateInit
29 import com.android.systemui.kairos.emptyTFlow
30 import com.android.systemui.kairos.groupByKey
31 import com.android.systemui.kairos.init
32 import com.android.systemui.kairos.internal.util.mapValuesParallel
33 import com.android.systemui.kairos.mapCheap
34 import com.android.systemui.kairos.merge
35 import com.android.systemui.kairos.switch
36 import com.android.systemui.kairos.util.Maybe
37 import com.android.systemui.kairos.util.map
38 import kotlin.coroutines.Continuation
39 import kotlin.coroutines.CoroutineContext
40 import kotlin.coroutines.EmptyCoroutineContext
41 import kotlin.coroutines.startCoroutine
42 import kotlinx.coroutines.CompletableDeferred
43 import kotlinx.coroutines.Deferred
44 import kotlinx.coroutines.completeWith
45 import kotlinx.coroutines.job
46 
/**
 * [StateScope] implementation layered on top of an existing [EvalScope].
 *
 * Every [EvalScope] member is delegated to [evalScope]. Flows that are held as state through
 * [frpScope] are first routed through a switch driven by [endSignal] (see [truncateToScope]).
 *
 * @property evalScope scope that all delegated [EvalScope] calls are forwarded to.
 * @property endSignal flow marking the end of this scope.
 */
internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal: TFlow<Any>) :
    StateScope, EvalScope by evalScope {

    /** [endSignal] wrapped by [nextOnlyInternal]; remains [emptyTFlow] when [endSignal] is. */
    private val endSignalOnce: TFlow<Any> = endSignal.nextOnlyInternal("StateScope.endSignal")

    /**
     * Returns the receiver unchanged when this scope has no end signal; otherwise returns a
     * switched flow that starts out as the receiver and is replaced by [emptyTFlow] when
     * [endSignalOnce] emits.
     */
    private fun <A> TFlow<A>.truncateToScope(operatorName: String): TFlow<A> {
        if (endSignalOnce === emptyTFlow) return this
        val replacements = endSignalOnce.mapCheap { emptyTFlow }
        return replacements.toTStateInternal(operatorName, this).switch()
    }

    /**
     * Builds a [TFlowLoop] whose loopback is a switched flow: it starts out as the receiver and is
     * replaced by [emptyTFlow] whenever the loop itself emits. [emptyTFlow] is returned as-is.
     */
    private fun <A> TFlow<A>.nextOnlyInternal(operatorName: String): TFlow<A> {
        if (this === emptyTFlow) return this
        val loop = TFlowLoop<A>()
        // The replacement flow is derived from the loop itself, not from the receiver.
        loop.loopback = loop.mapCheap { emptyTFlow }.toTStateInternal(operatorName, this).switch()
        return loop
    }

    /** Holds the receiver as a [TState] whose initial value [init] is already available. */
    private fun <A> TFlow<A>.toTStateInternal(operatorName: String, init: A): TState<A> {
        val ready = CompletableDeferred(init)
        return toTStateInternalDeferred(operatorName, ready)
    }

    /**
     * Holds the receiver as a [TState] whose initial value is supplied by [init].
     *
     * [operatorName] is used both as the state's name and as its operator name.
     */
    private fun <A> TFlow<A>.toTStateInternalDeferred(
        operatorName: String,
        init: Deferred<A>,
    ): TState<A> {
        val upstream = this
        val stateImpl =
            mkState(
                operatorName,
                operatorName,
                evalScope,
                { upstream.init.connect(evalScope = this) },
                init,
            )
        return TStateInit(constInit(operatorName, stateImpl))
    }

    /** Runs [block] in this state scope via [deferAsync] and wraps the pending result. */
    private fun <R> deferredInternal(block: suspend FrpStateScope.() -> R): FrpDeferredValue<R> {
        val pending = deferAsync { runInStateScope(block) }
        return FrpDeferredValue(pending)
    }

    /** Holds the receiver, truncated to this scope, as a [TState] seeded by [initialValue]. */
    private fun <A> TFlow<A>.toTStateDeferredInternal(
        initialValue: FrpDeferredValue<A>
    ): TState<A> {
        val operatorName = "toTStateDeferred"
        // Ensure state is only collected until the end of this scope
        val bounded = truncateToScope(operatorName)
        return bounded.toTStateInternalDeferred(operatorName, initialValue.unwrapped)
    }

    /**
     * Builds the "mergeIncrementally" flow on top of [switchDeferredImpl].
     *
     * The initial set of connected flows is read from the current value of [storage]; patches come
     * from the receiver, with every flow contained in a patch connected as well.
     */
    private fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyInternal(
        storage: TState<Map<K, TFlow<V>>>
    ): TFlow<Map<K, V>> {
        val switched =
            switchDeferredImpl(
                getStorage = {
                    val storageImpl = storage.init.connect(this)
                    val currentFlows = storageImpl.getCurrentWithEpoch(this).first
                    currentFlows.mapValuesParallel { (_, source) -> source.init.connect(this) }
                },
                getPatches = {
                    mapImpl({ init.connect(this) }) { update ->
                        update.mapValuesParallel { (_, maybeSource) ->
                            maybeSource.map { source -> source.init.connect(this) }
                        }
                    }
                },
            )
        return TFlowInit(constInit("mergeIncrementally", switched))
    }

    /**
     * Builds the "mergeIncrementallyPrompt" flow on top of [switchPromptImpl].
     *
     * Storage and patches are obtained exactly as in [mergeIncrementallyInternal]; only the
     * underlying switch implementation differs.
     */
    private fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptInternal(
        storage: TState<Map<K, TFlow<V>>>
    ): TFlow<Map<K, V>> {
        val switched =
            switchPromptImpl(
                getStorage = {
                    val storageImpl = storage.init.connect(this)
                    val currentFlows = storageImpl.getCurrentWithEpoch(this).first
                    currentFlows.mapValuesParallel { (_, source) -> source.init.connect(this) }
                },
                getPatches = {
                    mapImpl({ init.connect(this) }) { update ->
                        update.mapValuesParallel { (_, maybeSource) ->
                            maybeSource.map { source -> source.init.connect(this) }
                        }
                    }
                },
            )
        return TFlowInit(constInit("mergeIncrementallyPrompt", switched))
    }

    /**
     * Runs keyed [FrpStateful]s, each inside a child scope whose end signal is
     * `eventsByKey[key].skipNext()`, i.e. derived from the receiver's events for that same key.
     *
     * @param init statefuls to run up front; their results form the second element of the result.
     * @param numKeys forwarded to [groupByKey].
     * @return the flow of per-key results for statefuls arriving on the receiver, paired with the
     *   deferred results of [init].
     */
    private fun <K, A, B> TFlow<Map<K, Maybe<FrpStateful<A>>>>.applyLatestStatefulForKeyInternal(
        init: FrpDeferredValue<Map<K, FrpStateful<B>>>,
        numKeys: Int?,
    ): Pair<TFlow<Map<K, Maybe<A>>>, FrpDeferredValue<Map<K, B>>> {
        val eventsByKey: GroupedTFlow<K, Maybe<FrpStateful<A>>> = groupByKey(numKeys)
        val initOut: Deferred<Map<K, B>> = deferAsync {
            val initialStatefuls = init.unwrapped.await()
            initialStatefuls.mapValuesParallel { (key, stateful) ->
                val keyEnd = with(frpScope) { eventsByKey[key].skipNext() }
                val keyScope = childStateScope(keyEnd)
                keyScope.runInStateScope(stateful)
            }
        }
        val changesNode: TFlowImpl<Map<K, Maybe<A>>> =
            mapImpl(
                upstream = { this@applyLatestStatefulForKeyInternal.init.connect(evalScope = this) }
            ) { updates ->
                updates.mapValuesParallel { (key: K, update: Maybe<FrpStateful<A>>) ->
                    // Re-enter a state scope on the current eval scope before touching the update.
                    val reentered = reenterStateScope(this@StateScopeImpl)
                    reentered.run {
                        update.map { stateful ->
                            val keyEnd = with(frpScope) { eventsByKey[key].skipNext() }
                            val keyScope = childStateScope(keyEnd)
                            keyScope.runInStateScope(stateful)
                        }
                    }
                }
            }
        val changes: TFlow<Map<K, Maybe<A>>> =
            TFlowInit(constInit("applyLatestStatefulForKey", changesNode.cached()))
        return Pair(changes, FrpDeferredValue(initOut))
    }

    /**
     * Maps every [FrpStateful] emitted by the receiver to its result, running it in a state scope
     * re-entered from this one; the mapped node is cached.
     */
    private fun <A> TFlow<FrpStateful<A>>.observeStatefulsInternal(): TFlow<A> {
        val applied =
            mapImpl(upstream = { this@observeStatefulsInternal.init.connect(evalScope = this) }) {
                    stateful ->
                    val reentered = reenterStateScope(outerScope = this@StateScopeImpl)
                    reentered.runInStateScope(stateful)
                }
                .cached()
        return TFlowInit(constInit("observeStatefuls", applied))
    }

    override val frpScope: FrpStateScope = FrpStateScopeImpl()

    /**
     * Public-facing [FrpStateScope] for this scope. Transaction-level operations are delegated to
     * the [evalScope]'s own `frpScope`; stateful operators forward to the private helpers of the
     * enclosing [StateScopeImpl].
     */
    private inner class FrpStateScopeImpl :
        FrpStateScope, FrpTransactionScope by evalScope.frpScope {

        override fun <A> deferredStateScope(
            block: suspend FrpStateScope.() -> A
        ): FrpDeferredValue<A> {
            return deferredInternal(block)
        }

        override fun <A> TFlow<A>.holdDeferred(initialValue: FrpDeferredValue<A>): TState<A> {
            return toTStateDeferredInternal(initialValue)
        }

        override fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally(
            initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>
        ): TFlow<Map<K, V>> = mergeIncrementallyInternal(foldMapIncrementally(initialTFlows))

        override fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly(
            initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>
        ): TFlow<Map<K, V>> = mergeIncrementallyPromptInternal(foldMapIncrementally(initialTFlows))

        override fun <K, A, B> TFlow<Map<K, Maybe<FrpStateful<A>>>>.applyLatestStatefulForKey(
            init: FrpDeferredValue<Map<K, FrpStateful<B>>>,
            numKeys: Int?,
        ): Pair<TFlow<Map<K, Maybe<A>>>, FrpDeferredValue<Map<K, B>>> {
            return applyLatestStatefulForKeyInternal(init, numKeys)
        }

        override fun <A> TFlow<FrpStateful<A>>.applyStatefuls(): TFlow<A> {
            return observeStatefulsInternal()
        }
    }

    /**
     * Starts [block] as a coroutine with [frpScope] as its receiver and suspends until it finishes.
     *
     * The block is started with an [EmptyCoroutineContext]; its outcome is relayed through a
     * [CompletableDeferred] that is created as a child of the caller's job.
     */
    override suspend fun <R> runInStateScope(block: suspend FrpStateScope.() -> R): R {
        val outcome = CompletableDeferred<R>(parent = coroutineContext.job)
        val completion =
            object : Continuation<R> {
                override val context: CoroutineContext = EmptyCoroutineContext

                override fun resumeWith(result: Result<R>) {
                    outcome.completeWith(result)
                }
            }
        block.startCoroutine(frpScope, completion)
        return outcome.await()
    }

    /** Creates a scope on the same [evalScope] that ends on [newEnd] merged with [endSignal]. */
    override fun childStateScope(newEnd: TFlow<Any>): StateScopeImpl {
        val combinedEnd = merge(newEnd, endSignal)
        return StateScopeImpl(evalScope, combinedEnd)
    }
}
255 
/**
 * Creates a new [StateScopeImpl] that evaluates on the receiver [EvalScope] while reusing the end
 * signal of [outerScope].
 */
private fun EvalScope.reenterStateScope(outerScope: StateScopeImpl): StateScopeImpl {
    val inheritedEnd = outerScope.endSignal
    return StateScopeImpl(evalScope = this, endSignal = inheritedEnd)
}
258