1 /*
2  * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.test.tracing.coroutines
18 
19 import android.platform.test.annotations.EnableFlags
20 import com.android.app.tracing.coroutines.createCoroutineTracingContext
21 import com.android.app.tracing.coroutines.flow.collectTraced
22 import com.android.app.tracing.coroutines.flow.flowName
23 import com.android.app.tracing.coroutines.launchTraced
24 import com.android.systemui.Flags.FLAG_COROUTINE_TRACING
25 import java.util.concurrent.Executor
26 import kotlin.coroutines.CoroutineContext
27 import kotlinx.coroutines.CoroutineDispatcher
28 import kotlinx.coroutines.CoroutineScope
29 import kotlinx.coroutines.DelicateCoroutinesApi
30 import kotlinx.coroutines.ExperimentalCoroutinesApi
31 import kotlinx.coroutines.asExecutor
32 import kotlinx.coroutines.cancel
33 import kotlinx.coroutines.channels.awaitClose
34 import kotlinx.coroutines.delay
35 import kotlinx.coroutines.flow.Flow
36 import kotlinx.coroutines.flow.MutableStateFlow
37 import kotlinx.coroutines.flow.SharingStarted
38 import kotlinx.coroutines.flow.StateFlow
39 import kotlinx.coroutines.flow.callbackFlow
40 import kotlinx.coroutines.flow.combine
41 import kotlinx.coroutines.flow.distinctUntilChanged
42 import kotlinx.coroutines.flow.map
43 import kotlinx.coroutines.flow.onEach
44 import kotlinx.coroutines.flow.onStart
45 import kotlinx.coroutines.flow.stateIn
46 import kotlinx.coroutines.newSingleThreadContext
47 import org.junit.Test
48 
/**
 * Immutable snapshot of the example state published by an [ExampleStateTracker].
 *
 * @property a integer component of the state
 * @property b boolean component of the state
 * @property c string component of the state
 */
data class ExampleInfo(
    val a: Int,
    val b: Boolean,
    val c: String,
)
50 
/**
 * Callback-based source of [ExampleInfo] state: exposes the current value and lets clients
 * register listeners for changes.
 */
interface ExampleStateTracker {
    /** Listener notified when the tracked [ExampleInfo] changes. */
    interface Callback {
        /** Called with the updated state. */
        fun onInfoChanged(newInfo: ExampleInfo)
    }

    /** The most recent state known to the tracker. */
    val info: ExampleInfo

    /**
     * Registers [callback] together with an [executor].
     *
     * The executor is expected to be used for delivering [Callback.onInfoChanged]; that is how
     * `ExampleStateTrackerImpl` in this file uses it.
     */
    fun addCallback(callback: Callback, executor: Executor)

    /** Unregisters a previously added [callback]. */
    fun removeCallback(callback: Callback)
}
62 
/** Flow-based view over the example state, intended for consumption from coroutines. */
interface ExampleRepository {
    /** Stream of [ExampleInfo] values. */
    val currentInfo: Flow<ExampleInfo>

    /** An independent boolean state. */
    val otherState: StateFlow<Boolean>

    /** `true` when [otherState] is `true` and the current [ExampleInfo.b] is `true`. */
    val combinedState: StateFlow<Boolean>
}
68 
/**
 * Simple in-memory [ExampleStateTracker] whose state is changed explicitly through
 * [forceUpdate].
 *
 * The callback list is a plain mutable list with no synchronization of its own.
 */
class ExampleStateTrackerImpl : ExampleStateTracker {
    private var _info = ExampleInfo(0, false, "Initial")
    override val info: ExampleInfo
        get() = _info

    /** Registered callbacks, each paired with the executor used to deliver its updates. */
    val callbacks = mutableListOf<Pair<ExampleStateTracker.Callback, Executor>>()

    override fun addCallback(callback: ExampleStateTracker.Callback, executor: Executor) {
        callbacks.add(Pair(callback, executor))
    }

    /** Removes every registration of [callback], regardless of the executor it was added with. */
    override fun removeCallback(callback: ExampleStateTracker.Callback) {
        callbacks.removeIf { it.first == callback }
    }

    /**
     * Replaces the current state with `ExampleInfo(a, b, c)` and notifies every registered
     * callback on its executor.
     *
     * @param a new value for [ExampleInfo.a]
     * @param b new value for [ExampleInfo.b]
     * @param c new value for [ExampleInfo.c]
     */
    fun forceUpdate(a: Int, b: Boolean, c: String) {
        // Capture the value once so that each queued notification delivers the state that
        // triggered it, even if the executor runs it after a later update.
        val newInfo = ExampleInfo(a, b, c)
        _info = newInfo
        // Iterate over a snapshot so a callback that adds or removes listeners while being
        // notified (e.g. on a direct executor) cannot invalidate the iteration.
        callbacks.toList().forEach { (callback, executor) ->
            executor.execute { callback.onInfoChanged(newInfo) }
        }
    }
}
89 
/**
 * [ExampleRepository] that adapts the callback API of [tracker] to flows shared in [bgScope].
 *
 * Each flow reports to [testBase] via `expect(...)` whenever it emits; the strings passed are
 * presumably the trace names expected at that point (see `TestBase` to confirm).
 */
