xref: /aosp_15_r20/external/kotlinx.coroutines/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt (revision 7a7160fed73afa6648ef8aa100d4a336fe921d9a)

<lambda>null1 @file:JvmMultifileClass
2 @file:JvmName("FlowKt")
3 @file:Suppress("UNCHECKED_CAST")
4 
5 package kotlinx.coroutines.flow
6 
7 import kotlinx.coroutines.flow.internal.*
8 import kotlin.jvm.*
9 import kotlinx.coroutines.flow.flow as safeFlow
10 import kotlinx.coroutines.flow.internal.unsafeFlow as flow
11 
/**
 * Creates a [Flow] that applies [transform] to the most recent value of this flow together with
 * the most recent value of [flow], and emits each result.
 *
 * For example:
 * ```
 * val flow = flowOf(1, 2).onEach { delay(10) }
 * val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
 * flow.combine(flow2) { i, s -> i.toString() + s }.collect {
 *     println(it) // Will print "1a 2a 2b 2c"
 * }
 * ```
 *
 * This function is a shorthand for `flow.combineTransform(flow2) { a, b -> emit(transform(a, b)) }`.
 */
@JvmName("flowCombine")
public fun <T1, T2, R> Flow<T1>.combine(
    flow: Flow<T2>,
    transform: suspend (a: T1, b: T2) -> R
): Flow<R> = flow {
    // The casts below rely on values being delivered in the same order as the flows array:
    // index 0 for the receiver, index 1 for [flow].
    combineInternal(arrayOf(this@combine, flow), nullArrayFactory()) { latest ->
        emit(transform(latest[0] as T1, latest[1] as T2))
    }
}
31 
/**
 * Creates a [Flow] that applies [transform] to the most recent values of [flow] and [flow2]
 * and emits each result.
 *
 * For example:
 * ```
 * val flow = flowOf(1, 2).onEach { delay(10) }
 * val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
 * combine(flow, flow2) { i, s -> i.toString() + s }.collect {
 *     println(it) // Will print "1a 2a 2b 2c"
 * }
 * ```
 *
 * This function is a shorthand for `combineTransform(flow, flow2) { a, b -> emit(transform(a, b)) }`.
 */
public fun <T1, T2, R> combine(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    transform: suspend (a: T1, b: T2) -> R
): Flow<R> {
    // Delegates to the extension overload declared on Flow<T1>.
    return flow.combine(flow2, transform)
}
49 
/**
 * Creates a [Flow] whose values are produced by [transform], which processes the most recent value
 * of this flow together with the most recent value of [flow].
 *
 * [transform] runs with a [FlowCollector] receiver, so for each combination it may emit a transformed
 * element, skip it, or emit multiple times.
 *
 * Example usage:
 * ```
 * val flow = requestFlow()
 * val flow2 = searchEngineFlow()
 * flow.combineTransform(flow2) { request, searchEngine ->
 *     emit("Downloading in progress")
 *     val result = download(request, searchEngine)
 *     emit(result)
 * }
 * ```
 */
@JvmName("flowCombineTransform")
public fun <T1, T2, R> Flow<T1>.combineTransform(
    flow: Flow<T2>,
    @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
): Flow<R> = combineTransformUnsafe(this, flow) { latest: Array<*> ->
    // Positions mirror the argument order passed to combineTransformUnsafe above.
    val a = latest[0] as T1
    val b = latest[1] as T2
    transform(a, b)
}
77 
/**
 * Creates a [Flow] whose values are produced by [transform], which processes the most recent values
 * of [flow] and [flow2].
 *
 * [transform] runs with a [FlowCollector] receiver, so for each combination it may emit a transformed
 * element, skip it, or emit multiple times.
 *
 * Example usage:
 * ```
 * val flow = requestFlow()
 * val flow2 = searchEngineFlow()
 * combineTransform(flow, flow2) { request, searchEngine ->
 *     emit("Downloading in progress")
 *     val result = download(request, searchEngine)
 *     emit(result)
 * }
 * ```
 */
public fun <T1, T2, R> combineTransform(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
): Flow<R> = combineTransformUnsafe(flow, flow2) { latest: Array<*> ->
    // Destructuring reads indices 0 and 1, i.e. the values for [flow] and [flow2] respectively.
    val (first, second) = latest
    transform(first as T1, second as T2)
}
105 
/**
 * Creates a [Flow] that applies [transform] to the most recent values of [flow], [flow2] and [flow3]
 * and emits each result.
 *
 * @param transform receives the latest value of each flow, in declaration order, and returns the value to emit.
 */
