1 package kotlinx.coroutines.channels 2 3 import kotlinx.atomicfu.* 4 import kotlinx.coroutines.channels.BufferOverflow.* 5 import kotlinx.coroutines.channels.ChannelResult.Companion.closed 6 import kotlinx.coroutines.channels.ChannelResult.Companion.success 7 import kotlinx.coroutines.internal.* 8 import kotlinx.coroutines.internal.OnUndeliveredElement 9 import kotlinx.coroutines.selects.* 10 import kotlin.coroutines.* 11 12 /** 13 * This is a special [BufferedChannel] extension that supports [DROP_OLDEST] and [DROP_LATEST] 14 * strategies for buffer overflowing. This implementation ensures that `send(e)` never suspends, 15 * either extracting the first element ([DROP_OLDEST]) or dropping the sending one ([DROP_LATEST]) 16 * when the channel capacity exceeds. 17 */ 18 internal open class ConflatedBufferedChannel<E>( 19 private val capacity: Int, 20 private val onBufferOverflow: BufferOverflow, 21 onUndeliveredElement: OnUndeliveredElement<E>? = null 22 ) : BufferedChannel<E>(capacity = capacity, onUndeliveredElement = onUndeliveredElement) { 23 init { <lambda>null24 require(onBufferOverflow !== SUSPEND) { 25 "This implementation does not support suspension for senders, use ${BufferedChannel::class.simpleName} instead" 26 } <lambda>null27 require(capacity >= 1) { 28 "Buffered channel capacity must be at least 1, but $capacity was specified" 29 } 30 } 31 32 override val isConflatedDropOldest: Boolean 33 get() = onBufferOverflow == DROP_OLDEST 34 sendnull35 override suspend fun send(element: E) { 36 // Should never suspend, implement via `trySend(..)`. 37 trySendImpl(element, isSendOp = true).onClosed { // fails only when this channel is closed. 38 onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let { 39 it.addSuppressed(sendException) 40 throw it 41 } 42 throw sendException 43 } 44 } 45 sendBroadcastnull46 override suspend fun sendBroadcast(element: E): Boolean { 47 // Should never suspend, implement via `trySend(..)`. 48 trySendImpl(element, isSendOp = true) // fails only when this channel is closed. 49 .onSuccess { return true } 50 return false 51 } 52 trySendnull53 override fun trySend(element: E): ChannelResult<Unit> = trySendImpl(element, isSendOp = false) 54 55 private fun trySendImpl(element: E, isSendOp: Boolean) = 56 if (onBufferOverflow === DROP_LATEST) trySendDropLatest(element, isSendOp) 57 else trySendDropOldest(element) 58 59 private fun trySendDropLatest(element: E, isSendOp: Boolean): ChannelResult<Unit> { 60 // Try to send the element without suspension. 61 val result = super.trySend(element) 62 // Complete on success or if this channel is closed. 63 if (result.isSuccess || result.isClosed) return result 64 // This channel is full. Drop the sending element. 65 // Call the `onUndeliveredElement` lambda ONLY for 'send()' invocations, 66 // for 'trySend()' it is responsibility of the caller 67 if (isSendOp) { 68 onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let { 69 throw it 70 } 71 } 72 return success(Unit) 73 } 74 75 @Suppress("UNCHECKED_CAST") registerSelectForSendnull76 override fun registerSelectForSend(select: SelectInstance<*>, element: Any?) { 77 // The plain `send(..)` operation never suspends. Thus, either this 78 // attempt to send the element succeeds or the channel is closed. 79 // In any case, complete this `select` in the registration phase. 80 trySend(element as E).let { 81 it.onSuccess { 82 select.selectInRegistrationPhase(Unit) 83 return 84 }.onClosed { 85 select.selectInRegistrationPhase(CHANNEL_CLOSED) 86 return 87 } 88 } 89 error("unreachable") 90 } 91 shouldSendSuspendnull92 override fun shouldSendSuspend() = false // never suspends. 93 } 94