<lambda>null1 @file:JvmMultifileClass
2 @file:JvmName("FlowKt")
3 @file:Suppress("UNCHECKED_CAST")
4
5 package kotlinx.coroutines.flow
6
7 import kotlinx.coroutines.flow.internal.*
8 import kotlin.jvm.*
9 import kotlinx.coroutines.flow.flow as safeFlow
10 import kotlinx.coroutines.flow.internal.unsafeFlow as flow
11
12 /**
13 * Returns a [Flow] whose values are generated with [transform] function by combining
14 * the most recently emitted values by each flow.
15 *
16 * It can be demonstrated with the following example:
17 * ```
18 * val flow = flowOf(1, 2).onEach { delay(10) }
19 * val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
20 * flow.combine(flow2) { i, s -> i.toString() + s }.collect {
21 * println(it) // Will print "1a 2a 2b 2c"
22 * }
23 * ```
24 *
25 * This function is a shorthand for `flow.combineTransform(flow2) { a, b -> emit(transform(a, b)) }
26 */
27 @JvmName("flowCombine")
28 public fun <T1, T2, R> Flow<T1>.combine(flow: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R> = flow {
29 combineInternal(arrayOf(this@combine, flow), nullArrayFactory(), { emit(transform(it[0] as T1, it[1] as T2)) })
30 }
31
32 /**
33 * Returns a [Flow] whose values are generated with [transform] function by combining
34 * the most recently emitted values by each flow.
35 *
36 * It can be demonstrated with the following example:
37 * ```
38 * val flow = flowOf(1, 2).onEach { delay(10) }
39 * val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
40 * combine(flow, flow2) { i, s -> i.toString() + s }.collect {
41 * println(it) // Will print "1a 2a 2b 2c"
42 * }
43 * ```
44 *
45 * This function is a shorthand for `combineTransform(flow, flow2) { a, b -> emit(transform(a, b)) }
46 */
combinenull47 public fun <T1, T2, R> combine(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R> =
48 flow.combine(flow2, transform)
49
50 /**
51 * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
52 *
53 * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
54 * generic function that may transform emitted element, skip it or emit it multiple times.
55 *
56 * Its usage can be demonstrated with the following example:
57 * ```
58 * val flow = requestFlow()
59 * val flow2 = searchEngineFlow()
60 * flow.combineTransform(flow2) { request, searchEngine ->
61 * emit("Downloading in progress")
62 * val result = download(request, searchEngine)
63 * emit(result)
64 * }
65 * ```
66 */
67 @JvmName("flowCombineTransform")
68 public fun <T1, T2, R> Flow<T1>.combineTransform(
69 flow: Flow<T2>,
70 @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
71 ): Flow<R> = combineTransformUnsafe(this, flow) { args: Array<*> ->
72 transform(
73 args[0] as T1,
74 args[1] as T2
75 )
76 }
77
78 /**
79 * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
80 *
81 * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
82 * generic function that may transform emitted element, skip it or emit it multiple times.
83 *
84 * Its usage can be demonstrated with the following example:
85 * ```
86 * val flow = requestFlow()
87 * val flow2 = searchEngineFlow()
88 * combineTransform(flow, flow2) { request, searchEngine ->
89 * emit("Downloading in progress")
90 * val result = download(request, searchEngine)
91 * emit(result)
92 * }
93 * ```
94 */
combineTransformnull95 public fun <T1, T2, R> combineTransform(
96 flow: Flow<T1>,
97 flow2: Flow<T2>,
98 @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
99 ): Flow<R> = combineTransformUnsafe(flow, flow2) { args: Array<*> ->
100 transform(
101 args[0] as T1,
102 args[1] as T2
103 )
104 }
105
106 /**
107 * Returns a [Flow] whose values are generated with [transform] function by combining
108 * the most recently emitted values by each flow.
109 */
combinenull110 public fun <T1, T2, T3, R> combine(
111 flow: Flow<T1>,
112 flow2: Flow<T2>,
113 flow3: Flow<T3>,
114 @BuilderInference transform: suspend (T1, T2, T3) -> R
115 ): Flow<R> = combineUnsafe(flow, flow2, flow3) { args: Array<*> ->
116 transform(
117 args[0] as T1,
118 args[1] as T2,
119 args[2] as T3
120 )
121 }
122
123 /**
124 * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
125 *
126 * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
127 * generic function that may transform emitted element, skip it or emit it multiple times.
128 */
combineTransformnull129 public fun <T1, T2, T3, R> combineTransform(
130 flow: Flow<T1>,
131 flow2: Flow<T2>,
132 flow3: Flow<T3>,
133 @BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3) -> Unit
134 ): Flow<R> = combineTransformUnsafe(flow, flow2, flow3) { args: Array<*> ->
135 transform(
136 args[0] as T1,
137 args[1] as T2,
138 args[2] as T3
139 )
140 }
141
142 /**
143 * Returns a [Flow] whose values are generated with [transform] function by combining
144 * the most recently emitted values by each flow.
145 */
combinenull146 public fun <T1, T2, T3, T4, R> combine(
147 flow: Flow<T1>,
148 flow2: Flow<T2>,
149 flow3: Flow<T3>,
150 flow4: Flow<T4>,
151 transform: suspend (T1, T2, T3, T4) -> R
152 ): Flow<R> = combineUnsafe(flow, flow2, flow3, flow4) { args: Array<*> ->
153 transform(
154 args[0] as T1,
155 args[1] as T2,
156 args[2] as T3,
157 args[3] as T4
158 )
159 }
160
161 /**
162 * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
163 *
164 * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
165 * generic function that may transform emitted element, skip it or emit it multiple times.
166 */
combineTransformnull167 public fun <T1, T2, T3, T4, R> combineTransform(
168 flow: Flow<T1>,
169 flow2: Flow<T2>,
170 flow3: Flow<T3>,
171 flow4: Flow<T4>,
172 @BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3, T4) -> Unit
173 ): Flow<R> = combineTransformUnsafe(flow, flow2, flow3, flow4) { args: Array<*> ->
174 transform(
175 args[0] as T1,
176 args[1] as T2,
177 args[2] as T3,
178 args[3] as T4
179 )
180 }
181
182 /**
183 * Returns a [Flow] whose values are generated with [transform] function by combining
184 * the most recently emitted values by each flow.
185 */
combinenull186 public fun <T1, T2, T3, T4, T5, R> combine(
187 flow: Flow<T1>,
188 flow2: Flow<T2>,
189 flow3: Flow<T3>,
190 flow4: Flow<T4>,
191 flow5: Flow<T5>,
192 transform: suspend (T1, T2, T3, T4, T5) -> R
193 ): Flow<R> = combineUnsafe(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
194 transform(
195 args[0] as T1,
196 args[1] as T2,
197 args[2] as T3,
198 args[3] as T4,
199 args[4] as T5
200 )
201 }
202
203 /**
204 * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
205 *
206 * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
207 * generic function that may transform emitted element, skip it or emit it multiple times.
208 */
combineTransformnull209 public fun <T1, T2, T3, T4, T5, R> combineTransform(
210 flow: Flow<T1>,
211 flow2: Flow<T2>,
212 flow3: Flow<T3>,
213 flow4: Flow<T4>,
214 flow5: Flow<T5>,
215 @BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3, T4, T5) -> Unit
216 ): Flow<R> = combineTransformUnsafe(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
217 transform(
218 args[0] as T1,
219 args[1] as T2,
220 args[2] as T3,
221 args[3] as T4,
222 args[4] as T5
223 )
224 }
225
226 /**
227 * Returns a [Flow] whose values are generated with [transform] function by combining
228 * the most recently emitted values by each flow.
229 */
combinenull230 public inline fun <reified T, R> combine(
231 vararg flows: Flow<T>,
232 crossinline transform: suspend (Array<T>) -> R
233 ): Flow<R> = flow {
234 combineInternal(flows, { arrayOfNulls(flows.size) }, { emit(transform(it)) })
235 }
236
237 /**
238 * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
239 *
240 * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
241 * generic function that may transform emitted element, skip it or emit it multiple times.
242 */
combineTransformnull243 public inline fun <reified T, R> combineTransform(
244 vararg flows: Flow<T>,
245 @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
246 ): Flow<R> = safeFlow {
247 combineInternal(flows, { arrayOfNulls(flows.size) }, { transform(it) })
248 }
249
250 /*
251 * Same as combine, but does not copy array each time, deconstructing existing
252 * array each time. Used in overloads that accept FunctionN instead of Function<Array<R>>
253 */
combineUnsafenull254 private inline fun <reified T, R> combineUnsafe(
255 vararg flows: Flow<T>,
256 crossinline transform: suspend (Array<T>) -> R
257 ): Flow<R> = flow {
258 combineInternal(flows, nullArrayFactory(), { emit(transform(it)) })
259 }
260
261 /*
262 * Same as combineTransform, but does not copy array each time, deconstructing existing
263 * array each time. Used in overloads that accept FunctionN instead of Function<Array<R>>
264 */
combineTransformUnsafenull265 private inline fun <reified T, R> combineTransformUnsafe(
266 vararg flows: Flow<T>,
267 @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
268 ): Flow<R> = safeFlow {
269 combineInternal(flows, nullArrayFactory(), { transform(it) })
270 }
271
272 // Saves bunch of anonymous classes
<lambda>null273 private fun <T> nullArrayFactory(): () -> Array<T>? = { null }
274
275 /**
276 * Returns a [Flow] whose values are generated with [transform] function by combining
277 * the most recently emitted values by each flow.
278 */
combinenull279 public inline fun <reified T, R> combine(
280 flows: Iterable<Flow<T>>,
281 crossinline transform: suspend (Array<T>) -> R
282 ): Flow<R> {
283 val flowArray = flows.toList().toTypedArray()
284 return flow {
285 combineInternal(
286 flowArray,
287 arrayFactory = { arrayOfNulls(flowArray.size) },
288 transform = { emit(transform(it)) })
289 }
290 }
291
292 /**
293 * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
294 *
295 * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
296 * generic function that may transform emitted element, skip it or emit it multiple times.
297 */
combineTransformnull298 public inline fun <reified T, R> combineTransform(
299 flows: Iterable<Flow<T>>,
300 @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
301 ): Flow<R> {
302 val flowArray = flows.toList().toTypedArray()
303 return safeFlow {
304 combineInternal(flowArray, { arrayOfNulls(flowArray.size) }, { transform(it) })
305 }
306 }
307
308 /**
309 * Zips values from the current flow (`this`) with [other] flow using provided [transform] function applied to each pair of values.
310 * The resulting flow completes as soon as one of the flows completes and cancel is called on the remaining flow.
311 *
312 * It can be demonstrated with the following example:
313 * ```
314 * val flow = flowOf(1, 2, 3).onEach { delay(10) }
315 * val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) }
316 * flow.zip(flow2) { i, s -> i.toString() + s }.collect {
317 * println(it) // Will print "1a 2b 3c"
318 * }
319 * ```
320 *
321 * ### Buffering
322 *
323 * The upstream flow is collected sequentially in the same coroutine without any buffering, while the
324 * [other] flow is collected concurrently as if `buffer(0)` is used. See documentation in the [buffer] operator
325 * for explanation. You can use additional calls to the [buffer] operator as needed for more concurrency.
326 */
zipnull327 public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = zipImpl(this, other, transform)
328