public fun <T1, T2, T3, R> combine(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    transform: suspend (T1, T2, T3) -> R
): Flow<R> = combineUnsafe(flow, flow2, flow3) { args: Array<*> ->
    // Indices mirror the order in which the flows were handed to combineUnsafe.
    transform(
        args[0] as T1,
        args[1] as T2,
        args[2] as T3
    )
}
122 
/**
 * Creates a [Flow] whose values are produced by [transform], which processes the most recent values
 * of [flow], [flow2] and [flow3].
 *
 * [transform] runs with a [FlowCollector] receiver, so for each combination it may emit a transformed
 * element, skip it, or emit multiple times.
 */
public fun <T1, T2, T3, R> combineTransform(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    @BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3) -> Unit
): Flow<R> = combineTransformUnsafe(flow, flow2, flow3) { latest: Array<*> ->
    // Positions mirror the argument order passed to combineTransformUnsafe above.
    val first = latest[0] as T1
    val second = latest[1] as T2
    val third = latest[2] as T3
    transform(first, second, third)
}
141 
/**
 * Creates a [Flow] that applies [transform] to the most recent values of [flow], [flow2], [flow3]
 * and [flow4] and emits each result.
 */
public fun <T1, T2, T3, T4, R> combine(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    flow4: Flow<T4>,
    transform: suspend (T1, T2, T3, T4) -> R
): Flow<R> = combineUnsafe(flow, flow2, flow3, flow4) { latest: Array<*> ->
    // Indices mirror the order in which the flows were handed to combineUnsafe.
    transform(latest[0] as T1, latest[1] as T2, latest[2] as T3, latest[3] as T4)
}
160 
/**
 * Creates a [Flow] whose values are produced by [transform], which processes the most recent values
 * of [flow], [flow2], [flow3] and [flow4].
 *
 * [transform] runs with a [FlowCollector] receiver, so for each combination it may emit a transformed
 * element, skip it, or emit multiple times.
 */
public fun <T1, T2, T3, T4, R> combineTransform(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    flow4: Flow<T4>,
    @BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3, T4) -> Unit
): Flow<R> = combineTransformUnsafe(flow, flow2, flow3, flow4) { latest: Array<*> ->
    // Positions mirror the argument order passed to combineTransformUnsafe above.
    val first = latest[0] as T1
    val second = latest[1] as T2
    val third = latest[2] as T3
    val fourth = latest[3] as T4
    transform(first, second, third, fourth)
}
181 
/**
 * Creates a [Flow] that applies [transform] to the most recent values of [flow], [flow2], [flow3],
 * [flow4] and [flow5] and emits each result.
 */
public fun <T1, T2, T3, T4, T5, R> combine(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    flow4: Flow<T4>,
    flow5: Flow<T5>,
    transform: suspend (T1, T2, T3, T4, T5) -> R
): Flow<R> = combineUnsafe(flow, flow2, flow3, flow4, flow5) { latest: Array<*> ->
    // Indices mirror the order in which the flows were handed to combineUnsafe.
    transform(latest[0] as T1, latest[1] as T2, latest[2] as T3, latest[3] as T4, latest[4] as T5)
}
202 
/**
 * Creates a [Flow] whose values are produced by [transform], which processes the most recent values
 * of [flow], [flow2], [flow3], [flow4] and [flow5].
 *
 * [transform] runs with a [FlowCollector] receiver, so for each combination it may emit a transformed
 * element, skip it, or emit multiple times.
 */
public fun <T1, T2, T3, T4, T5, R> combineTransform(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    flow4: Flow<T4>,
    flow5: Flow<T5>,
    @BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3, T4, T5) -> Unit
): Flow<R> = combineTransformUnsafe(flow, flow2, flow3, flow4, flow5) { latest: Array<*> ->
    // Positions mirror the argument order passed to combineTransformUnsafe above.
    val first = latest[0] as T1
    val second = latest[1] as T2
    val third = latest[2] as T3
    val fourth = latest[3] as T4
    val fifth = latest[4] as T5
    transform(first, second, third, fourth, fifth)
}
225 
/**
 * Creates a [Flow] that applies [transform] to an array holding the most recent value of every
 * flow in [flows] and emits each result.
 */
