1 @file:Suppress("FunctionName", "DEPRECATION") 2 3 package kotlinx.coroutines.channels 4 5 import kotlinx.coroutines.* 6 import kotlinx.coroutines.channels.BufferOverflow.* 7 import kotlinx.coroutines.channels.Channel.Factory.BUFFERED 8 import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY 9 import kotlinx.coroutines.channels.Channel.Factory.CONFLATED 10 import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED 11 import kotlinx.coroutines.internal.* 12 import kotlinx.coroutines.selects.* 13 import kotlin.native.concurrent.* 14 15 /** 16 * Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers 17 * that subscribe for the elements using [openSubscription] function and unsubscribe using [ReceiveChannel.cancel] 18 * function. 19 * 20 * See `BroadcastChannel()` factory function for the description of available 21 * broadcast channel implementations. 22 * 23 * **Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0** 24 * It is replaced with [SharedFlow][kotlinx.coroutines.flow.SharedFlow]. 25 */ 26 @ObsoleteCoroutinesApi 27 @Deprecated(level = DeprecationLevel.WARNING, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported") 28 public interface BroadcastChannel<E> : SendChannel<E> { 29 /** 30 * Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it. 31 * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this 32 * broadcast channel. 33 */ openSubscriptionnull34 public fun openSubscription(): ReceiveChannel<E> 35 36 /** 37 * Cancels reception of remaining elements from this channel with an optional cause. 38 * This function closes the channel with 39 * the specified cause (unless it was already closed), removes all buffered sent elements from it, 40 * and [cancels][ReceiveChannel.cancel] all open subscriptions. 41 * A cause can be used to specify an error message or to provide other details on 42 * a cancellation reason for debugging purposes. 43 */ 44 public fun cancel(cause: CancellationException? = null) 45 46 /** 47 * @suppress This method has bad semantics when cause is not a [CancellationException]. Use [cancel]. 48 */ 49 @Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility only") 50 public fun cancel(cause: Throwable? = null): Boolean 51 } 52 53 /** 54 * Creates a broadcast channel with the specified buffer capacity. 55 * 56 * The resulting channel type depends on the specified [capacity] parameter: 57 * 58 * - when `capacity` positive, but less than [UNLIMITED] -- creates `ArrayBroadcastChannel` with a buffer of given capacity. 59 * **Note:** this channel looses all items that have been sent to it until the first subscriber appears; 60 * - when `capacity` is [CONFLATED] -- creates [ConflatedBroadcastChannel] that conflates back-to-back sends; 61 * - when `capacity` is [BUFFERED] -- creates `ArrayBroadcastChannel` with a default capacity. 62 * - otherwise -- throws [IllegalArgumentException]. 63 * 64 * **Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0** 65 * It is replaced with [SharedFlow][kotlinx.coroutines.flow.SharedFlow] and [StateFlow][kotlinx.coroutines.flow.StateFlow]. 66 */ 67 @ObsoleteCoroutinesApi 68 @Deprecated(level = DeprecationLevel.WARNING, message = "BroadcastChannel is deprecated in the favour of SharedFlow and StateFlow, and is no longer supported") 69 public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> = 70 when (capacity) { 71 0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel") 72 UNLIMITED -> throw IllegalArgumentException("Unsupported UNLIMITED capacity for BroadcastChannel") 73 CONFLATED -> ConflatedBroadcastChannel() 74 BUFFERED -> BroadcastChannelImpl(CHANNEL_DEFAULT_CAPACITY) 75 else -> BroadcastChannelImpl(capacity) 76 } 77 78 /** 79 * Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers. 80 * 81 * Back-to-send sent elements are _conflated_ -- only the most recently sent value is received, 82 * while previously sent elements **are lost**. 83 * Every subscriber immediately receives the most recently sent element. 84 * Sender to this broadcast channel never suspends and [trySend] always succeeds. 85 * 86 * A secondary constructor can be used to create an instance of this class that already holds a value. 87 * This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation. 88 * 89 * In this implementation, [opening][openSubscription] and [closing][ReceiveChannel.cancel] subscription 90 * takes linear time in the number of subscribers. 91 * 92 * **Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0** 93 * It is replaced with [SharedFlow][kotlinx.coroutines.flow.StateFlow]. 94 */ 95 @ObsoleteCoroutinesApi 96 @Deprecated(level = DeprecationLevel.WARNING, message = "ConflatedBroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported") 97 public class ConflatedBroadcastChannel<E> private constructor( 98 private val broadcast: BroadcastChannelImpl<E> 99 ) : BroadcastChannel<E> by broadcast { 100 public constructor(): this(BroadcastChannelImpl<E>(capacity = CONFLATED)) 101 /** 102 * Creates an instance of this class that already holds a value. 103 * 104 * It is as a shortcut to creating an instance with a default constructor and 105 * immediately sending an element: `ConflatedBroadcastChannel().apply { offer(value) }`. 106 */ 107 public constructor(value: E) : this() { 108 trySend(value) 109 } 110 111 /** 112 * The most recently sent element to this channel. 113 * 114 * Access to this property throws [IllegalStateException] when this class is constructed without 115 * initial value and no value was sent yet or if it was [closed][close] without a cause. 116 * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_. 117 */ 118 public val value: E get() = broadcast.value 119 /** 120 * The most recently sent element to this channel or `null` when this class is constructed without 121 * initial value and no value was sent yet or if it was [closed][close]. 122 */ 123 public val valueOrNull: E? get() = broadcast.valueOrNull 124 } 125 126 /** 127 * A common implementation for both the broadcast channel with a buffer of fixed [capacity] 128 * and the conflated broadcast channel (see [ConflatedBroadcastChannel]). 129 * 130 * **Note**, that elements that are sent to this channel while there are no 131 * [openSubscription] subscribers are immediately lost. 132 * 133 * This channel is created by `BroadcastChannel(capacity)` factory function invocation. 134 */ 135 @Suppress("MULTIPLE_DEFAULTS_INHERITED_FROM_SUPERTYPES_DEPRECATION_WARNING", "MULTIPLE_DEFAULTS_INHERITED_FROM_SUPERTYPES_WHEN_NO_EXPLICIT_OVERRIDE_DEPRECATION_WARNING") // do not remove the MULTIPLE_DEFAULTS suppression: required in K2 136 internal class BroadcastChannelImpl<E>( 137 /** 138 * Buffer capacity; [Channel.CONFLATED] when this broadcast is conflated. 139 */ 140 val capacity: Int 141 ) : BufferedChannel<E>(capacity = Channel.RENDEZVOUS, onUndeliveredElement = null), BroadcastChannel<E> { 142 init { <lambda>null143 require(capacity >= 1 || capacity == CONFLATED) { 144 "BroadcastChannel capacity must be positive or Channel.CONFLATED, but $capacity was specified" 145 } 146 } 147 148 // This implementation uses coarse-grained synchronization, 149 // as, reputedly, it is the simplest synchronization scheme. 150 // All operations are protected by this lock. 151 private val lock = ReentrantLock() 152 // The list of subscribers; all accesses should be protected by lock. 153 // Each change must create a new list instance to avoid `ConcurrentModificationException`. 154 private var subscribers: List<BufferedChannel<E>> = emptyList() 155 // When this broadcast is conflated, this field stores the last sent element. 156 // If this channel is empty or not conflated, it stores a special `NO_ELEMENT` marker. 157 private var lastConflatedElement: Any? = NO_ELEMENT // NO_ELEMENT or E 158 159 // ########################### 160 // # Subscription Management # 161 // ########################### 162 <lambda>null163 override fun openSubscription(): ReceiveChannel<E> = lock.withLock { // protected by lock 164 // Is this broadcast conflated or buffered? 165 // Create the corresponding subscription channel. 166 val s = if (capacity == CONFLATED) SubscriberConflated() else SubscriberBuffered() 167 // If this broadcast is already closed or cancelled, 168 // and the last sent element is not available in case 169 // this broadcast is conflated, close the created 170 // subscriber immediately and return it. 171 if (isClosedForSend && lastConflatedElement === NO_ELEMENT) { 172 s.close(closeCause) 173 return s 174 } 175 // Is this broadcast conflated? If so, send 176 // the last sent element to the subscriber. 177 if (lastConflatedElement !== NO_ELEMENT) { 178 s.trySend(value) 179 } 180 // Add the subscriber to the list and return it. 181 subscribers += s 182 s 183 } 184 <lambda>null185 private fun removeSubscriber(s: ReceiveChannel<E>) = lock.withLock { // protected by lock 186 subscribers = subscribers.filter { it !== s } 187 } 188 189 // ############################# 190 // # The `send(..)` Operations # 191 // ############################# 192 193 /** 194 * Sends the specified element to all subscribers. 195 * 196 * **!!! THIS IMPLEMENTATION IS NOT LINEARIZABLE !!!** 197 * 198 * As the operation should send the element to multiple 199 * subscribers simultaneously, it is non-trivial to 200 * implement it in an atomic way. Specifically, this 201 * would require a special implementation that does 202 * not transfer the element until all parties are able 203 * to resume it (this `send(..)` can be cancelled 204 * or the broadcast can become closed in the meantime). 205 * As broadcasts are obsolete, we keep this implementation 206 * as simple as possible, allowing non-linearizability 207 * in corner cases. 208 */ sendnull209 override suspend fun send(element: E) { 210 val subs = lock.withLock { // protected by lock 211 // Is this channel closed for send? 212 if (isClosedForSend) throw sendException 213 // Update the last sent element if this broadcast is conflated. 214 if (capacity == CONFLATED) lastConflatedElement = element 215 // Get a reference to the list of subscribers under the lock. 216 subscribers 217 } 218 // The lock has been released. Send the element to the 219 // subscribers one-by-one, and finish immediately 220 // when this broadcast discovered in the closed state. 221 // Note that this implementation is non-linearizable; 222 // see this method documentation for details. 223 subs.forEach { 224 // We use special function to send the element, 225 // which returns `true` on success and `false` 226 // if the subscriber is closed. 227 val success = it.sendBroadcast(element) 228 // The sending attempt has failed. 229 // Check whether the broadcast is closed. 230 if (!success && isClosedForSend) throw sendException 231 } 232 } 233 <lambda>null234 override fun trySend(element: E): ChannelResult<Unit> = lock.withLock { // protected by lock 235 // Is this channel closed for send? 236 if (isClosedForSend) return super.trySend(element) 237 // Check whether the plain `send(..)` operation 238 // should suspend and fail in this case. 239 val shouldSuspend = subscribers.any { it.shouldSendSuspend() } 240 if (shouldSuspend) return ChannelResult.failure() 241 // Update the last sent element if this broadcast is conflated. 242 if (capacity == CONFLATED) lastConflatedElement = element 243 // Send the element to all subscribers. 244 // It is guaranteed that the attempt cannot fail, 245 // as both the broadcast closing and subscription 246 // cancellation are guarded by lock, which is held 247 // by the current operation. 248 subscribers.forEach { it.trySend(element) } 249 // Finish with success. 250 return ChannelResult.success(Unit) 251 } 252 253 // ########################################### 254 // # The `select` Expression: onSend { ... } # 255 // ########################################### 256 registerSelectForSendnull257 override fun registerSelectForSend(select: SelectInstance<*>, element: Any?) { 258 // It is extremely complicated to support sending via `select` for broadcasts, 259 // as the operation should wait on multiple subscribers simultaneously. 260 // At the same time, broadcasts are obsolete, so we need a simple implementation 261 // that works somehow. Here is a tricky work-around. First, we launch a new 262 // coroutine that performs plain `send(..)` operation and tries to complete 263 // this `select` via `trySelect`, independently on whether it is in the 264 // registration or in the waiting phase. On success, the operation finishes. 265 // On failure, if another clause is already selected or the `select` operation 266 // has been cancelled, we observe non-linearizable behaviour, as this `onSend` 267 // clause is completed as well. However, we believe that such a non-linearizability 268 // is fine for obsolete API. The last case is when the `select` operation is still 269 // in the registration case, so this `onSend` clause should be re-registered. 270 // The idea is that we keep information that this `onSend` clause is already selected 271 // and finish immediately. 272 @Suppress("UNCHECKED_CAST") 273 element as E 274 // First, check whether this `onSend` clause is already 275 // selected, finishing immediately in this case. 276 lock.withLock { 277 val result = onSendInternalResult.remove(select) 278 if (result != null) { // already selected! 279 // `result` is either `Unit` ot `CHANNEL_CLOSED`. 280 select.selectInRegistrationPhase(result) 281 return 282 } 283 } 284 // Start a new coroutine that performs plain `send(..)` 285 // and tries to select this `onSend` clause at the end. 286 CoroutineScope(select.context).launch(start = CoroutineStart.UNDISPATCHED) { 287 val success: Boolean = try { 288 send(element) 289 // The element has been successfully sent! 290 true 291 } catch (t: Throwable) { 292 // This broadcast must be closed. However, it is possible that 293 // an unrelated exception, such as `OutOfMemoryError` has been thrown. 294 // This implementation checks that the channel is actually closed, 295 // re-throwing the caught exception otherwise. 296 if (isClosedForSend && (t is ClosedSendChannelException || sendException === t)) false 297 else throw t 298 } 299 // Mark this `onSend` clause as selected and 300 // try to complete the `select` operation. 301 lock.withLock { 302 // Status of this `onSend` clause should not be presented yet. 303 assert { onSendInternalResult[select] == null } 304 // Success or fail? Put the corresponding result. 305 onSendInternalResult[select] = if (success) Unit else CHANNEL_CLOSED 306 // Try to select this `onSend` clause. 307 select as SelectImplementation<*> 308 val trySelectResult = select.trySelectDetailed(this@BroadcastChannelImpl, Unit) 309 if (trySelectResult !== TrySelectDetailedResult.REREGISTER) { 310 // In case of re-registration (this `select` was still 311 // in the registration phase), the algorithm will invoke 312 // `registerSelectForSend`. As we stored an information that 313 // this `onSend` clause is already selected (in `onSendInternalResult`), 314 // the algorithm, will complete immediately. Otherwise, to avoid memory 315 // leaks, we must remove this information from the hashmap. 316 onSendInternalResult.remove(select) 317 } 318 } 319 320 } 321 } 322 private val onSendInternalResult = HashMap<SelectInstance<*>, Any?>() // select -> Unit or CHANNEL_CLOSED 323 324 // ############################ 325 // # Closing and Cancellation # 326 // ############################ 327 <lambda>null328 override fun close(cause: Throwable?): Boolean = lock.withLock { // protected by lock 329 // Close all subscriptions first. 330 subscribers.forEach { it.close(cause) } 331 // Remove all subscriptions that do not contain 332 // buffered elements or waiting send-s to avoid 333 // memory leaks. We must keep other subscriptions 334 // in case `broadcast.cancel(..)` is called. 335 subscribers = subscribers.filter { it.hasElements() } 336 // Delegate to the parent implementation. 337 super.close(cause) 338 } 339 <lambda>null340 override fun cancelImpl(cause: Throwable?): Boolean = lock.withLock { // protected by lock 341 // Cancel all subscriptions. As part of cancellation procedure, 342 // subscriptions automatically remove themselves from this broadcast. 343 subscribers.forEach { it.cancelImpl(cause) } 344 // For the conflated implementation, clear the last sent element. 345 lastConflatedElement = NO_ELEMENT 346 // Finally, delegate to the parent implementation. 347 super.cancelImpl(cause) 348 } 349 350 override val isClosedForSend: Boolean 351 // Protect by lock to synchronize with `close(..)` / `cancel(..)`. <lambda>null352 get() = lock.withLock { super.isClosedForSend } 353 354 // ############################## 355 // # Subscriber Implementations # 356 // ############################## 357 358 private inner class SubscriberBuffered : BufferedChannel<E>(capacity = capacity) { <lambda>null359 public override fun cancelImpl(cause: Throwable?): Boolean = lock.withLock { 360 // Remove this subscriber from the broadcast on cancellation. 361 removeSubscriber(this@SubscriberBuffered ) 362 super.cancelImpl(cause) 363 } 364 } 365 366 private inner class SubscriberConflated : ConflatedBufferedChannel<E>(capacity = 1, onBufferOverflow = DROP_OLDEST) { cancelImplnull367 public override fun cancelImpl(cause: Throwable?): Boolean { 368 // Remove this subscriber from the broadcast on cancellation. 369 removeSubscriber(this@SubscriberConflated ) 370 return super.cancelImpl(cause) 371 } 372 } 373 374 // ######################################## 375 // # ConflatedBroadcastChannel Operations # 376 // ######################################## 377 378 @Suppress("UNCHECKED_CAST") <lambda>null379 val value: E get() = lock.withLock { 380 // Is this channel closed for sending? 381 if (isClosedForSend) { 382 throw closeCause ?: IllegalStateException("This broadcast channel is closed") 383 } 384 // Is there sent element? 385 if (lastConflatedElement === NO_ELEMENT) error("No value") 386 // Return the last sent element. 387 lastConflatedElement as E 388 } 389 390 @Suppress("UNCHECKED_CAST") <lambda>null391 val valueOrNull: E? get() = lock.withLock { 392 // Is this channel closed for sending? 393 if (isClosedForReceive) null 394 // Is there sent element? 395 else if (lastConflatedElement === NO_ELEMENT) null 396 // Return the last sent element. 397 else lastConflatedElement as E 398 } 399 400 // ################# 401 // # For Debugging # 402 // ################# 403 toStringnull404 override fun toString() = 405 (if (lastConflatedElement !== NO_ELEMENT) "CONFLATED_ELEMENT=$lastConflatedElement; " else "") + 406 "BROADCAST=<${super.toString()}>; " + 407 "SUBSCRIBERS=${subscribers.joinToString(separator = ";", prefix = "<", postfix = ">")}" 408 } 409 410 private val NO_ELEMENT = Symbol("NO_ELEMENT") 411