1 @file:Suppress("FunctionName", "DEPRECATION")
2 
3 package kotlinx.coroutines.channels
4 
5 import kotlinx.coroutines.*
6 import kotlinx.coroutines.channels.BufferOverflow.*
7 import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
8 import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
9 import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
10 import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
11 import kotlinx.coroutines.internal.*
12 import kotlinx.coroutines.selects.*
13 import kotlin.native.concurrent.*
14 
15 /**
16  * Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers
17  * that subscribe for the elements using [openSubscription] function and unsubscribe using [ReceiveChannel.cancel]
18  * function.
19  *
20  * See `BroadcastChannel()` factory function for the description of available
21  * broadcast channel implementations.
22  *
23  * **Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0**
24  * It is replaced with [SharedFlow][kotlinx.coroutines.flow.SharedFlow].
25  */
26 @ObsoleteCoroutinesApi
27 @Deprecated(level = DeprecationLevel.WARNING, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
28 public interface BroadcastChannel<E> : SendChannel<E> {
29     /**
30      * Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it.
31      * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this
32      * broadcast channel.
33      */
openSubscriptionnull34     public fun openSubscription(): ReceiveChannel<E>
35 
36     /**
37      * Cancels reception of remaining elements from this channel with an optional cause.
38      * This function closes the channel with
39      * the specified cause (unless it was already closed), removes all buffered sent elements from it,
40      * and [cancels][ReceiveChannel.cancel] all open subscriptions.
41      * A cause can be used to specify an error message or to provide other details on
42      * a cancellation reason for debugging purposes.
43      */
44     public fun cancel(cause: CancellationException? = null)
45 
46     /**
47      * @suppress This method has bad semantics when cause is not a [CancellationException]. Use [cancel].
48      */
49     @Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility only")
50     public fun cancel(cause: Throwable? = null): Boolean
51 }
52 
53 /**
54  * Creates a broadcast channel with the specified buffer capacity.
55  *
56  * The resulting channel type depends on the specified [capacity] parameter:
57  *
58  * - when `capacity` positive, but less than [UNLIMITED] -- creates `ArrayBroadcastChannel` with a buffer of given capacity.
59  *   **Note:** this channel looses all items that have been sent to it until the first subscriber appears;
60  * - when `capacity` is [CONFLATED] -- creates [ConflatedBroadcastChannel] that conflates back-to-back sends;
61  * - when `capacity` is [BUFFERED] -- creates `ArrayBroadcastChannel` with a default capacity.
62  * - otherwise -- throws [IllegalArgumentException].
63  *
64  * **Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0**
65  * It is replaced with [SharedFlow][kotlinx.coroutines.flow.SharedFlow] and [StateFlow][kotlinx.coroutines.flow.StateFlow].
66  */
67 @ObsoleteCoroutinesApi
68 @Deprecated(level = DeprecationLevel.WARNING, message = "BroadcastChannel is deprecated in the favour of SharedFlow and StateFlow, and is no longer supported")
69 public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
70     when (capacity) {
71         0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel")
72         UNLIMITED -> throw IllegalArgumentException("Unsupported UNLIMITED capacity for BroadcastChannel")
73         CONFLATED -> ConflatedBroadcastChannel()
74         BUFFERED -> BroadcastChannelImpl(CHANNEL_DEFAULT_CAPACITY)
75         else -> BroadcastChannelImpl(capacity)
76     }
77 
78 /**
79  * Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers.
80  *
81  * Back-to-send sent elements are _conflated_ -- only the most recently sent value is received,
82  * while previously sent elements **are lost**.
83  * Every subscriber immediately receives the most recently sent element.
84  * Sender to this broadcast channel never suspends and [trySend] always succeeds.
85  *
86  * A secondary constructor can be used to create an instance of this class that already holds a value.
87  * This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation.
88  *
89  * In this implementation, [opening][openSubscription] and [closing][ReceiveChannel.cancel] subscription
90  * takes linear time in the number of subscribers.
91  *
92  * **Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0**
93  * It is replaced with [SharedFlow][kotlinx.coroutines.flow.StateFlow].
94  */
95 @ObsoleteCoroutinesApi
96 @Deprecated(level = DeprecationLevel.WARNING, message = "ConflatedBroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
97 public class ConflatedBroadcastChannel<E> private constructor(
98     private val broadcast: BroadcastChannelImpl<E>
99 ) : BroadcastChannel<E> by broadcast {
100     public constructor(): this(BroadcastChannelImpl<E>(capacity = CONFLATED))
101     /**
102      * Creates an instance of this class that already holds a value.
103      *
104      * It is as a shortcut to creating an instance with a default constructor and
105      * immediately sending an element: `ConflatedBroadcastChannel().apply { offer(value) }`.
106      */
107     public constructor(value: E) : this() {
108         trySend(value)
109     }
110 
111     /**
112      * The most recently sent element to this channel.
113      *
114      * Access to this property throws [IllegalStateException] when this class is constructed without
115      * initial value and no value was sent yet or if it was [closed][close] without a cause.
116      * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
117      */
118     public val value: E get() = broadcast.value
119     /**
120      * The most recently sent element to this channel or `null` when this class is constructed without
121      * initial value and no value was sent yet or if it was [closed][close].
122      */
123     public val valueOrNull: E? get() = broadcast.valueOrNull
124 }
125 
126 /**
127  * A common implementation for both the broadcast channel with a buffer of fixed [capacity]
128  * and the conflated broadcast channel (see [ConflatedBroadcastChannel]).
129  *
130  * **Note**, that elements that are sent to this channel while there are no
131  * [openSubscription] subscribers are immediately lost.
132  *
133  * This channel is created by `BroadcastChannel(capacity)` factory function invocation.
134  */
135 @Suppress("MULTIPLE_DEFAULTS_INHERITED_FROM_SUPERTYPES_DEPRECATION_WARNING", "MULTIPLE_DEFAULTS_INHERITED_FROM_SUPERTYPES_WHEN_NO_EXPLICIT_OVERRIDE_DEPRECATION_WARNING") // do not remove the MULTIPLE_DEFAULTS suppression: required in K2
136 internal class BroadcastChannelImpl<E>(
137     /**
138      * Buffer capacity; [Channel.CONFLATED] when this broadcast is conflated.
139      */
140     val capacity: Int
141 ) : BufferedChannel<E>(capacity = Channel.RENDEZVOUS, onUndeliveredElement = null), BroadcastChannel<E> {
142     init {
<lambda>null143         require(capacity >= 1 || capacity == CONFLATED) {
144             "BroadcastChannel capacity must be positive or Channel.CONFLATED, but $capacity was specified"
145         }
146     }
147 
148     // This implementation uses coarse-grained synchronization,
149     // as, reputedly, it is the simplest synchronization scheme.
150     // All operations are protected by this lock.
151     private val lock = ReentrantLock()
152     // The list of subscribers; all accesses should be protected by lock.
153     // Each change must create a new list instance to avoid `ConcurrentModificationException`.
154     private var subscribers: List<BufferedChannel<E>> = emptyList()
155     // When this broadcast is conflated, this field stores the last sent element.
156     // If this channel is empty or not conflated, it stores a special `NO_ELEMENT` marker.
157     private var lastConflatedElement: Any? = NO_ELEMENT // NO_ELEMENT or E
158 
159     // ###########################
160     // # Subscription Management #
161     // ###########################
162 
<lambda>null163     override fun openSubscription(): ReceiveChannel<E> = lock.withLock { // protected by lock
164         // Is this broadcast conflated or buffered?
165         // Create the corresponding subscription channel.
166         val s = if (capacity == CONFLATED) SubscriberConflated() else SubscriberBuffered()
167         // If this broadcast is already closed or cancelled,
168         // and the last sent element is not available in case
169         // this broadcast is conflated, close the created
170         // subscriber immediately and return it.
171         if (isClosedForSend && lastConflatedElement === NO_ELEMENT) {
172             s.close(closeCause)
173             return s
174         }
175         // Is this broadcast conflated? If so, send
176         // the last sent element to the subscriber.
177         if (lastConflatedElement !== NO_ELEMENT) {
178             s.trySend(value)
179         }
180         // Add the subscriber to the list and return it.
181         subscribers += s
182         s
183     }
184 
<lambda>null185     private fun removeSubscriber(s: ReceiveChannel<E>) = lock.withLock { // protected by lock
186         subscribers = subscribers.filter { it !== s }
187     }
188 
189     // #############################
190     // # The `send(..)` Operations #
191     // #############################
192 
193     /**
194      * Sends the specified element to all subscribers.
195      *
196      * **!!! THIS IMPLEMENTATION IS NOT LINEARIZABLE !!!**
197      *
198      * As the operation should send the element to multiple
199      * subscribers simultaneously, it is non-trivial to
200      * implement it in an atomic way. Specifically, this
201      * would require a special implementation that does
202      * not transfer the element until all parties are able
203      * to resume it (this `send(..)` can be cancelled
204      * or the broadcast can become closed in the meantime).
205      * As broadcasts are obsolete, we keep this implementation
206      * as simple as possible, allowing non-linearizability
207      * in corner cases.
208      */
sendnull209     override suspend fun send(element: E) {
210         val subs = lock.withLock { // protected by lock
211             // Is this channel closed for send?
212             if (isClosedForSend) throw sendException
213             // Update the last sent element if this broadcast is conflated.
214             if (capacity == CONFLATED) lastConflatedElement = element
215             // Get a reference to the list of subscribers under the lock.
216             subscribers
217         }
218         // The lock has been released. Send the element to the
219         // subscribers one-by-one, and finish immediately
220         // when this broadcast discovered in the closed state.
221         // Note that this implementation is non-linearizable;
222         // see this method documentation for details.
223         subs.forEach {
224             // We use special function to send the element,
225             // which returns `true` on success and `false`
226             // if the subscriber is closed.
227             val success = it.sendBroadcast(element)
228             // The sending attempt has failed.
229             // Check whether the broadcast is closed.
230             if (!success && isClosedForSend) throw sendException
231         }
232     }
233 
<lambda>null234     override fun trySend(element: E): ChannelResult<Unit> = lock.withLock { // protected by lock
235         // Is this channel closed for send?
236         if (isClosedForSend) return super.trySend(element)
237         // Check whether the plain `send(..)` operation
238         // should suspend and fail in this case.
239         val shouldSuspend = subscribers.any { it.shouldSendSuspend() }
240         if (shouldSuspend) return ChannelResult.failure()
241         // Update the last sent element if this broadcast is conflated.
242         if (capacity == CONFLATED) lastConflatedElement = element
243         // Send the element to all subscribers.
244         // It is guaranteed that the attempt cannot fail,
245         // as both the broadcast closing and subscription
246         // cancellation are guarded by lock, which is held
247         // by the current operation.
248         subscribers.forEach { it.trySend(element) }
249         // Finish with success.
250         return ChannelResult.success(Unit)
251     }
252 
253     // ###########################################
254     // # The `select` Expression: onSend { ... } #
255     // ###########################################
256 
registerSelectForSendnull257     override fun registerSelectForSend(select: SelectInstance<*>, element: Any?) {
258         // It is extremely complicated to support sending via `select` for broadcasts,
259         // as the operation should wait on multiple subscribers simultaneously.
260         // At the same time, broadcasts are obsolete, so we need a simple implementation
261         // that works somehow. Here is a tricky work-around. First, we launch a new
262         // coroutine that performs plain `send(..)` operation and tries to complete
263         // this `select` via `trySelect`, independently on whether it is in the
264         // registration or in the waiting phase. On success, the operation finishes.
265         // On failure, if another clause is already selected or the `select` operation
266         // has been cancelled, we observe non-linearizable behaviour, as this `onSend`
267         // clause is completed as well. However, we believe that such a non-linearizability
268         // is fine for obsolete API. The last case is when the `select` operation is still
269         // in the registration case, so this `onSend` clause should be re-registered.
270         // The idea is that we keep information that this `onSend` clause is already selected
271         // and finish immediately.
272         @Suppress("UNCHECKED_CAST")
273         element as E
274         // First, check whether this `onSend` clause is already
275         // selected, finishing immediately in this case.
276         lock.withLock {
277             val result = onSendInternalResult.remove(select)
278             if (result != null) { // already selected!
279                 // `result` is either `Unit` ot `CHANNEL_CLOSED`.
280                 select.selectInRegistrationPhase(result)
281                 return
282             }
283         }
284         // Start a new coroutine that performs plain `send(..)`
285         // and tries to select this `onSend` clause at the end.
286         CoroutineScope(select.context).launch(start = CoroutineStart.UNDISPATCHED) {
287             val success: Boolean = try {
288                 send(element)
289                 // The element has been successfully sent!
290                 true
291             } catch (t: Throwable) {
292                 // This broadcast must be closed. However, it is possible that
293                 // an unrelated exception, such as `OutOfMemoryError` has been thrown.
294                 // This implementation checks that the channel is actually closed,
295                 // re-throwing the caught exception otherwise.
296                 if (isClosedForSend && (t is ClosedSendChannelException || sendException === t)) false
297                 else throw t
298             }
299             // Mark this `onSend` clause as selected and
300             // try to complete the `select` operation.
301             lock.withLock {
302                 // Status of this `onSend` clause should not be presented yet.
303                 assert { onSendInternalResult[select] == null }
304                 // Success or fail? Put the corresponding result.
305                 onSendInternalResult[select] = if (success) Unit else CHANNEL_CLOSED
306                 // Try to select this `onSend` clause.
307                 select as SelectImplementation<*>
308                 val trySelectResult = select.trySelectDetailed(this@BroadcastChannelImpl,  Unit)
309                 if (trySelectResult !== TrySelectDetailedResult.REREGISTER) {
310                     // In case of re-registration (this `select` was still
311                     // in the registration phase), the algorithm will invoke
312                     // `registerSelectForSend`. As we stored an information that
313                     // this `onSend` clause is already selected (in `onSendInternalResult`),
314                     // the algorithm, will complete immediately. Otherwise, to avoid memory
315                     // leaks, we must remove this information from the hashmap.
316                     onSendInternalResult.remove(select)
317                 }
318             }
319 
320         }
321     }
322     private val onSendInternalResult = HashMap<SelectInstance<*>, Any?>() // select -> Unit or CHANNEL_CLOSED
323 
324     // ############################
325     // # Closing and Cancellation #
326     // ############################
327 
<lambda>null328     override fun close(cause: Throwable?): Boolean = lock.withLock { // protected by lock
329         // Close all subscriptions first.
330         subscribers.forEach { it.close(cause) }
331         // Remove all subscriptions that do not contain
332         // buffered elements or waiting send-s to avoid
333         // memory leaks. We must keep other subscriptions
334         // in case `broadcast.cancel(..)` is called.
335         subscribers = subscribers.filter { it.hasElements() }
336         // Delegate to the parent implementation.
337         super.close(cause)
338     }
339 
<lambda>null340     override fun cancelImpl(cause: Throwable?): Boolean = lock.withLock { // protected by lock
341         // Cancel all subscriptions. As part of cancellation procedure,
342         // subscriptions automatically remove themselves from this broadcast.
343         subscribers.forEach { it.cancelImpl(cause) }
344         // For the conflated implementation, clear the last sent element.
345         lastConflatedElement = NO_ELEMENT
346         // Finally, delegate to the parent implementation.
347         super.cancelImpl(cause)
348     }
349 
350     override val isClosedForSend: Boolean
351         // Protect by lock to synchronize with `close(..)` / `cancel(..)`.
<lambda>null352         get() = lock.withLock { super.isClosedForSend }
353 
354     // ##############################
355     // # Subscriber Implementations #
356     // ##############################
357 
358     private inner class SubscriberBuffered : BufferedChannel<E>(capacity = capacity) {
<lambda>null359         public override fun cancelImpl(cause: Throwable?): Boolean = lock.withLock {
360             // Remove this subscriber from the broadcast on cancellation.
361             removeSubscriber(this@SubscriberBuffered )
362             super.cancelImpl(cause)
363         }
364     }
365 
366     private inner class SubscriberConflated : ConflatedBufferedChannel<E>(capacity = 1, onBufferOverflow = DROP_OLDEST) {
cancelImplnull367         public override fun cancelImpl(cause: Throwable?): Boolean {
368             // Remove this subscriber from the broadcast on cancellation.
369             removeSubscriber(this@SubscriberConflated )
370             return super.cancelImpl(cause)
371         }
372     }
373 
374     // ########################################
375     // # ConflatedBroadcastChannel Operations #
376     // ########################################
377 
378     @Suppress("UNCHECKED_CAST")
<lambda>null379     val value: E get() = lock.withLock {
380         // Is this channel closed for sending?
381         if (isClosedForSend) {
382             throw closeCause ?: IllegalStateException("This broadcast channel is closed")
383         }
384         // Is there sent element?
385         if (lastConflatedElement === NO_ELEMENT) error("No value")
386         // Return the last sent element.
387         lastConflatedElement as E
388     }
389 
390     @Suppress("UNCHECKED_CAST")
<lambda>null391     val valueOrNull: E? get() = lock.withLock {
392         // Is this channel closed for sending?
393         if (isClosedForReceive) null
394         // Is there sent element?
395         else if (lastConflatedElement === NO_ELEMENT) null
396         // Return the last sent element.
397         else lastConflatedElement as E
398     }
399 
400     // #################
401     // # For Debugging #
402     // #################
403 
toStringnull404     override fun toString() =
405         (if (lastConflatedElement !== NO_ELEMENT) "CONFLATED_ELEMENT=$lastConflatedElement; " else "") +
406             "BROADCAST=<${super.toString()}>; " +
407             "SUBSCRIBERS=${subscribers.joinToString(separator = ";", prefix = "<", postfix = ">")}"
408 }
409 
410 private val NO_ELEMENT = Symbol("NO_ELEMENT")
411