private class ExampleRepositoryImpl(
    private val testBase: TestBase,
    private val bgScope: CoroutineScope,
    private val tracker: ExampleStateTrackerImpl,
) : ExampleRepository {
    /**
     * State of [tracker] as a [StateFlow], started eagerly in [bgScope]. Tracker callbacks are
     * delivered through an executor backed by the dispatcher of [bgScope].
     */
    // Opt-in is needed for the `coroutineContext[CoroutineDispatcher]` key lookup below.
    @OptIn(ExperimentalStdlibApi::class)
    override val currentInfo: StateFlow<ExampleInfo> =
        callbackFlow {
                // Seed the flow with the current value before listening for changes.
                channel.trySend(tracker.info)
                val callback =
                    object : ExampleStateTracker.Callback {
                        override fun onInfoChanged(newInfo: ExampleInfo) {
                            channel.trySend(newInfo)
                        }
                    }
                // Fail with a descriptive message instead of a bare NPE when the scope was
                // created without a dispatcher.
                val bgDispatcher =
                    checkNotNull(bgScope.coroutineContext[CoroutineDispatcher]) {
                        "bgScope must contain a CoroutineDispatcher to deliver tracker callbacks"
                    }
                tracker.addCallback(callback, bgDispatcher.asExecutor())
                // Unregister when the flow collection is cancelled or the channel is closed.
                awaitClose { tracker.removeCallback(callback) }
            }
            .onEach { testBase.expect("bg:1^currentInfo") }
            .flowName("currentInfo")
            .stateIn(bgScope, SharingStarted.Eagerly, initialValue = tracker.info)

    override val otherState = MutableStateFlow(false)

    /**
     * Flow that emits true only when currentInfo.b == true and otherState == true.
     *
     * NOTE(review): this is a getter, so every read builds a new `stateIn` flow in [bgScope].
     * Turning it into an initializer would move the `stateIn` call to construction time; left
     * unchanged here, confirm the test's trace expectations before changing it.
     */
    override val combinedState: StateFlow<Boolean>
        get() =
            combine(currentInfo, otherState, ::Pair)
                .map { it.first.b && it.second }
                .distinctUntilChanged()
                .onEach { testBase.expect("bg:2^combinedState:1^:2^") }
                .onStart { emit(false) }
                .flowName("combinedState")
                .stateIn(
                    scope = bgScope,
                    started = SharingStarted.WhileSubscribed(),
                    initialValue = false,
                )
}
132 
/**
 * Exercises coroutine tracing for a `callbackFlow`-backed repository whose flows are shared in
 * a background scope and collected from the test's main coroutine.
 */
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
@EnableFlags(FLAG_COROUTINE_TRACING)
class CallbackFlowTracingTest : TestBase() {

    override val extraCoroutineContext: CoroutineContext
        get() = createCoroutineTracingContext("main", includeParentNames = true, strictMode = true)

    /**
     * Drives the tracker and `otherState` through a series of changes and checks, through
     * `expect(...)`, what is observed each time `combinedState` emits to the collector.
     */
    @Test
    fun callbackFlow1() {
        val exampleTracker = ExampleStateTrackerImpl()
        // Keep a reference to the dispatcher: it owns a dedicated thread that is only released
        // when the dispatcher is closed.
        val bgDispatcher = newSingleThreadContext("bg-thread")
        val bgScope =
            CoroutineScope(
                createCoroutineTracingContext("bg", includeParentNames = true, strictMode = true) +
                    bgDispatcher
            )
        try {
            val repository = ExampleRepositoryImpl(this, bgScope, exampleTracker)

            expect(1)
            runTest {
                launchTraced("collectCombined") {
                    repository.combinedState.collectTraced("combined-states") {
                        expect(
                            listOf(2, 4, 5, 6),
                            "main:1^:1^collectCombined",
                            "collect:combined-states",
                            "collect:combined-states:emit",
                        )
                    }
                }
                delay(10)
                expect(3, "main:1^")
                delay(10)
                exampleTracker.forceUpdate(1, false, "A") // <-- no change
                delay(10)
                repository.otherState.value = true // <-- no change
                delay(10)
                exampleTracker.forceUpdate(2, true, "B") // <-- should update `combinedState`
                delay(10)
                repository.otherState.value = false // <-- should update `combinedState`
                delay(10)
                exampleTracker.forceUpdate(3, false, "C") // <-- no change
                delay(10)
                exampleTracker.forceUpdate(4, true, "D") // <-- no change
                delay(10)
                repository.otherState.value = true // <-- should update `combinedState`
                delay(10)
                finish(7, "main:1^")
                cancel("Cancelled normally for test")
            }
        } finally {
            // Always tear down the background work, including when an expectation above fails,
            // so that neither the shared flows nor the thread outlive this test.
            bgScope.cancel("Cancelled normally for test")
            bgDispatcher.close()
        }
    }
}
185