1 /*
2  * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 @file:OptIn(ExperimentalTypeInference::class)
18 
19 package com.android.systemui.utils.coroutines.flow
20 
21 import kotlin.experimental.ExperimentalTypeInference
22 import kotlinx.coroutines.channels.ProducerScope
23 import kotlinx.coroutines.channels.SendChannel
24 import kotlinx.coroutines.channels.awaitClose
25 import kotlinx.coroutines.flow.Flow
26 import kotlinx.coroutines.flow.buffer
27 import kotlinx.coroutines.flow.callbackFlow
28 import kotlinx.coroutines.flow.channelFlow
29 import kotlinx.coroutines.flow.conflate
30 import kotlinx.coroutines.flow.flowOn
31 import kotlinx.coroutines.flow.produceIn
32 import kotlinx.coroutines.suspendCancellableCoroutine
33 
34 /**
35  * Creates an instance of a _cold_ [Flow] with elements that are sent to a [SendChannel] provided to
36  * the builder's [block] of code via [ProducerScope]. It allows elements to be produced by code that
37  * is running in a different context or concurrently.
38  *
39  * The resulting flow is _cold_, which means that [block] is called every time a terminal operator
40  * is applied to the resulting flow.
41  *
42  * This builder ensures thread-safety and context preservation, thus the provided [ProducerScope]
43  * can be used from any context, e.g. from a callback-based API. The resulting flow completes as
44  * soon as the code in the [block] completes. [awaitClose] should be used to keep the flow running,
45  * otherwise the channel will be closed immediately when block completes. [awaitClose] argument is
46  * called either when a flow consumer cancels the flow collection or when a callback-based API
47  * invokes [SendChannel.close] manually and is typically used to cleanup the resources after the
48  * completion, e.g. unregister a callback. Using [awaitClose] is mandatory in order to prevent
49  * memory leaks when the flow collection is cancelled, otherwise the callback may keep running even
50  * when the flow collector is already completed. To avoid such leaks, this method throws
51  * [IllegalStateException] if block returns, but the channel is not closed yet.
52  *
 * A [conflated][conflate] channel is used. Use the [buffer] operator on the resulting flow to
 * specify a user-defined buffer capacity and to control what happens when data is produced faster
 * than consumed, i.e. to control the back-pressure behavior.
56  *
57  * Adjacent applications of [callbackFlow], [flowOn], [buffer], and [produceIn] are always fused so
58  * that only one properly configured channel is used for execution.
59  *
60  * Example of usage that converts a multi-shot callback API to a flow. For single-shot callbacks use
61  * [suspendCancellableCoroutine].
62  *
63  * ```
64  * fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
65  *     val callback = object : Callback { // Implementation of some callback interface
66  *         override fun onNextValue(value: T) {
67  *             // To avoid blocking you can configure channel capacity using
68  *             // either buffer(Channel.CONFLATED) or buffer(Channel.UNLIMITED) to avoid overfill
69  *             trySendBlocking(value)
70  *                 .onFailure { throwable ->
71  *                     // Downstream has been cancelled or failed, can log here
72  *                 }
73  *         }
74  *         override fun onApiError(cause: Throwable) {
75  *             cancel(CancellationException("API Error", cause))
76  *         }
77  *         override fun onCompleted() = channel.close()
78  *     }
79  *     api.register(callback)
80  *     /*
81  *      * Suspends until either 'onCompleted'/'onApiError' from the callback is invoked
82  *      * or flow collector is cancelled (e.g. by 'take(1)' or because a collector's coroutine was cancelled).
83  *      * In both cases, callback will be properly unregistered.
84  *      */
85  *     awaitClose { api.unregister(callback) }
86  * }
87  * ```
88  * > The callback `register`/`unregister` methods provided by an external API must be thread-safe,
89  * > because `awaitClose` block can be called at any time due to asynchronous nature of
90  * > cancellation, even concurrently with the call of the callback.
91  *
92  * This builder is to be preferred over [callbackFlow], due to the latter's default configuration of
93  * using an internal buffer, negatively impacting system health.
94  *
95  * @see callbackFlow
96  */
fun <T> conflatedCallbackFlow(
    @BuilderInference block: suspend ProducerScope<T>.() -> Unit,
): Flow<T> =
    // Delegate to the standard builder, then conflate so a slow collector only ever observes the
    // most recent value instead of backing up a buffer of stale ones.
    callbackFlow(block).conflate()
100 
101 /**
102  * Creates an instance of a _cold_ [Flow] with elements that are sent to a [SendChannel] provided to
103  * the builder's [block] of code via [ProducerScope]. It allows elements to be produced by code that
104  * is running in a different context or concurrently. The resulting flow is _cold_, which means that
105  * [block] is called every time a terminal operator is applied to the resulting flow.
106  *
107  * This builder ensures thread-safety and context preservation, thus the provided [ProducerScope]
108  * can be used concurrently from different contexts. The resulting flow completes as soon as the
 * code in the [block] and all its children complete. Use [awaitClose] as the last statement to
110  * keep it running. A more detailed example is provided in the documentation of [callbackFlow].
111  *
 * A [conflated][conflate] channel is used. Use the [buffer] operator on the resulting flow to
 * specify a user-defined buffer capacity and to control what happens when data is produced faster
 * than consumed, i.e. to control the back-pressure behavior.
115  *
116  * Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are always fused so
117  * that only one properly configured channel is used for execution.
118  *
119  * Examples of usage:
120  * ```
121  * fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> = channelFlow {
122  *     // collect from one coroutine and send it
123  *     launch {
124  *         collect { send(it) }
125  *     }
126  *     // collect and send from this coroutine, too, concurrently
127  *     other.collect { send(it) }
128  * }
129  *
130  * fun <T> contextualFlow(): Flow<T> = channelFlow {
131  *     // send from one coroutine
132  *     launch(Dispatchers.IO) {
133  *         send(computeIoValue())
134  *     }
135  *     // send from another coroutine, concurrently
136  *     launch(Dispatchers.Default) {
137  *         send(computeCpuValue())
138  *     }
139  * }
140  * ```
141  *
142  * This builder is to be preferred over [channelFlow], due to the latter's default configuration of
143  * using an internal buffer, negatively impacting system health.
144  *
145  * @see channelFlow
146  */
fun <T> conflatedChannelFlow(
    @BuilderInference block: suspend ProducerScope<T>.() -> Unit,
): Flow<T> {
    // Start from the stock channel-backed builder, which buffers by default.
    val unconflated: Flow<T> = channelFlow(block)
    // Swap that default buffering for conflation before handing the flow to callers.
    return unconflated.conflate()
}
150