public inline fun <reified T, R> combine(
    vararg flows: Flow<T>,
    crossinline transform: suspend (Array<T>) -> R
): Flow<R> = flow {
    combineInternal(
        flows,
        // Supplies an array with one slot per flow (contrast with the null factory used by combineUnsafe).
        arrayFactory = { arrayOfNulls(flows.size) },
        transform = { latest -> emit(transform(latest)) }
    )
}
236 
/**
 * Creates a [Flow] whose values are produced by [transform], which processes an array holding the
 * most recent value of every flow in [flows].
 *
 * [transform] runs with a [FlowCollector] receiver, so for each combination it may emit a transformed
 * element, skip it, or emit multiple times.
 */
public inline fun <reified T, R> combineTransform(
    vararg flows: Flow<T>,
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
): Flow<R> = safeFlow {
    // Supplies an array with one slot per flow (contrast with the null factory used by combineTransformUnsafe).
    combineInternal(flows, { arrayOfNulls(flows.size) }) { latest ->
        transform(latest)
    }
}
249 
/*
 * Variant of combine that does not copy the array on every emission; callers deconstruct the
 * existing array right away. Used by the overloads that accept a FunctionN rather than a
 * function taking an Array.
 */
private inline fun <reified T, R> combineUnsafe(
    vararg flows: Flow<T>,
    crossinline transform: suspend (Array<T>) -> R
): Flow<R> = flow {
    combineInternal(
        flows,
        arrayFactory = nullArrayFactory(),
        transform = { latest -> emit(transform(latest)) }
    )
}
260 
/*
 * Variant of combineTransform that does not copy the array on every emission; callers deconstruct
 * the existing array right away. Used by the overloads that accept a FunctionN rather than a
 * function taking an Array.
 */
private inline fun <reified T, R> combineTransformUnsafe(
    vararg flows: Flow<T>,
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
): Flow<R> = safeFlow {
    combineInternal(flows, nullArrayFactory()) { latest ->
        transform(latest)
    }
}
271 
// Shared factory that always yields null; having one function saves a bunch of anonymous classes.
private fun <T> nullArrayFactory(): () -> Array<T>? {
    return { null }
}
274 
/**
 * Creates a [Flow] that applies [transform] to an array holding the most recent value of every
 * flow in [flows] and emits each result.
 */
public inline fun <reified T, R> combine(
    flows: Iterable<Flow<T>>,
    crossinline transform: suspend (Array<T>) -> R
): Flow<R> {
    // Snapshot the iterable into an array once, before the resulting flow is built.
    val sources = flows.toList().toTypedArray()
    return flow {
        combineInternal(sources, { arrayOfNulls(sources.size) }) { latest ->
            emit(transform(latest))
        }
    }
}
291 
/**
 * Creates a [Flow] whose values are produced by [transform], which processes an array holding the
 * most recent value of every flow in [flows].
 *
 * [transform] runs with a [FlowCollector] receiver, so for each combination it may emit a transformed
 * element, skip it, or emit multiple times.
 */
public inline fun <reified T, R> combineTransform(
    flows: Iterable<Flow<T>>,
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
): Flow<R> {
    // Snapshot the iterable into an array once, before the resulting flow is built.
    val sources = flows.toList().toTypedArray()
    return safeFlow {
        combineInternal(
            sources,
            arrayFactory = { arrayOfNulls(sources.size) },
            transform = { latest -> transform(latest) }
        )
    }
}
307 
/**
 * Zips values from the current flow (`this`) with the [other] flow, applying [transform] to each pair of values.
 * The resulting flow completes as soon as one of the flows completes, and cancel is called on the remaining flow.
 *
 * For example:
 * ```
 * val flow = flowOf(1, 2, 3).onEach { delay(10) }
 * val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) }
 * flow.zip(flow2) { i, s -> i.toString() + s }.collect {
 *     println(it) // Will print "1a 2b 3c"
 * }
 * ```
 *
 * ### Buffering
 *
 * The upstream flow is collected sequentially in the same coroutine without any buffering, while the
 * [other] flow is collected concurrently as if `buffer(0)` is used. See the documentation of the [buffer]
 * operator for an explanation. Additional calls to the [buffer] operator can be used as needed for more
 * concurrency.
 */
public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> {
    return zipImpl(this, other, transform)
}
328