1 package kotlinx.coroutines.channels
2 
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.channels.BufferOverflow.*
5 import kotlinx.coroutines.channels.ChannelResult.Companion.closed
6 import kotlinx.coroutines.channels.ChannelResult.Companion.success
7 import kotlinx.coroutines.internal.*
8 import kotlinx.coroutines.internal.OnUndeliveredElement
9 import kotlinx.coroutines.selects.*
10 import kotlin.coroutines.*
11 
12 /**
13  * This is a special [BufferedChannel] extension that supports [DROP_OLDEST] and [DROP_LATEST]
14  * strategies for buffer overflowing. This implementation ensures that `send(e)` never suspends,
15  * either extracting the first element ([DROP_OLDEST]) or dropping the sending one ([DROP_LATEST])
16  * when the channel capacity exceeds.
17  */
18 internal open class ConflatedBufferedChannel<E>(
19     private val capacity: Int,
20     private val onBufferOverflow: BufferOverflow,
21     onUndeliveredElement: OnUndeliveredElement<E>? = null
22 ) : BufferedChannel<E>(capacity = capacity, onUndeliveredElement = onUndeliveredElement) {
23     init {
<lambda>null24         require(onBufferOverflow !== SUSPEND) {
25             "This implementation does not support suspension for senders, use ${BufferedChannel::class.simpleName} instead"
26         }
<lambda>null27         require(capacity >= 1) {
28             "Buffered channel capacity must be at least 1, but $capacity was specified"
29         }
30     }
31 
32     override val isConflatedDropOldest: Boolean
33         get() = onBufferOverflow == DROP_OLDEST
34 
sendnull35     override suspend fun send(element: E) {
36         // Should never suspend, implement via `trySend(..)`.
37         trySendImpl(element, isSendOp = true).onClosed { // fails only when this channel is closed.
38             onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
39                 it.addSuppressed(sendException)
40                 throw it
41             }
42             throw sendException
43         }
44     }
45 
sendBroadcastnull46     override suspend fun sendBroadcast(element: E): Boolean {
47         // Should never suspend, implement via `trySend(..)`.
48         trySendImpl(element, isSendOp = true) // fails only when this channel is closed.
49             .onSuccess { return true }
50         return false
51     }
52 
trySendnull53     override fun trySend(element: E): ChannelResult<Unit> = trySendImpl(element, isSendOp = false)
54 
55     private fun trySendImpl(element: E, isSendOp: Boolean) =
56         if (onBufferOverflow === DROP_LATEST) trySendDropLatest(element, isSendOp)
57         else trySendDropOldest(element)
58 
59     private fun trySendDropLatest(element: E, isSendOp: Boolean): ChannelResult<Unit> {
60         // Try to send the element without suspension.
61         val result = super.trySend(element)
62         // Complete on success or if this channel is closed.
63         if (result.isSuccess || result.isClosed) return result
64         // This channel is full. Drop the sending element.
65         // Call the `onUndeliveredElement` lambda ONLY for 'send()' invocations,
66         // for 'trySend()' it is responsibility of the caller
67         if (isSendOp) {
68             onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
69                 throw it
70             }
71         }
72         return success(Unit)
73     }
74 
75     @Suppress("UNCHECKED_CAST")
registerSelectForSendnull76     override fun registerSelectForSend(select: SelectInstance<*>, element: Any?) {
77         // The plain `send(..)` operation never suspends. Thus, either this
78         // attempt to send the element succeeds or the channel is closed.
79         // In any case, complete this `select` in the registration phase.
80         trySend(element as E).let {
81             it.onSuccess {
82                 select.selectInRegistrationPhase(Unit)
83                 return
84             }.onClosed {
85                 select.selectInRegistrationPhase(CHANNEL_CLOSED)
86                 return
87             }
88         }
89         error("unreachable")
90     }
91 
shouldSendSuspendnull92     override fun shouldSendSuspend() = false // never suspends.
93 }
94