<lambda>null1 @file:Suppress("PrivatePropertyName")
2 
3 package kotlinx.coroutines.channels
4 
5 import kotlinx.atomicfu.*
6 import kotlinx.coroutines.*
7 import kotlinx.coroutines.channels.ChannelResult.Companion.closed
8 import kotlinx.coroutines.channels.ChannelResult.Companion.failure
9 import kotlinx.coroutines.channels.ChannelResult.Companion.success
10 import kotlinx.coroutines.flow.internal.*
11 import kotlinx.coroutines.internal.*
12 import kotlinx.coroutines.selects.*
13 import kotlinx.coroutines.selects.TrySelectDetailedResult.*
14 import kotlin.contracts.*
15 import kotlin.coroutines.*
16 import kotlin.js.*
17 import kotlin.jvm.*
18 import kotlin.math.*
19 import kotlin.random.*
20 import kotlin.reflect.*
21 
22 /**
23  * The buffered channel implementation, which also serves as a rendezvous channel when the capacity is zero.
 * The high-level structure is based on a conceptually infinite array for storing elements and waiting requests,
25  * separate counters of [send] and [receive] invocations that were ever performed, and an additional counter
26  * that indicates the end of the logical buffer by counting the number of array cells it ever contained.
27  * The key idea is that both [send] and [receive] start by incrementing their counters, assigning the array cell
28  * referenced by the counter. In case of rendezvous channels, the operation either suspends and stores its continuation
29  * in the cell or makes a rendezvous with the opposite request. Each cell can be processed by exactly one [send] and
30  * one [receive]. As for buffered channels, [send]-s can also add elements without suspension if the logical buffer
31  * contains the cell, while the [receive] operation updates the end of the buffer when its synchronization finishes.
32  *
33  * Please see the ["Fast and Scalable Channels in Kotlin Coroutines"](https://arxiv.org/abs/2211.04986)
34  * paper by Nikita Koval, Roman Elizarov, and Dan Alistarh for the detailed algorithm description.
35  */
36 internal open class BufferedChannel<E>(
37     /**
38      * Channel capacity; `Channel.RENDEZVOUS` for rendezvous channel
39      * and `Channel.UNLIMITED` for unlimited capacity.
40      */
41     private val capacity: Int,
42     @JvmField
43     internal val onUndeliveredElement: OnUndeliveredElement<E>? = null
44 ) : Channel<E> {
    init {
        // Reject negative capacities up front; `require` fails with the message below otherwise.
        require(capacity >= 0) { "Invalid channel capacity: $capacity, should be >=0" }
        // Note: this class declares a second `init` block further below,
        // which assigns the segment references.
    }
49 
50     // Maintenance note: use `Buffered1ChannelLincheckTest` to check hypotheses.
51 
    /*
      The counters indicate the total numbers of send, receive, and buffer expansion calls
      ever performed. The counters are incremented in the beginning of the corresponding
      operation; thus, acquiring a unique (for the operation type) cell to process.
      The segment references point to the last working segment for each operation type.

      Notably, the counter for send is combined with the channel closing status
      for synchronization simplicity and performance reasons.

      The logical end of the buffer is initialized with the channel capacity.
      If the channel is rendezvous or unlimited, the counter equals `BUFFER_END_RENDEZVOUS`
      or `BUFFER_END_UNLIMITED`, respectively, and never updates. The `bufferEndSegment`
      points to a special `NULL_SEGMENT` in this case.
     */
    // The number of `send` invocations combined with the close status in a single `Long`.
    private val sendersAndCloseStatus = atomic(0L)
    // The number of `receive` invocations.
    private val receivers = atomic(0L)
    // The logical end of the buffer; its initial value is derived from the capacity.
    private val bufferEnd = atomic(initialBufferEnd(capacity))

    // Read-only views of the current counter values. For senders, only the
    // counter part of the combined value is exposed (the close status is dropped).
    internal val sendersCounter: Long get() = sendersAndCloseStatus.value.sendersCounter
    internal val receiversCounter: Long get() = receivers.value
    private val bufferEndCounter: Long get() = bufferEnd.value
73 
    /*
      In addition to the counters above, we need an extra one that
      tracks the number of cells processed by `expandBuffer()`.
      When a receiver aborts, the corresponding cell might be
      physically removed from the data structure to avoid memory
      leaks, while it still can be unprocessed by `expandBuffer()`.
      In this case, `expandBuffer()` cannot know whether the
      removed cell contained a sender or a receiver and, therefore,
      cannot proceed. To solve the race, we ensure that cells
      corresponding to cancelled receivers cannot be physically
      removed until the cell is processed.
      This additional counter enables the synchronization.
      It starts from the initial value of the buffer end counter.
     */
    private val completedExpandBuffersAndPauseFlag = atomic(bufferEndCounter)
88 
89     private val isRendezvousOrUnlimited
90         get() = bufferEndCounter.let { it == BUFFER_END_RENDEZVOUS || it == BUFFER_END_UNLIMITED }
91 
    // References to the segments that `send`, `receive`, and the buffer expansion
    // procedure work with; all three are assigned in the `init` block right below.
    private val sendSegment: AtomicRef<ChannelSegment<E>>
    private val receiveSegment: AtomicRef<ChannelSegment<E>>
    private val bufferEndSegment: AtomicRef<ChannelSegment<E>>

    init {
        // The very first segment (id = 0, no predecessor) is shared by the references.
        // NOTE(review): `pointers = 3` presumably accounts for the three references
        // declared above -- confirm against `ChannelSegment`.
        @Suppress("LeakingThis")
        val firstSegment = ChannelSegment(id = 0, prev = null, channel = this, pointers = 3)
        sendSegment = atomic(firstSegment)
        receiveSegment = atomic(firstSegment)
        // If this channel is rendezvous or has unlimited capacity, the algorithm never
        // invokes the buffer expansion procedure, and the corresponding segment reference
        // points to a special `NULL_SEGMENT` one and never updates.
        @Suppress("UNCHECKED_CAST")
        bufferEndSegment = atomic(if (isRendezvousOrUnlimited) (NULL_SEGMENT as ChannelSegment<E>) else firstSegment)
    }
107 
108     // #########################
109     // ## The send operations ##
110     // #########################
111 
    /**
     * Sends [element] by delegating to [sendImpl] without a pre-created waiter.
     *
     * The call completes immediately when the element is buffered or handed over to a receiver.
     * When the channel is observed closed, the work is delegated to [onClosedSend];
     * when the operation has to suspend, it is delegated to [sendOnNoWaiterSuspend],
     * which is the place where the continuation is actually created.
     */
    override suspend fun send(element: E): Unit =
        sendImpl( // <-- this is an inline function
            element = element,
            // Do not create a continuation until it is required;
            // it is created later via [onNoWaiterSuspend], if needed.
            waiter = null,
            // Finish immediately if a rendezvous happens
            // or the element has been buffered.
            onRendezvousOrBuffered = {},
            // As no waiter is provided, suspension is impossible:
            // `sendImpl` reports such a case via `onNoWaiterSuspend` instead.
            onSuspend = { _, _ -> assert { false } },
            // According to the `send(e)` contract, we need to call
            // `onUndeliveredElement(..)` handler and throw an exception
            // if the channel is already closed.
            onClosed = { onClosedSend(element) },
            // When `send(e)` decides to suspend, the corresponding
            // `onNoWaiterSuspend` function that creates a continuation
            // is called. The tail-call optimization is applied here.
            onNoWaiterSuspend = { segm, i, elem, s -> sendOnNoWaiterSuspend(segm, i, elem, s) }
        )
132 
133     // NB: return type could've been Nothing, but it breaks TCO
134     private suspend fun onClosedSend(element: E): Unit = suspendCancellableCoroutine { continuation ->
135         onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
136             // If it crashes, add send exception as suppressed for better diagnostics
137             it.addSuppressed(sendException)
138             continuation.resumeWithStackTrace(it)
139             return@suspendCancellableCoroutine
140         }
141         continuation.resumeWithStackTrace(sendException)
142     }
143 
144     private suspend fun sendOnNoWaiterSuspend(
145         /* The working cell is specified by
146         the segment and the index in it. */
147         segment: ChannelSegment<E>,
148         index: Int,
149         /** The element to be inserted. */
150         element: E,
151         /** The global index of the cell. */
152         s: Long
153     ) = suspendCancellableCoroutineReusable sc@{ cont ->
154         sendImplOnNoWaiter( // <-- this is an inline function
155             segment = segment, index = index, element = element, s = s,
156             // Store the created continuation as a waiter.
157             waiter = cont,
158             // If a rendezvous happens or the element has been buffered,
159             // resume the continuation and finish. In case of prompt
160             // cancellation, it is guaranteed that the element
161             // has been already buffered or passed to receiver.
162             onRendezvousOrBuffered = { cont.resume(Unit) },
163             // If the channel is closed, call `onUndeliveredElement(..)` and complete the
164             // continuation with the corresponding exception.
165             onClosed = { onClosedSendOnNoWaiterSuspend(element, cont) },
166         )
167     }
168 
169     private fun Waiter.prepareSenderForSuspension(
170         /* The working cell is specified by
171         the segment and the index in it. */
172         segment: ChannelSegment<E>,
173         index: Int
174     ) {
175         // To distinguish cancelled senders and receivers,
176         // senders equip the index value with an additional marker,
177         // adding `SEGMENT_SIZE` to the value.
178         invokeOnCancellation(segment, index + SEGMENT_SIZE)
179     }
180 
181     private fun onClosedSendOnNoWaiterSuspend(element: E, cont: CancellableContinuation<Unit>) {
182         onUndeliveredElement?.callUndeliveredElement(element, cont.context)
183         cont.resumeWithException(recoverStackTrace(sendException, cont))
184     }
185 
186     override fun trySend(element: E): ChannelResult<Unit> {
187         // Do not try to send the element if the plain `send(e)` operation would suspend.
188         if (shouldSendSuspend(sendersAndCloseStatus.value)) return failure()
189         // This channel either has waiting receivers or is closed.
190         // Let's try to send the element!
191         // The logic is similar to the plain `send(e)` operation, with
192         // the only difference that we install `INTERRUPTED_SEND` in case
193         // the operation decides to suspend.
194         return sendImpl( // <-- this is an inline function
195             element = element,
196             // Store an already interrupted sender in case of suspension.
197             waiter = INTERRUPTED_SEND,
198             // Finish successfully when a rendezvous happens
199             // or the element has been buffered.
200             onRendezvousOrBuffered = { success(Unit) },
201             // On suspension, the `INTERRUPTED_SEND` token has been installed,
202             // and this `trySend(e)` must fail. According to the contract,
203             // we do not need to call the [onUndeliveredElement] handler.
204             onSuspend = { segm, _ ->
205                 segm.onSlotCleaned()
206                 failure()
207             },
208             // If the channel is closed, return the corresponding result.
209             onClosed = { closed(sendException) }
210         )
211     }
212 
213     /**
214      * This is a special `send(e)` implementation that returns `true` if the element
215      * has been successfully sent, and `false` if the channel is closed.
216      *
217      * In case of coroutine cancellation, the element may be undelivered --
218      * the [onUndeliveredElement] feature is unsupported in this implementation.
219      *
220      */
221     internal open suspend fun sendBroadcast(element: E): Boolean = suspendCancellableCoroutine { cont ->
222         check(onUndeliveredElement == null) {
223             "the `onUndeliveredElement` feature is unsupported for `sendBroadcast(e)`"
224         }
225         sendImpl(
226             element = element,
227             waiter = SendBroadcast(cont),
228             onRendezvousOrBuffered = { cont.resume(true) },
229             onSuspend = { _, _ -> },
230             onClosed = { cont.resume(false) }
231         )
232     }
233 
234     /**
235      * Specifies waiting [sendBroadcast] operation.
236      */
237     private class SendBroadcast(
238         val cont: CancellableContinuation<Boolean>
239     ) : Waiter by cont as CancellableContinuationImpl<Boolean>
240 
    /**
     * Abstract send implementation shared by the send-like operations of this class.
     *
     * In a loop, the function reserves a cell by incrementing the senders counter, locates
     * the segment of that cell, and processes the cell via [updateCellSend]. Depending on
     * the outcome, it either finishes by invoking one of the provided lambdas and returning
     * its result, or restarts with a freshly reserved cell (when the required segment has
     * been removed while the channel is not closed, or when the cell update has failed).
     */
    private inline fun <R> sendImpl(
        /* The element to be sent. */
        element: E,
        /* The waiter to be stored in case of suspension,
        or `null` if the waiter is not created yet.
        In the latter case, when the algorithm decides
        to suspend, [onNoWaiterSuspend] is called. */
        waiter: Any?,
        /* This lambda is invoked when the element has been
        buffered or a rendezvous with a receiver happens. */
        onRendezvousOrBuffered: () -> R,
        /* This lambda is called when the operation suspends in the
        cell specified by the segment and the index in it. */
        onSuspend: (segm: ChannelSegment<E>, i: Int) -> R,
        /* This lambda is called when the channel
        is observed in the closed state. */
        onClosed: () -> R,
        /* This lambda is called when the operation decides
        to suspend, but the waiter is not provided (equals `null`).
        It should create a waiter and delegate to `sendImplOnNoWaiter`.
        The default implementation fails with "unexpected". */
        onNoWaiterSuspend: (
            segm: ChannelSegment<E>,
            i: Int,
            element: E,
            s: Long
        ) -> R = { _, _, _, _ -> error("unexpected") }
    ): R {
        // Read the segment reference before the counter increment;
        // it is crucial to be able to find the required segment later.
        var segment = sendSegment.value
        while (true) {
            // Atomically increment the `senders` counter and obtain the
            // value right before the increment along with the close status.
            val sendersAndCloseStatusCur = sendersAndCloseStatus.getAndIncrement()
            val s = sendersAndCloseStatusCur.sendersCounter
            // Is this channel already closed? Keep the information.
            val closed = sendersAndCloseStatusCur.isClosedForSend0
            // Count the required segment id and the cell index in it.
            val id = s / SEGMENT_SIZE
            val i = (s % SEGMENT_SIZE).toInt()
            // Try to find the required segment if the initially obtained
            // one (in the beginning of this function) has a different id.
            if (segment.id != id) {
                // Find the required segment.
                segment = findSegmentSend(id, segment) ?:
                    // The required segment has not been found.
                    // Finish immediately if this channel is closed,
                    // restarting the operation otherwise.
                    // In the latter case, the required segment was full
                    // of interrupted waiters and, therefore, removed
                    // physically to avoid memory leaks.
                    if (closed) {
                        return onClosed()
                    } else {
                        continue
                    }
            }
            // Update the cell according to the algorithm. Importantly, when
            // the channel is already closed, storing a waiter is illegal, so
            // the algorithm stores the `INTERRUPTED_SEND` token in this case.
            when (updateCellSend(segment, i, element, s, waiter, closed)) {
                RESULT_RENDEZVOUS -> {
                    // A rendezvous with a receiver has happened.
                    // The previous segments are no longer needed
                    // for the upcoming requests, so the algorithm
                    // resets the link to the previous segment.
                    segment.cleanPrev()
                    return onRendezvousOrBuffered()
                }
                RESULT_BUFFERED -> {
                    // The element has been buffered.
                    return onRendezvousOrBuffered()
                }
                RESULT_SUSPEND -> {
                    // The operation has decided to suspend and installed the
                    // specified waiter. If the channel was already closed,
                    // and the `INTERRUPTED_SEND` token has been installed as a waiter,
                    // this request finishes with the `onClosed()` action.
                    if (closed) {
                        segment.onSlotCleaned()
                        return onClosed()
                    }
                    // Register the cancellation handler, but only for real waiters;
                    // other values passed as `waiter` do not implement `Waiter` and are skipped.
                    (waiter as? Waiter)?.prepareSenderForSuspension(segment, i)
                    return onSuspend(segment, i)
                }
                RESULT_CLOSED -> {
                    // This channel is closed.
                    // In case this segment is already or going to be
                    // processed by a receiver, ensure that all the
                    // previous segments are unreachable.
                    if (s < receiversCounter) segment.cleanPrev()
                    return onClosed()
                }
                RESULT_FAILED -> {
                    // Either the cell stores an interrupted receiver,
                    // or it was poisoned by a concurrent receiver.
                    // In both cases, all the previous segments are already processed.
                    // Restart the operation with a new cell.
                    segment.cleanPrev()
                    continue
                }
                RESULT_SUSPEND_NO_WAITER -> {
                    // The operation has decided to suspend,
                    // but no waiter has been provided.
                    return onNoWaiterSuspend(segment, i, element, s)
                }
            }
        }
    }
352 
353     // Note: this function is temporarily moved from ConflatedBufferedChannel to BufferedChannel class, because of this issue: KT-65554.
354     // For now, an inline function, which invokes atomic operations, may only be called within a parent class.
355     protected fun trySendDropOldest(element: E): ChannelResult<Unit> =
356         sendImpl( // <-- this is an inline function
357             element = element,
358             // Put the element into the logical buffer even
359             // if this channel is already full, the `onSuspend`
360             // callback below extract the first (oldest) element.
361             waiter = BUFFERED,
362             // Finish successfully when a rendezvous has happened
363             // or the element has been buffered.
364             onRendezvousOrBuffered = { return success(Unit) },
365             // In case the algorithm decided to suspend, the element
366             // was added to the buffer. However, as the buffer is now
367             // overflowed, the first (oldest) element has to be extracted.
368             onSuspend = { segm, i ->
369                 dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(segm.id * SEGMENT_SIZE + i)
370                 return success(Unit)
371             },
372             // If the channel is closed, return the corresponding result.
373             onClosed = { return closed(sendException) }
374         )
375 
    /**
     * Continues a send operation that has decided to suspend in the specified cell
     * before its waiter was created: the same cell is processed once more via
     * [updateCellSend], now with a non-null [waiter] and the `closed` flag set to `false`.
     * If the cell update fails, the operation is restarted from the very
     * beginning via [sendImpl] with the same [waiter] and callbacks.
     */
    private inline fun sendImplOnNoWaiter(
        /* The working cell is specified by
        the segment and the index in it. */
        segment: ChannelSegment<E>,
        index: Int,
        /* The element to be sent. */
        element: E,
        /* The global index of the cell. */
        s: Long,
        /* The waiter to be stored in case of suspension. */
        waiter: Waiter,
        /* This lambda is invoked when the element has been
        buffered or a rendezvous with a receiver happens.*/
        onRendezvousOrBuffered: () -> Unit,
        /* This lambda is called when the channel
        is observed in the closed state. */
        onClosed: () -> Unit,
    ) {
        // Update the cell again, now with the non-null waiter,
        // restarting the operation from the beginning on failure.
        // Check the `sendImpl(..)` function for the comments.
        when (updateCellSend(segment, index, element, s, waiter, false)) {
            RESULT_RENDEZVOUS -> {
                segment.cleanPrev()
                onRendezvousOrBuffered()
            }
            RESULT_BUFFERED -> {
                onRendezvousOrBuffered()
            }
            RESULT_SUSPEND -> {
                // The waiter is installed; register its cancellation handler.
                waiter.prepareSenderForSuspension(segment, index)
            }
            RESULT_CLOSED -> {
                if (s < receiversCounter) segment.cleanPrev()
                onClosed()
            }
            RESULT_FAILED -> {
                segment.cleanPrev()
                // Restart with a new cell. The `onSuspend` action is empty, as
                // `sendImpl(..)` registers the cancellation handler for `Waiter`-s itself.
                sendImpl(
                    element = element,
                    waiter = waiter,
                    onRendezvousOrBuffered = onRendezvousOrBuffered,
                    onSuspend = { _, _ -> },
                    onClosed = onClosed,
                )
            }
            // The waiter is non-null here, so `RESULT_SUSPEND_NO_WAITER` is not expected.
            else -> error("unexpected")
        }
    }
425 
    /**
     * Updates the working cell of an abstract send operation and returns one of the `RESULT_*` codes.
     *
     * This is the fast path of [updateCellSendSlow]: after storing the element, it handles
     * an empty cell and a cell with a plain [Waiter] in place. Every other situation --
     * the channel is already [closed], the cell is in another state, or a CAS on
     * the cell state fails -- is delegated to [updateCellSendSlow].
     */
    private fun updateCellSend(
        /* The working cell is specified by
        the segment and the index in it. */
        segment: ChannelSegment<E>,
        index: Int,
        /* The element to be sent. */
        element: E,
        /* The global index of the cell. */
        s: Long,
        /* The waiter to be stored in case of suspension. */
        waiter: Any?,
        /* Whether the channel was observed closed when the cell was reserved. */
        closed: Boolean
    ): Int {
        // This is a fast-path of `updateCellSendSlow(..)`.
        //
        // First, the algorithm stores the element,
        // performing the synchronization after that.
        // This way, receivers safely retrieve the
        // element, following the safe publication pattern.
        segment.storeElement(index, element)
        // The closed case is processed entirely by the slow path.
        if (closed) return updateCellSendSlow(segment, index, element, s, waiter, closed)
        // Read the current cell state.
        val state = segment.getState(index)
        when {
            // The cell is empty.
            state === null -> {
                // If the element should be buffered, or a rendezvous should happen
                // while the receiver is still coming, try to buffer the element.
                // Otherwise, try to store the specified waiter in the cell.
                if (bufferOrRendezvousSend(s)) {
                    // Move the cell state to `BUFFERED`.
                    if (segment.casState(index, null, BUFFERED)) {
                        // The element has been successfully buffered, finish.
                        return RESULT_BUFFERED
                    }
                } else {
                    // This `send(e)` operation should suspend.
                    // The channel is known to be not closed at this point,
                    // as the closed case has been delegated to the slow path above.
                    if (waiter == null) {
                        // The waiter is not specified; return the corresponding result.
                        return RESULT_SUSPEND_NO_WAITER
                    } else {
                        // Try to install the waiter.
                        if (segment.casState(index, null, waiter)) return RESULT_SUSPEND
                    }
                }
            }
            // A waiting receiver is stored in the cell.
            state is Waiter -> {
                // As the element will be passed directly to the waiter,
                // the algorithm cleans the element slot in the cell.
                segment.cleanElement(index)
                // Try to make a rendezvous with the suspended receiver.
                return if (state.tryResumeReceiver(element)) {
                    // Rendezvous! Move the cell state to `DONE_RCV` and finish.
                    segment.setState(index, DONE_RCV)
                    onReceiveDequeued()
                    RESULT_RENDEZVOUS
                } else {
                    // The resumption has failed. Update the cell state correspondingly.
                    // It is also possible for a concurrent cancellation handler to update
                    // the cell state; we can safely ignore these updates. The cancellation
                    // is reported only if this call is the one that changed the state.
                    if (segment.getAndSetState(index, INTERRUPTED_RCV) !== INTERRUPTED_RCV) {
                        segment.onCancelledRequest(index, true)
                    }
                    RESULT_FAILED
                }
            }
        }
        // Either a CAS above has failed or the cell is in a state
        // that the fast path does not handle; fall back to the slow path.
        return updateCellSendSlow(segment, index, element, s, waiter, closed)
    }
500 
    /**
     * Updates the working cell of an abstract send operation and returns one of the `RESULT_*` codes.
     *
     * The function re-reads the cell state in a loop until one of the transitions succeeds;
     * a failed CAS on the cell state simply leads to another iteration.
     * The element is expected to be already stored in the cell by the caller
     * (see [updateCellSend]); it is cleaned from the cell on the failed, closed,
     * and rendezvous paths.
     */
    private fun updateCellSendSlow(
        /* The working cell is specified by
        the segment and the index in it. */
        segment: ChannelSegment<E>,
        index: Int,
        /* The element to be sent. */
        element: E,
        /* The global index of the cell. */
        s: Long,
        /* The waiter to be stored in case of suspension. */
        waiter: Any?,
        /* Whether the channel was observed closed when the cell was reserved. */
        closed: Boolean
    ): Int {
        // Then, the cell state should be updated according to
        // its state machine; see the paper mentioned in the very
        // beginning for the cell life-cycle and the algorithm details.
        while (true) {
            // Read the current cell state.
            val state = segment.getState(index)
            when {
                // The cell is empty.
                state === null -> {
                    // If the element should be buffered, or a rendezvous should happen
                    // while the receiver is still coming, try to buffer the element.
                    // Otherwise, try to store the specified waiter in the cell.
                    // Buffering is never performed when the channel is closed.
                    if (bufferOrRendezvousSend(s) && !closed) {
                        // Move the cell state to `BUFFERED`.
                        if (segment.casState(index, null, BUFFERED)) {
                            // The element has been successfully buffered, finish.
                            return RESULT_BUFFERED
                        }
                    } else {
                        // This `send(e)` operation should suspend.
                        // However, in case the channel has already
                        // been observed closed, `INTERRUPTED_SEND`
                        // is installed instead.
                        when {
                            // The channel is closed
                            closed -> if (segment.casState(index, null, INTERRUPTED_SEND)) {
                                segment.onCancelledRequest(index, false)
                                return RESULT_CLOSED
                            }
                            // The waiter is not specified; return the corresponding result.
                            waiter == null -> return RESULT_SUSPEND_NO_WAITER
                            // Try to install the waiter.
                            else -> if (segment.casState(index, null, waiter)) return RESULT_SUSPEND
                        }
                    }
                }
                // This cell is in the logical buffer.
                state === IN_BUFFER -> {
                    // Try to buffer the element.
                    if (segment.casState(index, state, BUFFERED)) {
                        // The element has been successfully buffered, finish.
                        return RESULT_BUFFERED
                    }
                }
                // The cell stores a cancelled receiver.
                state === INTERRUPTED_RCV -> {
                    // Clean the element slot to avoid memory leaks and finish.
                    segment.cleanElement(index)
                    return RESULT_FAILED
                }
                // The cell is poisoned by a concurrent receive.
                state === POISONED -> {
                    // Clean the element slot to avoid memory leaks and finish.
                    segment.cleanElement(index)
                    return RESULT_FAILED
                }
                // The channel is already closed.
                state === CHANNEL_CLOSED -> {
                    // Clean the element slot to avoid memory leaks,
                    // ensure that the closing/cancellation procedure
                    // has been completed, and finish.
                    segment.cleanElement(index)
                    completeCloseOrCancel()
                    return RESULT_CLOSED
                }
                // A waiting receiver is stored in the cell.
                else -> {
                    assert { state is Waiter || state is WaiterEB }
                    // As the element will be passed directly to the waiter,
                    // the algorithm cleans the element slot in the cell.
                    segment.cleanElement(index)
                    // Unwrap the waiting receiver from `WaiterEB` if needed.
                    // As a receiver is stored in the cell, the buffer expansion
                    // procedure would finish, so senders simply ignore the "EB" marker.
                    val receiver = if (state is WaiterEB) state.waiter else state
                    // Try to make a rendezvous with the suspended receiver.
                    return if (receiver.tryResumeReceiver(element)) {
                        // Rendezvous! Move the cell state to `DONE_RCV` and finish.
                        segment.setState(index, DONE_RCV)
                        onReceiveDequeued()
                        RESULT_RENDEZVOUS
                    } else {
                        // The resumption has failed. Update the cell state correspondingly.
                        // It is also possible for a concurrent `expandBuffer()` or the
                        // cancellation handler to update the cell state; we can safely ignore
                        // these updates as senders do not help `expandBuffer()`. The cancellation
                        // is reported only if this call is the one that changed the state.
                        if (segment.getAndSetState(index, INTERRUPTED_RCV) !== INTERRUPTED_RCV) {
                            segment.onCancelledRequest(index, true)
                        }
                        RESULT_FAILED
                    }
                }
            }
        }
    }
612 
613     /**
614      * Checks whether a [send] invocation is bound to suspend if it is called
615      * with the specified [sendersAndCloseStatus], [receivers], and [bufferEnd]
616      * values. When this channel is already closed, the function returns `false`.
617      *
618      * Specifically, [send] suspends if the channel is not unlimited,
619      * the number of receivers is greater than then index of the working cell of the
620      * potential [send] invocation, and the buffer does not cover this cell
621      * in case of buffered channel.
622      * When the channel is already closed, [send] does not suspend.
623      */
624     @JsName("shouldSendSuspend0")
625     private fun shouldSendSuspend(curSendersAndCloseStatus: Long): Boolean {
626         // Does not suspend if the channel is already closed.
627         if (curSendersAndCloseStatus.isClosedForSend0) return false
628         // Does not suspend if a rendezvous may happen or the buffer is not full.
629         return !bufferOrRendezvousSend(curSendersAndCloseStatus.sendersCounter)
630     }
631 
632     /**
633      * Returns `true` when the specified [send] should place
634      * its element to the working cell without suspension.
635      */
636     private fun bufferOrRendezvousSend(curSenders: Long): Boolean =
637         curSenders < bufferEndCounter || curSenders < receiversCounter + capacity
638 
639     /**
640      * Checks whether a [send] invocation is bound to suspend if it is called
641      * with the current counter and close status values. See [shouldSendSuspend] for details.
642      *
643      * Note that this implementation is _false positive_ in case of rendezvous channels,
644      * so it can return `false` when a [send] invocation is bound to suspend. Specifically,
645      * the counter of `receive()` operations may indicate that there is a waiting receiver,
646      * while it has already been cancelled, so the potential rendezvous is bound to fail.
647      */
648     internal open fun shouldSendSuspend(): Boolean = shouldSendSuspend(sendersAndCloseStatus.value)
649 
650     /**
651      * Tries to resume this receiver with the specified [element] as a result.
652      * Returns `true` on success and `false` otherwise.
653      */
654     @Suppress("UNCHECKED_CAST")
655     private fun Any.tryResumeReceiver(element: E): Boolean = when(this) {
656         is SelectInstance<*> -> { // `onReceiveXXX` select clause
657             trySelect(this@BufferedChannel, element)
658         }
659         is ReceiveCatching<*> -> {
660             this as ReceiveCatching<E>
661             cont.tryResume0(success(element), onUndeliveredElement?.bindCancellationFun(element, cont.context))
662         }
663         is BufferedChannel<*>.BufferedChannelIterator -> {
664             this as BufferedChannel<E>.BufferedChannelIterator
665             tryResumeHasNext(element)
666         }
667         is CancellableContinuation<*> -> { // `receive()`
668             this as CancellableContinuation<E>
669             tryResume0(element, onUndeliveredElement?.bindCancellationFun(element, context))
670         }
671         else -> error("Unexpected receiver type: $this")
672     }
673 
674     // ##########################
675     // # The receive operations #
676     // ##########################
677 
    /**
     * Extension hook that is invoked when a receiver is added as a waiter in this channel;
     * see `prepareReceiverForSuspension(..)`, which calls it before registering
     * the cell with the waiter.
     *
     * The default implementation does nothing; subclasses may override it.
     */
    protected open fun onReceiveEnqueued() {}
682 
    /**
     * Extension hook that is invoked when a waiting receiver is no longer stored in this channel,
     * regardless of whether this is caused by a rendezvous, cancellation, or channel closing.
     *
     * The default implementation does nothing; subclasses may override it.
     */
    protected open fun onReceiveDequeued() {}
688 
    /*
    Fast path of `receive()`: the operation is attempted without creating
    a continuation. The element is returned directly when it can be retrieved
    without suspension; if the channel is closed for receiving, the receive
    exception is thrown; if suspension is required, the operation delegates
    to `receiveOnNoWaiterSuspend(..)`.
    */
    override suspend fun receive(): E =
        receiveImpl( // <-- this is an inline function
            // Do not create a continuation until it is required;
            // it is created later via [onNoWaiterSuspend], if needed.
            waiter = null,
            // Return the received element on successful retrieval from
            // the buffer or rendezvous with a suspended sender.
            // As `receiveImpl(..)` is inline, this is a non-local
            // `return` from `receive()` itself.
            onElementRetrieved = { element ->
                return element
            },
            // As no waiter is provided, suspension is impossible.
            onSuspend = { _, _, _ -> error("unexpected") },
            // Throw an exception if the channel is already closed.
            onClosed = { throw recoverStackTrace(receiveException) },
            // If `receive()` decides to suspend, the corresponding
            // `suspend` function that creates a continuation is called.
            // The tail-call optimization is applied here.
            onNoWaiterSuspend = { segm, i, r -> receiveOnNoWaiterSuspend(segm, i, r) }
        )
710 
    /*
    Slow path of `receive()`, called via `onNoWaiterSuspend` when the operation
    has decided to suspend in the specified cell but no waiter has been created yet.
    Creates a continuation and continues working with the same cell, using
    the continuation as the waiter.
    */
    private suspend fun receiveOnNoWaiterSuspend(
        /* The working cell is specified by
        the segment and the index in it. */
        segment: ChannelSegment<E>,
        index: Int,
        /* The global index of the cell. */
        r: Long
    ) = suspendCancellableCoroutineReusable { cont ->
        receiveImplOnNoWaiter( // <-- this is an inline function
            segment = segment, index = index, r = r,
            // Store the created continuation as a waiter.
            waiter = cont,
            // In case of successful element retrieval, resume
            // the continuation with the element. Importantly,
            // the receiver coroutine may be cancelled after it
            // is successfully resumed but not dispatched yet.
            // In case `onUndeliveredElement` is specified, it is
            // bound as the cancellation handler for that case.
            onElementRetrieved = { element ->
                val onCancellation = onUndeliveredElement?.bindCancellationFun(element, cont.context)
                cont.resume(element, onCancellation)
            },
            // The channel is closed for receiving: resume
            // the continuation with the receive exception.
            onClosed = { onClosedReceiveOnNoWaiterSuspend(cont) },
        )
    }
737 
738     private fun Waiter.prepareReceiverForSuspension(segment: ChannelSegment<E>, index: Int) {
739         onReceiveEnqueued()
740         invokeOnCancellation(segment, index)
741     }
742 
743     private fun onClosedReceiveOnNoWaiterSuspend(cont: CancellableContinuation<E>) {
744         cont.resumeWithException(receiveException)
745     }
746 
    /*
    The implementation is exactly the same as that of `receive()`,
    with the only difference that this function returns a `ChannelResult`
    instance and does not throw an exception explicitly in case the channel
    is already closed for receiving: a "closed" result with the close cause
    is returned instead. Please refer to the plain `receive()`
    implementation for the comments.
    */
    override suspend fun receiveCatching(): ChannelResult<E> =
        receiveImpl( // <-- this is an inline function
            waiter = null,
            onElementRetrieved = { element ->
                success(element)
            },
            onSuspend = { _, _, _ -> error("unexpected") },
            onClosed = { closed(closeCause) },
            onNoWaiterSuspend = { segm, i, r -> receiveCatchingOnNoWaiterSuspend(segm, i, r) }
        )
764 
    /*
    Slow path of `receiveCatching()`, called via `onNoWaiterSuspend` when
    the operation has decided to suspend in the specified cell. The created
    continuation is wrapped into `ReceiveCatching`, and the wrapper is
    stored in the cell as the waiter.
    */
    private suspend fun receiveCatchingOnNoWaiterSuspend(
        /* The working cell is specified by
        the segment and the index in it. */
        segment: ChannelSegment<E>,
        index: Int,
        /* The global index of the cell. */
        r: Long
    ) = suspendCancellableCoroutineReusable { cont ->
        // Wrap the continuation so that it can be told apart
        // from a plain `receive()` continuation stored in a cell.
        val waiter = ReceiveCatching(cont as CancellableContinuationImpl<ChannelResult<E>>)
        receiveImplOnNoWaiter(
            segment, index, r,
            waiter = waiter,
            // Resume with a successful result; `onUndeliveredElement`, if specified,
            // is bound as the cancellation handler for the resumed element.
            onElementRetrieved = { element ->
                cont.resume(success(element), onUndeliveredElement?.bindCancellationFun(element, cont.context))
            },
            // The channel is closed for receiving: resume with a "closed" result.
            onClosed = { onClosedReceiveCatchingOnNoWaiterSuspend(cont) }
        )
    }
780 
781     private fun onClosedReceiveCatchingOnNoWaiterSuspend(cont: CancellableContinuation<ChannelResult<E>>) {
782         cont.resume(closed(closeCause))
783     }
784 
785     override fun tryReceive(): ChannelResult<E> {
786         // Read the `receivers` counter first.
787         val r = receivers.value
788         val sendersAndCloseStatusCur = sendersAndCloseStatus.value
789         // Is this channel closed for receive?
790         if (sendersAndCloseStatusCur.isClosedForReceive0) {
791             return closed(closeCause)
792         }
793         // Do not try to receive an element if the plain `receive()` operation would suspend.
794         val s = sendersAndCloseStatusCur.sendersCounter
795         if (r >= s) return failure()
796         // Let's try to retrieve an element!
797         // The logic is similar to the plain `receive()` operation, with
798         // the only difference that we store `INTERRUPTED_RCV` in case
799         // the operation decides to suspend. This way, we can leverage
800         // the unconditional `Fetch-and-Add` instruction.
801         // One may consider storing `INTERRUPTED_RCV` instead of an actual waiter
802         // on suspension (a.k.a. "no elements to retrieve") as a short-cut of
803         // "suspending and cancelling immediately".
804         return receiveImpl( // <-- this is an inline function
805             // Store an already interrupted receiver in case of suspension.
806             waiter = INTERRUPTED_RCV,
807             // Finish when an element is successfully retrieved.
808             onElementRetrieved = { element -> success(element) },
809             // On suspension, the `INTERRUPTED_RCV` token has been
810             // installed, and this `tryReceive()` must fail.
811             onSuspend = { segm, _, globalIndex ->
812                 // Emulate "cancelled" receive, thus invoking 'waitExpandBufferCompletion' manually,
813                 // because effectively there were no cancellation
814                 waitExpandBufferCompletion(globalIndex)
815                 segm.onSlotCleaned()
816                 failure()
817             },
818             // If the channel is closed, return the corresponding result.
819             onClosed = { closed(closeCause) }
820         )
821     }
822 
    /**
     * Repeatedly extracts (drops) the first element from this channel until the cell with
     * the specified [globalCellIndex] is moved to the logical buffer. This is a key procedure
     * for the _conflated_ channel implementation, see [ConflatedBufferedChannel] with
     * the [BufferOverflow.DROP_OLDEST] strategy on buffer overflowing.
     *
     * Each dropped element is passed to `onUndeliveredElement`, if it is specified;
     * an exception produced by that call is rethrown from this function.
     */
    protected fun dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(globalCellIndex: Long) {
        assert { isConflatedDropOldest }
        // Read the segment reference before the counter increment;
        // it is crucial to be able to find the required segment later.
        var segment = receiveSegment.value
        while (true) {
            // Read the receivers counter to check whether the specified cell is already in the buffer
            // or should be moved to the buffer in a short time, due to the already started `receive()`.
            val r = this.receivers.value
            if (globalCellIndex < max(r + capacity, bufferEndCounter)) return
            // The cell is outside the buffer. Try to extract the first element
            // if the `receivers` counter has not been changed; otherwise, re-read
            // the counter and repeat the check above.
            if (!this.receivers.compareAndSet(r, r + 1)) continue
            // Count the required segment id and the cell index in it.
            val id = r / SEGMENT_SIZE
            val i = (r % SEGMENT_SIZE).toInt()
            // Try to find the required segment if the initially obtained
            // segment (in the beginning of this function) has lower id.
            if (segment.id != id) {
                // Find the required segment, restarting the operation if it has not been found.
                segment = findSegmentReceive(id, segment) ?:
                    // The required segment has not been found. It is possible that the channel is already
                    // closed for receiving, so the linked list of segments is closed as well.
                    // In the latter case, the operation will finish eventually after incrementing
                    // the `receivers` counter sufficient times. Note that it is impossible to check
                    // whether this channel is closed for receiving (we do this in `receive`),
                    // as it may call this function when helping to complete closing the channel.
                    continue
            }
            // Update the cell according to the cell life-cycle.
            // No waiter is passed, as this procedure never suspends.
            val updCellResult = updateCellReceive(segment, i, r, null)
            when {
                updCellResult === FAILED -> {
                    // The cell is poisoned; restart from the beginning.
                    // To avoid memory leaks, we also need to reset
                    // the `prev` pointer of the working segment.
                    if (r < sendersCounter) segment.cleanPrev()
                }
                else -> { // element
                    // A buffered element was retrieved from the cell.
                    // Clean the reference to the previous segment.
                    // NOTE(review): a `SUSPEND_NO_WAITER` result would also reach this branch;
                    // presumably it is impossible here, as the processed cell is expected
                    // to be covered by a sender -- confirm.
                    segment.cleanPrev()
                    @Suppress("UNCHECKED_CAST")
                    onUndeliveredElement?.callUndeliveredElementCatchingException(updCellResult as E)?.let { throw it }
                }
            }
        }
    }
877 
    /**
     * Abstract receive implementation, shared by all receive-like operations.
     *
     * The function reserves a cell by incrementing the `receivers` counter, finds
     * the segment that contains the cell, and updates the cell via `updateCellReceive(..)`.
     * Depending on the outcome, one of the provided lambdas produces the result;
     * when the cell update fails, the whole procedure restarts with a new cell.
     */
    private inline fun <R> receiveImpl(
        /* The waiter to be stored in case of suspension,
        or `null` if the waiter is not created yet.
        In the latter case, if the algorithm decides
        to suspend, [onNoWaiterSuspend] is called. */
        waiter: Any?,
        /* This lambda is invoked when an element has been
        successfully retrieved, either from the buffer or
        by making a rendezvous with a suspended sender. */
        onElementRetrieved: (element: E) -> R,
        /* This lambda is called when the operation suspends in the cell
        specified by the segment and its global and in-segment indices. */
        onSuspend: (segm: ChannelSegment<E>, i: Int, r: Long) -> R,
        /* This lambda is called when the channel is observed
        in the closed state and no waiting sender is found,
        which means that it is closed for receiving. */
        onClosed: () -> R,
        /* This lambda is called when the operation decides
        to suspend, but the waiter is not provided (equals `null`).
        It should create a waiter and delegate to `receiveImplOnNoWaiter`. */
        onNoWaiterSuspend: (
            segm: ChannelSegment<E>,
            i: Int,
            r: Long
        ) -> R = { _, _, _ -> error("unexpected") }
    ): R {
        // Read the segment reference before the counter increment;
        // it is crucial to be able to find the required segment later.
        var segment = receiveSegment.value
        while (true) {
            // Similar to the `send(e)` operation, `receive()` first checks
            // whether the channel is already closed for receiving.
            if (isClosedForReceive) return onClosed()
            // Atomically increment the `receivers` counter
            // and obtain the value right before the increment.
            val r = this.receivers.getAndIncrement()
            // Count the required segment id and the cell index in it.
            val id = r / SEGMENT_SIZE
            val i = (r % SEGMENT_SIZE).toInt()
            // Try to find the required segment if the initially obtained
            // segment (in the beginning of this function) has lower id.
            if (segment.id != id) {
                // Find the required segment, restarting the operation if it has not been found.
                segment = findSegmentReceive(id, segment) ?:
                    // The required segment is not found. It is possible that the channel is already
                    // closed for receiving, so the linked list of segments is closed as well.
                    // In the latter case, the operation fails with the corresponding check at the beginning.
                    continue
            }
            // Update the cell according to the cell life-cycle.
            val updCellResult = updateCellReceive(segment, i, r, waiter)
            return when {
                updCellResult === SUSPEND -> {
                    // The operation has decided to suspend and
                    // stored the specified waiter in the cell.
                    // The suspension is prepared only for real `Waiter`-s;
                    // a token such as `INTERRUPTED_RCV` is skipped by the safe cast.
                    (waiter as? Waiter)?.prepareReceiverForSuspension(segment, i)
                    onSuspend(segment, i, r)
                }
                updCellResult === FAILED -> {
                    // The operation has tried to make a rendezvous
                    // but failed: either the opposite request has
                    // already been cancelled or the cell is poisoned.
                    // Restart from the beginning in this case.
                    // To avoid memory leaks, we also need to reset
                    // the `prev` pointer of the working segment.
                    if (r < sendersCounter) segment.cleanPrev()
                    continue
                }
                updCellResult === SUSPEND_NO_WAITER -> {
                    // The operation has decided to suspend,
                    // but no waiter has been provided.
                    onNoWaiterSuspend(segment, i, r)
                }
                else -> { // element
                    // Either a buffered element was retrieved from the cell
                    // or a rendezvous with a waiting sender has happened.
                    // Clean the reference to the previous segment before finishing.
                    segment.cleanPrev()
                    @Suppress("UNCHECKED_CAST")
                    onElementRetrieved(updCellResult as E)
                }
            }
        }
    }
965 
    /*
    Continues a receive operation that has already reserved the specified cell
    but had no waiter at that moment. The cell update is repeated with the
    now-available waiter; if the update fails, the operation restarts from
    the beginning via `receiveImpl(..)` with the same waiter.
    */
    private inline fun receiveImplOnNoWaiter(
        /* The working cell is specified by
        the segment and the index in it. */
        segment: ChannelSegment<E>,
        index: Int,
        /* The global index of the cell. */
        r: Long,
        /* The waiter to be stored in case of suspension. */
        waiter: Waiter,
        /* This lambda is invoked when an element has been
        successfully retrieved, either from the buffer or
        by making a rendezvous with a suspended sender. */
        onElementRetrieved: (element: E) -> Unit,
        /* This lambda is called when the channel is observed
        in the closed state and no waiting sender is found,
        which means that it is closed for receiving. */
        onClosed: () -> Unit
    ) {
        // Update the cell with the non-null waiter,
        // restarting from the beginning on failure.
        // Check the `receiveImpl(..)` function for the comments.
        val updCellResult = updateCellReceive(segment, index, r, waiter)
        when {
            updCellResult === SUSPEND -> {
                // The waiter has been stored in the cell; complete the suspension.
                waiter.prepareReceiverForSuspension(segment, index)
            }
            updCellResult === FAILED -> {
                if (r < sendersCounter) segment.cleanPrev()
                // Restart with a new cell. `onSuspend` is a no-op here because
                // `receiveImpl(..)` itself prepares the waiter for suspension.
                receiveImpl(
                    waiter = waiter,
                    onElementRetrieved = onElementRetrieved,
                    onSuspend = { _, _, _ -> },
                    onClosed = onClosed
                )
            }
            else -> {
                // An element has been retrieved from the cell.
                segment.cleanPrev()
                @Suppress("UNCHECKED_CAST")
                onElementRetrieved(updCellResult as E)
            }
        }
    }
1008 
    /*
    Updates the working cell of a receive operation.
    Returns `SUSPEND` when the specified waiter has been installed in the cell,
    `SUSPEND_NO_WAITER` when the operation should suspend but no waiter is
    specified, and the retrieved element when the cell stores a buffered one.
    All other situations are delegated to `updateCellReceiveSlow(..)`.
    */
    private fun updateCellReceive(
        /* The working cell is specified by
        the segment and the index in it. */
        segment: ChannelSegment<E>,
        index: Int,
        /* The global index of the cell. */
        r: Long,
        /* The waiter to be stored in case of suspension. */
        waiter: Any?,
    ): Any? {
        // This is a fast-path of `updateCellReceiveSlow(..)`.
        //
        // Read the current cell state.
        val state = segment.getState(index)
        when {
            // The cell is empty.
            state === null -> {
                // If the cell is not covered by a sender yet, try to store
                // the specified waiter in the cell. Otherwise, a rendezvous
                // must happen; this case is left to the slow path.
                val senders = sendersAndCloseStatus.value.sendersCounter
                if (r >= senders) {
                    // This `receive()` operation should suspend.
                    if (waiter === null) {
                        // The waiter is not specified;
                        // return the corresponding result.
                        return SUSPEND_NO_WAITER
                    }
                    // Try to install the waiter.
                    if (segment.casState(index, state, waiter)) {
                        // The waiter has been successfully installed.
                        // Invoke the `expandBuffer()` procedure and finish.
                        expandBuffer()
                        return SUSPEND
                    }
                }
            }
            // The cell stores a buffered element.
            state === BUFFERED -> if (segment.casState(index, state, DONE_RCV)) {
                // Retrieve the element and expand the buffer.
                expandBuffer()
                return segment.retrieveElement(index)
            }
        }
        // The fast path is not applicable or a CAS has failed;
        // fall back to the full state machine.
        return updateCellReceiveSlow(segment, index, r, waiter)
    }
1056 
    /*
    The full version of the cell update performed by a receive operation.
    Loops until the cell state is successfully processed and returns `SUSPEND`
    when the specified waiter has been installed, `SUSPEND_NO_WAITER` when
    the operation should suspend but no waiter is specified, `FAILED` when
    the operation should restart with another cell, and the retrieved
    element otherwise.
    */
    private fun updateCellReceiveSlow(
        /* The working cell is specified by
        the segment and the index in it. */
        segment: ChannelSegment<E>,
        index: Int,
        /* The global index of the cell. */
        r: Long,
        /* The waiter to be stored in case of suspension. */
        waiter: Any?,
    ): Any? {
        // The cell state should be updated according to its state machine;
        // see the paper mentioned in the very beginning for the algorithm details.
        while (true) {
            // Read the current cell state.
            val state = segment.getState(index)
            when {
                // The cell is empty.
                state === null || state === IN_BUFFER -> {
                    // If a rendezvous must happen, the operation does not wait
                    // until the cell stores a buffered element or a suspended
                    // sender, poisoning the cell and restarting instead.
                    // Otherwise, try to store the specified waiter in the cell.
                    val senders = sendersAndCloseStatus.value.sendersCounter
                    if (r < senders) {
                        // The cell is already covered by sender,
                        // so a rendezvous must happen. Unfortunately,
                        // the cell is empty, so the operation poisons it.
                        if (segment.casState(index, state, POISONED)) {
                            // When the cell becomes poisoned, it is essentially
                            // the same as storing an already cancelled receiver.
                            // Thus, the `expandBuffer()` procedure should be invoked.
                            expandBuffer()
                            return FAILED
                        }
                    } else {
                        // This `receive()` operation should suspend.
                        if (waiter === null) {
                            // The waiter is not specified;
                            // return the corresponding result.
                            return SUSPEND_NO_WAITER
                        }
                        // Try to install the waiter.
                        if (segment.casState(index, state, waiter)) {
                            // The waiter has been successfully installed.
                            // Invoke the `expandBuffer()` procedure and finish.
                            expandBuffer()
                            return SUSPEND
                        }
                    }
                }
                // The cell stores a buffered element.
                state === BUFFERED -> if (segment.casState(index, state, DONE_RCV)) {
                    // Retrieve the element and expand the buffer.
                    expandBuffer()
                    return segment.retrieveElement(index)
                }
                // The cell stores an interrupted sender.
                state === INTERRUPTED_SEND -> return FAILED
                // The cell is already poisoned by a concurrent
                // `hasElements` call. Restart in this case.
                state === POISONED -> return FAILED
                // This channel is already closed.
                state === CHANNEL_CLOSED -> {
                    // Although the channel is closed, it is still required
                    // to call the `expandBuffer()` procedure to keep
                    // `waitForExpandBufferCompletion()` correct.
                    expandBuffer()
                    return FAILED
                }
                // A concurrent `expandBuffer()` is resuming a
                // suspended sender. Wait in a spin-loop until
                // the resumption attempt completes: the cell
                // state must change to either `BUFFERED` or
                // `INTERRUPTED_SEND`.
                state === RESUMING_BY_EB -> continue
                // The cell stores a suspended sender; try to resume it.
                else -> {
                    // To synchronize with expandBuffer(), the algorithm
                    // first moves the cell to an intermediate `RESUMING_BY_RCV`
                    // state, updating it to either `DONE_RCV` (on success) or
                    // `INTERRUPTED_SEND` (on failure).
                    // If the CAS fails, the loop re-reads the cell state.
                    if (segment.casState(index, state, RESUMING_BY_RCV)) {
                        // Has a concurrent `expandBuffer()` delegated its completion?
                        val helpExpandBuffer = state is WaiterEB
                        // Extract the sender if needed and try to resume it.
                        val sender = if (state is WaiterEB) state.waiter else state
                        return if (sender.tryResumeSender(segment, index)) {
                            // The sender has been resumed successfully!
                            // Update the cell state correspondingly,
                            // expand the buffer, and return the element
                            // stored in the cell.
                            // In case a concurrent `expandBuffer()` has delegated
                            // its completion, the procedure should finish, as the
                            // sender is resumed. Thus, no further action is required.
                            segment.setState(index, DONE_RCV)
                            expandBuffer()
                            segment.retrieveElement(index)
                        } else {
                            // The resumption has failed. Update the cell correspondingly.
                            // In case a concurrent `expandBuffer()` has delegated
                            // its completion, the procedure should skip this cell, so
                            // `expandBuffer()` should be called once again.
                            segment.setState(index, INTERRUPTED_SEND)
                            segment.onCancelledRequest(index, false)
                            if (helpExpandBuffer) expandBuffer()
                            FAILED
                        }
                    }
                }
            }
        }
    }
1169 
1170     private fun Any.tryResumeSender(segment: ChannelSegment<E>, index: Int): Boolean = when (this) {
1171         is CancellableContinuation<*> -> { // suspended `send(e)` operation
1172             @Suppress("UNCHECKED_CAST")
1173             this as CancellableContinuation<Unit>
1174             tryResume0(Unit)
1175         }
1176         is SelectInstance<*> -> {
1177             this as SelectImplementation<*>
1178             val trySelectResult = trySelectDetailed(clauseObject = this@BufferedChannel, result = Unit)
1179             // Clean the element slot to avoid memory leaks
1180             // if this `select` clause should be re-registered.
1181             if (trySelectResult === REREGISTER) segment.cleanElement(index)
1182             // Was the resumption successful?
1183             trySelectResult === SUCCESSFUL
1184         }
1185         is SendBroadcast -> cont.tryResume0(true) // // suspended `sendBroadcast(e)` operation
1186         else -> error("Unexpected waiter: $this")
1187     }
1188 
1189     // ################################
1190     // # The expandBuffer() procedure #
1191     // ################################
1192 
1193     private fun expandBuffer() {
1194         // Do not need to take any action if
1195         // this channel is rendezvous or unlimited.
1196         if (isRendezvousOrUnlimited) return
1197         // Read the current segment of
1198         // the `expandBuffer()` procedure.
1199         var segment = bufferEndSegment.value
1200         // Try to expand the buffer until succeed.
1201         try_again@ while (true) {
1202             // Increment the logical end of the buffer.
1203             // The `b`-th cell is going to be added to the buffer.
1204             val b = bufferEnd.getAndIncrement()
1205             val id = b / SEGMENT_SIZE
1206             // After that, read the current `senders` counter.
1207             // In case its value is lower than `b`, the `send(e)`
1208             // invocation that will work with this `b`-th cell
1209             // will detect that the cell is already a part of the
1210             // buffer when comparing with the `bufferEnd` counter.
1211             // However, `bufferEndSegment` may reference an outdated
1212             // segment, which should be updated to avoid memory leaks.
1213             val s = sendersCounter
1214             if (s <= b) {
1215                 // Should `bufferEndSegment` be moved forward to avoid memory leaks?
1216                 if (segment.id < id && segment.next != null)
1217                     moveSegmentBufferEndToSpecifiedOrLast(id, segment)
1218                 // Increment the number of completed `expandBuffer()`-s and finish.
1219                 incCompletedExpandBufferAttempts()
1220                 return
1221             }
1222             // Is `bufferEndSegment` outdated or is the segment with the required id already removed?
1223             // Find the required segment, creating new ones if needed.
1224             if (segment.id != id) {
1225                 segment = findSegmentBufferEnd(id, segment, b)
1226                     // Restart if the required segment is removed, or
1227                     // the linked list of segments is already closed,
1228                     // and the required one will never be created.
1229                     // Please note that `findSegmentBuffer(..)` updates
1230                     // the number of completed `expandBuffer()` attempt
1231                     // in this case.
1232                     ?: continue@try_again
1233             }
1234             // Try to add the cell to the logical buffer,
1235             // updating the cell state according to the state-machine.
1236             val i = (b % SEGMENT_SIZE).toInt()
1237             if (updateCellExpandBuffer(segment, i, b)) {
1238                 // The cell has been added to the logical buffer!
1239                 // Increment the number of completed `expandBuffer()`-s and finish.
1240                 //
1241                 // Note that it is possible to increment the number of
1242                 // completed `expandBuffer()` attempts earlier, right
1243                 // after the segment is obtained. We find this change
1244                 // counter-intuitive and prefer to avoid it.
1245                 incCompletedExpandBufferAttempts()
1246                 return
1247             } else {
1248                 // The cell has not been added to the buffer.
1249                 // Increment the number of completed `expandBuffer()`
1250                 // attempts and restart.
1251                 incCompletedExpandBufferAttempts()
1252                 continue@try_again
1253             }
1254         }
1255     }
1256 
1257     private fun updateCellExpandBuffer(
1258         /* The working cell is specified by
1259         the segment and the index in it. */
1260         segment: ChannelSegment<E>,
1261         index: Int,
1262         /* The global index of the cell. */
1263         b: Long
1264     ): Boolean {
1265         // This is a fast-path of `updateCellExpandBufferSlow(..)`.
1266         //
1267         // Read the current cell state.
1268         val state = segment.getState(index)
1269         if (state is Waiter) {
1270             // Usually, a sender is stored in the cell.
1271             // However, it is possible for a concurrent
1272             // receiver to be already suspended there.
1273             // Try to distinguish whether the waiter is a
1274             // sender by comparing the global cell index with
1275             // the `receivers` counter. In case the cell is not
1276             // covered by a receiver, a sender is stored in the cell.
1277             if (b >= receivers.value) {
1278                 // The cell stores a suspended sender. Try to resume it.
1279                 // To synchronize with a concurrent `receive()`, the algorithm
1280                 // first moves the cell state to an intermediate `RESUMING_BY_EB`
1281                 // state, updating it to either `BUFFERED` (on successful resumption)
1282                 // or `INTERRUPTED_SEND` (on failure).
1283                 if (segment.casState(index, state, RESUMING_BY_EB)) {
1284                     return if (state.tryResumeSender(segment, index)) {
1285                         // The sender has been resumed successfully!
1286                         // Move the cell to the logical buffer and finish.
1287                         segment.setState(index, BUFFERED)
1288                         true
1289                     } else {
1290                         // The resumption has failed.
1291                         segment.setState(index, INTERRUPTED_SEND)
1292                         segment.onCancelledRequest(index, false)
1293                         false
1294                     }
1295                 }
1296             }
1297         }
1298         return updateCellExpandBufferSlow(segment, index, b)
1299     }
1300 
1301     private fun updateCellExpandBufferSlow(
1302         /* The working cell is specified by
1303         the segment and the index in it. */
1304         segment: ChannelSegment<E>,
1305         index: Int,
1306         /* The global index of the cell. */
1307         b: Long
1308     ): Boolean {
1309         // Update the cell state according to its state machine.
1310         // See the paper mentioned in the very beginning for
1311         // the cell life-cycle and the algorithm details.
1312         while (true) {
1313             // Read the current cell state.
1314             val state = segment.getState(index)
1315             when {
1316                 // A suspended waiter, sender or receiver.
1317                 state is Waiter -> {
1318                     // Usually, a sender is stored in the cell.
1319                     // However, it is possible for a concurrent
1320                     // receiver to be already suspended there.
1321                     // Try to distinguish whether the waiter is a
1322                     // sender by comparing the global cell index with
1323                     // the `receivers` counter. In case the cell is not
1324                     // covered by a receiver, a sender is stored in the cell.
1325                     if (b < receivers.value) {
1326                         // The algorithm cannot distinguish whether the
1327                         // suspended in the cell operation is sender or receiver.
1328                         // To make progress, `expandBuffer()` delegates its completion
1329                         // to an upcoming pairwise request, atomically wrapping
1330                         // the waiter in `WaiterEB`. In case a sender is stored
1331                         // in the cell, the upcoming receiver will call `expandBuffer()`
1332                         // if the sender resumption fails; thus, effectively, skipping
1333                         // this cell. Otherwise, if a receiver is stored in the cell,
1334                         // this `expandBuffer()` procedure must finish; therefore,
1335                         // sender ignore the `WaiterEB` wrapper.
1336                         if (segment.casState(index, state, WaiterEB(waiter = state)))
1337                             return true
1338                     } else {
1339                         // The cell stores a suspended sender. Try to resume it.
1340                         // To synchronize with a concurrent `receive()`, the algorithm
1341                         // first moves the cell state to an intermediate `RESUMING_BY_EB`
1342                         // state, updating it to either `BUFFERED` (on successful resumption)
1343                         // or `INTERRUPTED_SEND` (on failure).
1344                         if (segment.casState(index, state, RESUMING_BY_EB)) {
1345                             return if (state.tryResumeSender(segment, index)) {
1346                                 // The sender has been resumed successfully!
1347                                 // Move the cell to the logical buffer and finish.
1348                                 segment.setState(index, BUFFERED)
1349                                 true
1350                             } else {
1351                                 // The resumption has failed.
1352                                 segment.setState(index, INTERRUPTED_SEND)
1353                                 segment.onCancelledRequest(index, false)
1354                                 false
1355                             }
1356                         }
1357                     }
1358                 }
1359                 // The cell stores an interrupted sender, skip it.
1360                 state === INTERRUPTED_SEND -> return false
1361                 // The cell is empty, a concurrent sender is coming.
1362                 state === null -> {
1363                     // To inform a concurrent sender that this cell is
1364                     // already a part of the buffer, the algorithm moves
1365                     // it to a special `IN_BUFFER` state.
1366                     if (segment.casState(index, state, IN_BUFFER)) return true
1367                 }
1368                 // The cell is already a part of the buffer, finish.
1369                 state === BUFFERED -> return true
1370                 // The cell is already processed by a receiver, no further action is required.
1371                 state === POISONED || state === DONE_RCV || state === INTERRUPTED_RCV -> return true
1372                 // The channel is closed, all the following
1373                 // cells are already in the same state, finish.
1374                 state === CHANNEL_CLOSED -> return true
1375                 // A concurrent receiver is resuming the suspended sender.
1376                 // Wait in a spin-loop until it changes the cell state
1377                 // to either `DONE_RCV` or `INTERRUPTED_SEND`.
1378                 state === RESUMING_BY_RCV -> continue // spin wait
1379                 else -> error("Unexpected cell state: $state")
1380             }
1381         }
1382     }
1383 
1384     /**
1385      * Increments the counter of completed [expandBuffer] invocations.
1386      * To guarantee starvation-freedom for [waitExpandBufferCompletion],
1387      * which waits until the counters of started and completed [expandBuffer] calls
1388      * coincide and become greater or equal to the specified value,
1389      * [waitExpandBufferCompletion] may set a flag that pauses further progress.
1390      */
1391     private fun incCompletedExpandBufferAttempts(nAttempts: Long = 1) {
1392         // Increment the number of completed `expandBuffer()` calls.
1393         completedExpandBuffersAndPauseFlag.addAndGet(nAttempts).also {
1394             // Should further `expandBuffer()`-s be paused?
1395             // If so, this thread should wait in a spin-loop
1396             // until the flag is unset.
1397             if (it.ebPauseExpandBuffers) {
1398                 @Suppress("ControlFlowWithEmptyBody")
1399                 while (completedExpandBuffersAndPauseFlag.value.ebPauseExpandBuffers) {}
1400             }
1401         }
1402     }
1403 
1404     /**
1405      * Waits in a spin-loop until the [expandBuffer] call that
1406      * should process the [globalIndex]-th cell is completed.
1407      * Essentially, it waits until the numbers of started ([bufferEnd])
1408      * and completed ([completedExpandBuffersAndPauseFlag]) [expandBuffer]
1409      * attempts coincide and become equal or greater than [globalIndex].
1410      * To avoid starvation, this function may set a flag
1411      * that pauses further progress.
1412      */
1413     internal fun waitExpandBufferCompletion(globalIndex: Long) {
1414         // Do nothing if this channel is rendezvous or unlimited;
1415         // `expandBuffer()` is not used in these cases.
1416         if (isRendezvousOrUnlimited) return
1417         // Wait in an infinite loop until the number of started
1418         // buffer expansion calls become not lower than the cell index.
1419         @Suppress("ControlFlowWithEmptyBody")
1420         while (bufferEndCounter <= globalIndex) {}
1421         // Now it is guaranteed that the `expandBuffer()` call that
1422         // should process the required cell has been started.
1423         // Wait in a fixed-size spin-loop until the numbers of
1424         // started and completed buffer expansion calls coincide.
1425         repeat(EXPAND_BUFFER_COMPLETION_WAIT_ITERATIONS) {
1426             // Read the number of started buffer expansion calls.
1427             val b = bufferEndCounter
1428             // Read the number of completed buffer expansion calls.
1429             val ebCompleted = completedExpandBuffersAndPauseFlag.value.ebCompletedCounter
1430             // Do the numbers of started and completed calls coincide?
1431             // Note that we need to re-read the number of started `expandBuffer()`
1432             // calls to obtain a correct snapshot.
1433             // Here we wait to a precise match in order to ensure that **our matching expandBuffer()**
1434             // completed. The only way to ensure that is to check that number of started expands == number of finished expands
1435             if (b == ebCompleted && b == bufferEndCounter) return
1436         }
1437         // To avoid starvation, pause further `expandBuffer()` calls.
1438         completedExpandBuffersAndPauseFlag.update {
1439             constructEBCompletedAndPauseFlag(it.ebCompletedCounter, true)
1440         }
1441         // Now wait in an infinite spin-loop until the counters coincide.
1442         while (true) {
1443             // Read the number of started buffer expansion calls.
1444             val b = bufferEndCounter
1445             // Read the number of completed buffer expansion calls
1446             // along with the flag that pauses further progress.
1447             val ebCompletedAndBit = completedExpandBuffersAndPauseFlag.value
1448             val ebCompleted = ebCompletedAndBit.ebCompletedCounter
1449             val pauseExpandBuffers = ebCompletedAndBit.ebPauseExpandBuffers
1450             // Do the numbers of started and completed calls coincide?
1451             // Note that we need to re-read the number of started `expandBuffer()`
1452             // calls to obtain a correct snapshot.
1453             if (b == ebCompleted && b == bufferEndCounter) {
1454                 // Unset the flag, which pauses progress, and finish.
1455                 completedExpandBuffersAndPauseFlag.update {
1456                     constructEBCompletedAndPauseFlag(it.ebCompletedCounter, false)
1457                 }
1458                 return
1459             }
1460             // It is possible that a concurrent caller of this function
1461             // has unset the flag, which pauses further progress to avoid
1462             // starvation. In this case, set the flag back.
1463             if (!pauseExpandBuffers) {
1464                 completedExpandBuffersAndPauseFlag.compareAndSet(
1465                     ebCompletedAndBit,
1466                     constructEBCompletedAndPauseFlag(ebCompleted, true)
1467                 )
1468             }
1469         }
1470     }
1471 
1472 
1473     // #######################
1474     // ## Select Expression ##
1475     // #######################
1476 
1477     @Suppress("UNCHECKED_CAST")
1478     override val onSend: SelectClause2<E, BufferedChannel<E>>
1479         get() = SelectClause2Impl(
1480             clauseObject = this@BufferedChannel,
1481             regFunc = BufferedChannel<*>::registerSelectForSend as RegistrationFunction,
1482             processResFunc = BufferedChannel<*>::processResultSelectSend as ProcessResultFunction
1483         )
1484 
    /**
     * Registration function of the [onSend] clause: performs the send of [element]
     * through `sendImpl`, using the [select] instance as the waiter.
     *
     * The function is `open`, and its return type is inferred from the `sendImpl` call.
     */
    @Suppress("UNCHECKED_CAST")
    protected open fun registerSelectForSend(select: SelectInstance<*>, element: Any?) =
        sendImpl( // <-- this is an inline function
            // The clause parameter arrives untyped. NOTE(review): this cast presumably also
            // smart-casts `element` to `E` for the `onClosed` lambda below, which passes it
            // to `onClosedSelectOnSend(element: E, ..)`; keep the cast before that lambda.
            element = element as E,
            // The `select` instance itself serves as the waiter.
            waiter = select,
            // Complete the clause right in the registration phase with `Unit` as the result.
            onRendezvousOrBuffered = { select.selectInRegistrationPhase(Unit) },
            // No extra action is taken here on suspension.
            onSuspend = { _, _ -> },
            // The channel is closed: see `onClosedSelectOnSend`.
            onClosed = { onClosedSelectOnSend(element, select) }
        )
1494 
1495 
1496     private fun onClosedSelectOnSend(element: E, select: SelectInstance<*>) {
1497         onUndeliveredElement?.callUndeliveredElement(element, select.context)
1498         select.selectInRegistrationPhase(CHANNEL_CLOSED)
1499     }
1500 
1501     @Suppress("UNUSED_PARAMETER", "RedundantNullableReturnType")
1502     private fun processResultSelectSend(ignoredParam: Any?, selectResult: Any?): Any? =
1503         if (selectResult === CHANNEL_CLOSED) throw sendException
1504         else this
1505 
1506     @Suppress("UNCHECKED_CAST")
1507     override val onReceive: SelectClause1<E>
1508         get() = SelectClause1Impl(
1509             clauseObject = this@BufferedChannel,
1510             regFunc = BufferedChannel<*>::registerSelectForReceive as RegistrationFunction,
1511             processResFunc = BufferedChannel<*>::processResultSelectReceive as ProcessResultFunction,
1512             onCancellationConstructor = onUndeliveredElementReceiveCancellationConstructor
1513         )
1514 
1515     @Suppress("UNCHECKED_CAST")
1516     override val onReceiveCatching: SelectClause1<ChannelResult<E>>
1517         get() = SelectClause1Impl(
1518             clauseObject = this@BufferedChannel,
1519             regFunc = BufferedChannel<*>::registerSelectForReceive as RegistrationFunction,
1520             processResFunc = BufferedChannel<*>::processResultSelectReceiveCatching as ProcessResultFunction,
1521             onCancellationConstructor = onUndeliveredElementReceiveCancellationConstructor
1522         )
1523 
1524     @Suppress("OVERRIDE_DEPRECATION", "UNCHECKED_CAST")
1525     override val onReceiveOrNull: SelectClause1<E?>
1526         get() = SelectClause1Impl(
1527             clauseObject = this@BufferedChannel,
1528             regFunc = BufferedChannel<*>::registerSelectForReceive as RegistrationFunction,
1529             processResFunc = BufferedChannel<*>::processResultSelectReceiveOrNull as ProcessResultFunction,
1530             onCancellationConstructor = onUndeliveredElementReceiveCancellationConstructor
1531         )
1532 
    /**
     * Registration function shared by the [onReceive], [onReceiveCatching], and
     * [onReceiveOrNull] clauses: performs the receive through `receiveImpl`,
     * using the [select] instance as the waiter. The clause parameter is not used.
     *
     * The return type is inferred from the `receiveImpl` call.
     */
    @Suppress("UNUSED_PARAMETER")
    private fun registerSelectForReceive(select: SelectInstance<*>, ignoredParam: Any?) =
        receiveImpl( // <-- this is an inline function
            // The `select` instance itself serves as the waiter.
            waiter = select,
            // Complete the clause right in the registration phase with the retrieved element.
            onElementRetrieved = { elem -> select.selectInRegistrationPhase(elem) },
            // No extra action is taken here on suspension.
            onSuspend = { _, _, _ -> },
            // The channel is closed: see `onClosedSelectOnReceive`.
            onClosed = { onClosedSelectOnReceive(select) }
        )
1541 
1542     private fun onClosedSelectOnReceive(select: SelectInstance<*>) {
1543         select.selectInRegistrationPhase(CHANNEL_CLOSED)
1544     }
1545 
1546     @Suppress("UNUSED_PARAMETER")
1547     private fun processResultSelectReceive(ignoredParam: Any?, selectResult: Any?): Any? =
1548         if (selectResult === CHANNEL_CLOSED) throw receiveException
1549         else selectResult
1550 
1551     @Suppress("UNUSED_PARAMETER")
1552     private fun processResultSelectReceiveOrNull(ignoredParam: Any?, selectResult: Any?): Any? =
1553         if (selectResult === CHANNEL_CLOSED) {
1554             if (closeCause == null) null
1555             else throw receiveException
1556         } else selectResult
1557 
1558     @Suppress("UNCHECKED_CAST", "UNUSED_PARAMETER", "RedundantNullableReturnType")
1559     private fun processResultSelectReceiveCatching(ignoredParam: Any?, selectResult: Any?): Any? =
1560         if (selectResult === CHANNEL_CLOSED) closed(closeCause)
1561         else success(selectResult as E)
1562 
1563     @Suppress("UNCHECKED_CAST")
1564     private val onUndeliveredElementReceiveCancellationConstructor: OnCancellationConstructor? = onUndeliveredElement?.let {
1565         { select: SelectInstance<*>, _: Any?, element: Any? ->
1566             { if (element !== CHANNEL_CLOSED) onUndeliveredElement.callUndeliveredElement(element as E, select.context) }
1567         }
1568     }
1569 
1570     // ######################
1571     // ## Iterator Support ##
1572     // ######################
1573 
1574     override fun iterator(): ChannelIterator<E> = BufferedChannelIterator()
1575 
1576     /**
1577      * The key idea is that an iterator is a special receiver type,
1578      * which should be resumed differently to [receive] and [onReceive]
1579      * operations, but can be served as a waiter in a way similar to
1580      * [CancellableContinuation] and [SelectInstance].
1581      *
1582      * Roughly, [hasNext] is a [receive] sibling, while [next] simply
1583      * returns the already retrieved element. From the implementation
1584      * side, [receiveResult] stores the element retrieved by [hasNext]
1585      * (or a special [CHANNEL_CLOSED] token if the channel is closed).
1586      *
1587      * The [invoke] function is a [CancelHandler] implementation,
1588      * which requires knowing the [segment] and the [index] in it
1589      * that specify the location of the stored iterator.
1590      *
1591      * To resume the suspended [hasNext] call, a special [tryResumeHasNext]
1592      * function should be used in a way similar to [CancellableContinuation.tryResume]
1593      * and [SelectInstance.trySelect]. When the channel becomes closed,
1594      * [tryResumeHasNextOnClosedChannel] should be used instead.
1595      */
1596     private inner class BufferedChannelIterator : ChannelIterator<E>, Waiter {
1597         /**
1598          * Stores the element retrieved by [hasNext] or
1599          * a special [CHANNEL_CLOSED] token if this channel is closed.
1600          * If [hasNext] has not been invoked yet, [NO_RECEIVE_RESULT] is stored.
1601          */
1602         private var receiveResult: Any? = NO_RECEIVE_RESULT
1603 
1604         /**
1605          * When [hasNext] suspends, this field stores the corresponding
1606          * continuation. The [tryResumeHasNext] and [tryResumeHasNextOnClosedChannel]
1607          * function resume this continuation when the [hasNext] invocation should complete.
1608          *
1609          * This property is the subject to bening data race:
1610          * It is nulled-out on both completion and cancellation paths that
1611          * could happen concurrently.
1612          */
1613         @BenignDataRace
1614         private var continuation: CancellableContinuationImpl<Boolean>? = null
1615 
1616         // `hasNext()` is just a special receive operation.
1617         override suspend fun hasNext(): Boolean =
1618             receiveImpl( // <-- this is an inline function
1619                 // Do not create a continuation until it is required;
1620                 // it is created later via [onNoWaiterSuspend], if needed.
1621                 waiter = null,
1622                 // Store the received element in `receiveResult` on successful
1623                 // retrieval from the buffer or rendezvous with a suspended sender.
1624                 // Also, inform the `BufferedChannel` extensions that
1625                 // the synchronization of this receive operation is completed.
1626                 onElementRetrieved = { element ->
1627                     this.receiveResult = element
1628                     true
1629                 },
1630                 // As no waiter is provided, suspension is impossible.
1631                 onSuspend = { _, _, _ -> error("unreachable") },
1632                 // Return `false` or throw an exception if the channel is already closed.
1633                 onClosed = { onClosedHasNext() },
1634                 // If `hasNext()` decides to suspend, the corresponding
1635                 // `suspend` function that creates a continuation is called.
1636                 // The tail-call optimization is applied here.
1637                 onNoWaiterSuspend = { segm, i, r -> return hasNextOnNoWaiterSuspend(segm, i, r) }
1638             )
1639 
1640         private fun onClosedHasNext(): Boolean {
1641             this.receiveResult = CHANNEL_CLOSED
1642             val cause = closeCause ?: return false
1643             throw recoverStackTrace(cause)
1644         }
1645 
1646         private suspend fun hasNextOnNoWaiterSuspend(
1647             /* The working cell is specified by
1648             the segment and the index in it. */
1649             segment: ChannelSegment<E>,
1650             index: Int,
1651             /* The global index of the cell. */
1652             r: Long
1653         ): Boolean = suspendCancellableCoroutineReusable { cont ->
1654             this.continuation = cont
1655             receiveImplOnNoWaiter( // <-- this is an inline function
1656                 segment = segment, index = index, r = r,
1657                 waiter = this, // store this iterator as a waiter
1658                 // In case of successful element retrieval, store
1659                 // it in `receiveResult` and resume the continuation.
1660                 // Importantly, the receiver coroutine may be cancelled
1661                 // after it is successfully resumed but not dispatched yet.
1662                 // In case `onUndeliveredElement` is present, we must
1663                 // invoke it in the latter case.
1664                 onElementRetrieved = { element ->
1665                     this.receiveResult = element
1666                     this.continuation = null
1667                     cont.resume(true, onUndeliveredElement?.bindCancellationFun(element, cont.context))
1668                 },
1669                 onClosed = { onClosedHasNextNoWaiterSuspend() }
1670             )
1671         }
1672 
1673         override fun invokeOnCancellation(segment: Segment<*>, index: Int) {
1674             this.continuation?.invokeOnCancellation(segment, index)
1675         }
1676 
1677         private fun onClosedHasNextNoWaiterSuspend() {
1678             // Read the current continuation and clean
1679             // the corresponding field to avoid memory leaks.
1680             val cont = this.continuation!!
1681             this.continuation = null
1682             // Update the `hasNext()` internal result.
1683             this.receiveResult = CHANNEL_CLOSED
1684             // If this channel was closed without exception,
1685             // `hasNext()` should return `false`; otherwise,
1686             // it throws the closing exception.
1687             val cause = closeCause
1688             if (cause == null) {
1689                 cont.resume(false)
1690             } else {
1691                 cont.resumeWithException(recoverStackTrace(cause, cont))
1692             }
1693         }
1694 
1695         @Suppress("UNCHECKED_CAST")
1696         override fun next(): E {
1697             // Read the already received result, or [NO_RECEIVE_RESULT] if [hasNext] has not been invoked yet.
1698             val result = receiveResult
1699             check(result !== NO_RECEIVE_RESULT) { "`hasNext()` has not been invoked" }
1700             receiveResult = NO_RECEIVE_RESULT
1701             // Is this channel closed?
1702             if (result === CHANNEL_CLOSED) throw recoverStackTrace(receiveException)
1703             // Return the element.
1704             return result as E
1705         }
1706 
1707         fun tryResumeHasNext(element: E): Boolean {
1708             // Read the current continuation and clean
1709             // the corresponding field to avoid memory leaks.
1710             val cont = this.continuation!!
1711             this.continuation = null
1712             // Store the retrieved element in `receiveResult`.
1713             this.receiveResult = element
1714             // Try to resume this `hasNext()`. Importantly, the receiver coroutine
1715             // may be cancelled after it is successfully resumed but not dispatched yet.
1716             // In case `onUndeliveredElement` is specified, we need to invoke it in the latter case.
1717             return cont.tryResume0(true, onUndeliveredElement?.bindCancellationFun(element, cont.context))
1718         }
1719 
1720         fun tryResumeHasNextOnClosedChannel() {
1721             /*
1722              * Read the current continuation of the suspended `hasNext()` call and clean the corresponding field to avoid memory leaks.
1723              * While this nulling out is unnecessary, it eliminates memory leaks (through the continuation)
1724              * if the channel iterator accidentally remains GC-reachable after the channel is closed.
1725              */
1726             val cont = this.continuation!!
1727             this.continuation = null
1728             // Update the `hasNext()` internal result and inform
1729             // `BufferedChannel` extensions that synchronization
1730             // of this receive operation is completed.
1731             this.receiveResult = CHANNEL_CLOSED
1732             // If this channel was closed without exception,
1733             // `hasNext()` should return `false`; otherwise,
1734             // it throws the closing exception.
1735             val cause = closeCause
1736             if (cause == null) {
1737                 cont.resume(false)
1738             } else {
1739                 cont.resumeWithException(recoverStackTrace(cause, cont))
1740             }
1741         }
1742     }
1743 
1744     // ##############################
1745     // ## Closing and Cancellation ##
1746     // ##############################
1747 
    /**
     * Stores the cause of closing this channel, set either via a [close] or a [cancel] call.
     * The field initially holds the [NO_CLOSE_CAUSE] marker, and the cause is installed by
     * a single compare-and-set from that marker in [closeOrCancelImpl]; therefore,
     * the closing cause can be set only once. The installed cause may be `null`.
     */
    private val _closeCause = atomic<Any?>(NO_CLOSE_CAUSE)
1753     // Should be called only if this channel is closed or cancelled.
1754     protected val closeCause get() = _closeCause.value as Throwable?
1755 
1756     /** Returns the closing cause if it is non-null, or [ClosedSendChannelException] otherwise. */
1757     protected val sendException get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
1758 
1759     /** Returns the closing cause if it is non-null, or [ClosedReceiveChannelException] otherwise. */
1760     private val receiveException get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
1761 
    /**
      Stores the close handler installed by [invokeOnClose]; initially `null`.
      To synchronize [invokeOnClose] and [close], two additional
      marker states, [CLOSE_HANDLER_INVOKED] and [CLOSE_HANDLER_CLOSED],
      are used. The resulting state diagram is presented below.

      +------+  install handler  +---------+  close(..)  +---------+
      | null |------------------>| handler |------------>| INVOKED |
      +------+                   +---------+             +---------+
         |
         | close(..)  +--------+
         +----------->| CLOSED |
                      +--------+
     */
    private val closeHandler = atomic<Any?>(null)
1777 
    /**
     * A hook for subclasses, invoked at the end of [closeOrCancelImpl], which is
     * the common implementation of [close] and [cancel]. The hook is invoked by every
     * such call, not only by the one that has installed the closing cause; therefore,
     * this method should be idempotent and can be called multiple times.
     * The default implementation does nothing.
     */
    protected open fun onClosedIdempotent() {}
1783 
1784     override fun close(cause: Throwable?): Boolean =
1785         closeOrCancelImpl(cause, cancel = false)
1786 
1787     @Suppress("OVERRIDE_DEPRECATION")
1788     final override fun cancel(cause: Throwable?): Boolean = cancelImpl(cause)
1789 
1790     @Suppress("OVERRIDE_DEPRECATION")
1791     final override fun cancel() { cancelImpl(null) }
1792 
1793     final override fun cancel(cause: CancellationException?) { cancelImpl(cause) }
1794 
1795     internal open fun cancelImpl(cause: Throwable?): Boolean =
1796         closeOrCancelImpl(cause ?: CancellationException("Channel was cancelled"), cancel = true)
1797 
1798     /**
1799      * This is a common implementation for [close] and [cancel]. It first tries
1800      * to install the specified cause; the invocation that successfully installs
1801      * the cause returns `true` as a results of this function, while all further
1802      * [close] and [cancel] calls return `false`.
1803      *
1804      * After the closing/cancellation cause is installed, the channel should be marked
1805      * as closed or cancelled, which bounds further `send(e)`-s to fails.
1806      *
1807      * Then, [completeCloseOrCancel] is called, which cancels waiting `receive()`
1808      * requests ([cancelSuspendedReceiveRequests]) and removes unprocessed elements
1809      * ([removeUnprocessedElements]) in case this channel is cancelled.
1810      *
1811      * Finally, if this [closeOrCancelImpl] has installed the cause, therefore,
1812      * has closed the channel, [closeHandler] and [onClosedIdempotent] should be invoked.
1813      */
1814     protected open fun closeOrCancelImpl(cause: Throwable?, cancel: Boolean): Boolean {
1815         // If this is a `cancel(..)` invocation, set a bit that the cancellation
1816         // has been started. This is crucial for ensuring linearizability,
1817         // when concurrent `close(..)` and `isClosedFor[Send,Receive]` operations
1818         // help this `cancel(..)`.
1819         if (cancel) markCancellationStarted()
1820         // Try to install the specified cause. On success, this invocation will
1821         // return `true` as a result; otherwise, it will complete with `false`.
1822         val closedByThisOperation = _closeCause.compareAndSet(NO_CLOSE_CAUSE, cause)
1823         // Mark this channel as closed or cancelled, depending on this operation type.
1824         if (cancel) markCancelled() else markClosed()
1825         // Complete the closing or cancellation procedure.
1826         completeCloseOrCancel()
1827         // Finally, if this operation has installed the cause,
1828         // it should invoke the close handlers.
1829         return closedByThisOperation.also {
1830             onClosedIdempotent()
1831             if (it) invokeCloseHandler()
1832         }
1833     }
1834 
1835     /**
1836      * Invokes the installed close handler,
1837      * updating the [closeHandler] state correspondingly.
1838      */
1839     private fun invokeCloseHandler() {
1840         val closeHandler = closeHandler.getAndUpdate {
1841             if (it === null) {
1842                 // Inform concurrent `invokeOnClose`
1843                 // that this channel is already closed.
1844                 CLOSE_HANDLER_CLOSED
1845             } else {
1846                 // Replace the handler with a special
1847                 // `INVOKED` marker to avoid memory leaks.
1848                 CLOSE_HANDLER_INVOKED
1849             }
1850         } ?: return // no handler was installed, finish.
1851         // Invoke the handler.
1852         @Suppress("UNCHECKED_CAST")
1853         closeHandler as (cause: Throwable?) -> Unit
1854         closeHandler(closeCause)
1855     }
1856 
1857     override fun invokeOnClose(handler: (cause: Throwable?) -> Unit) {
1858         // Try to install the handler, finishing on success.
1859         if (closeHandler.compareAndSet(null, handler)) {
1860             // Handler has been successfully set, finish the operation.
1861             return
1862         }
1863         // Either another handler is already set, or this channel is closed.
1864         // In the latter case, the current handler should be invoked.
1865         // However, the implementation must ensure that at most one
1866         // handler is called, throwing an `IllegalStateException`
1867         // if another close handler has been invoked.
1868         closeHandler.loop { cur ->
1869             when {
1870                 cur === CLOSE_HANDLER_CLOSED -> {
1871                     // Try to update the state from `CLOSED` to `INVOKED`.
1872                     // This is crucial to guarantee that at most one handler can be called.
1873                     // On success, invoke the handler and finish.
1874                     if (closeHandler.compareAndSet(CLOSE_HANDLER_CLOSED, CLOSE_HANDLER_INVOKED)) {
1875                         handler(closeCause)
1876                         return
1877                     }
1878                 }
1879                 cur === CLOSE_HANDLER_INVOKED -> error("Another handler was already registered and successfully invoked")
1880                 else -> error("Another handler is already registered: $cur")
1881             }
1882         }
1883     }
1884 
1885     /**
1886      * Marks this channel as closed.
1887      * In case [cancelImpl] has already been invoked,
1888      * and this channel is marked with [CLOSE_STATUS_CANCELLATION_STARTED],
1889      * this function marks the channel as cancelled.
1890      *
1891      * All operation that notice this channel in the closed state,
1892      * must help to complete the closing via [completeCloseOrCancel].
1893      */
1894     private fun markClosed(): Unit =
1895         sendersAndCloseStatus.update { cur ->
1896             when (cur.sendersCloseStatus) {
1897                 CLOSE_STATUS_ACTIVE -> // the channel is neither closed nor cancelled
1898                     constructSendersAndCloseStatus(cur.sendersCounter, CLOSE_STATUS_CLOSED)
1899                 CLOSE_STATUS_CANCELLATION_STARTED -> // the channel is going to be cancelled
1900                     constructSendersAndCloseStatus(cur.sendersCounter, CLOSE_STATUS_CANCELLED)
1901                 else -> return // the channel is already marked as closed or cancelled.
1902             }
1903         }
1904 
1905     /**
1906      * Marks this channel as cancelled.
1907      *
1908      * All operation that notice this channel in the cancelled state,
1909      * must help to complete the cancellation via [completeCloseOrCancel].
1910      */
1911     private fun markCancelled(): Unit =
1912         sendersAndCloseStatus.update { cur ->
1913             constructSendersAndCloseStatus(cur.sendersCounter, CLOSE_STATUS_CANCELLED)
1914         }
1915 
1916     /**
1917      * When the cancellation procedure starts, it is critical
1918      * to mark the closing status correspondingly. Thus, other
1919      * operations, which may help to complete the cancellation,
1920      * always correctly update the status to `CANCELLED`.
1921      */
1922     private fun markCancellationStarted(): Unit =
1923         sendersAndCloseStatus.update { cur ->
1924             if (cur.sendersCloseStatus == CLOSE_STATUS_ACTIVE)
1925                 constructSendersAndCloseStatus(cur.sendersCounter, CLOSE_STATUS_CANCELLATION_STARTED)
1926             else return // this channel is already closed or cancelled
1927         }
1928 
    /**
     * Completes the started [close] or [cancel] procedure.
     *
     * The work is delegated to the [isClosedForSend] getter: reading it runs
     * [completeClose] or [completeCancel] when this channel is marked as closed
     * or cancelled, respectively. The value that the getter returns is ignored.
     */
    private fun completeCloseOrCancel() {
        isClosedForSend // must finish the started close/cancel if one is detected.
    }
1935 
    /**
     * Indicates whether this channel is conflated with the `DROP_OLDEST` strategy;
     * `false` unless overridden. When it is `true`, [completeClose] additionally calls
     * [markAllEmptyCellsAsClosed] and, if a buffered element is found,
     * [dropFirstElementUntilTheSpecifiedCellIsInTheBuffer].
     */
    protected open val isConflatedDropOldest get() = false
1937 
1938     /**
1939      * Completes the channel closing procedure.
1940      */
1941     private fun completeClose(sendersCur: Long): ChannelSegment<E> {
1942         // Close the linked list for further segment addition,
1943         // obtaining the last segment in the data structure.
1944         val lastSegment = closeLinkedList()
1945         // In the conflated channel implementation (with the DROP_OLDEST
1946         // elements conflation strategy), it is critical to mark all empty
1947         // cells as closed to prevent in-progress `send(e)`-s, which have not
1948         // put their elements yet, completions after this channel is closed.
1949         // Otherwise, it is possible for a `send(e)` to put an element when
1950         // the buffer is already full, while a concurrent receiver may extract
1951         // the oldest element. When the channel is not closed, we can linearize
1952         // this `receive()` before the `send(e)`, but after the channel is closed,
1953         // `send(e)` must fails. Marking all unprocessed cells as `CLOSED` solves the issue.
1954         if (isConflatedDropOldest) {
1955             val lastBufferedCellGlobalIndex = markAllEmptyCellsAsClosed(lastSegment)
1956             if (lastBufferedCellGlobalIndex != -1L)
1957                 dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(lastBufferedCellGlobalIndex)
1958         }
1959         // Resume waiting `receive()` requests,
1960         // informing them that the channel is closed.
1961         cancelSuspendedReceiveRequests(lastSegment, sendersCur)
1962         // Return the last segment in the linked list as a result
1963         // of this function; we need it in `completeCancel(..)`.
1964         return lastSegment
1965     }
1966 
1967     /**
1968      * Completes the channel cancellation procedure.
1969      */
1970     private fun completeCancel(sendersCur: Long) {
1971         // First, ensure that this channel is closed,
1972         // obtaining the last segment in the linked list.
1973         val lastSegment = completeClose(sendersCur)
1974         // Cancel suspended `send(e)` requests and
1975         // remove buffered elements in the reverse order.
1976         removeUnprocessedElements(lastSegment)
1977     }
1978 
1979     /**
1980      * Closes the underlying linked list of segments for further segment addition.
1981      */
1982     private fun closeLinkedList(): ChannelSegment<E> {
1983         // Choose the last segment.
1984         var lastSegment = bufferEndSegment.value
1985         sendSegment.value.let { if (it.id > lastSegment.id) lastSegment = it }
1986         receiveSegment.value.let { if (it.id > lastSegment.id) lastSegment = it }
1987         // Close the linked list of segment for new segment addition
1988         // and return the last segment in the linked list.
1989         return lastSegment.close()
1990     }
1991 
1992     /**
1993      * This function marks all empty cells, in the `null` and [IN_BUFFER] state,
1994      * as closed. Notably, it processes the cells from right to left, and finishes
1995      * immediately when the processing cell is already covered by `receive()` or
1996      * contains a buffered elements ([BUFFERED] state).
1997      *
1998      * This function returns the global index of the last buffered element,
1999      * or `-1` if this channel does not contain buffered elements.
2000      */
2001     private fun markAllEmptyCellsAsClosed(lastSegment: ChannelSegment<E>): Long {
2002         // Process the cells in reverse order, from right to left.
2003         var segment = lastSegment
2004         while (true) {
2005             for (index in SEGMENT_SIZE - 1 downTo 0) {
2006                 // Is this cell already covered by `receive()`?
2007                 val globalIndex = segment.id * SEGMENT_SIZE + index
2008                 if (globalIndex < receiversCounter) return -1
2009                 // Process the cell `segment[index]`.
2010                 cell_update@ while (true) {
2011                     val state = segment.getState(index)
2012                     when {
2013                         // The cell is empty.
2014                         state === null || state === IN_BUFFER -> {
2015                             // Inform a possibly upcoming sender that this channel is already closed.
2016                             if (segment.casState(index, state, CHANNEL_CLOSED)) {
2017                                 segment.onSlotCleaned()
2018                                 break@cell_update
2019                             }
2020                         }
2021                         // The cell stores a buffered element.
2022                         state === BUFFERED -> return globalIndex
2023                         // Skip this cell if it is not empty and does not store a buffered element.
2024                         else -> break@cell_update
2025                     }
2026                 }
2027             }
2028             // Process the next segment, finishing if the linked list ends.
2029             segment = segment.prev ?: return -1
2030         }
2031     }
2032 
2033     /**
2034      * Cancels suspended `send(e)` requests and removes buffered elements
2035      * starting from the last cell in the specified [lastSegment] (it must
2036      * be the physical tail of the underlying linked list) and updating
2037      * the cells in reverse order.
2038      */
2039     private fun removeUnprocessedElements(lastSegment: ChannelSegment<E>) {
2040         // Read the `onUndeliveredElement` lambda at once. In case it
2041         // throws an exception, this exception is handled and stored in
2042         // the variable below. If multiple exceptions are thrown, the first
2043         // one is stored in the variable, while the others are suppressed.
2044         val onUndeliveredElement = onUndeliveredElement
2045         var undeliveredElementException: UndeliveredElementException? = null // first cancel exception, others suppressed
2046         // To perform synchronization correctly, it is critical to
2047         // process the cells in reverse order, from right to left.
2048         // However, according to the API, suspended senders should
2049         // be cancelled in the order of their suspension. Therefore,
2050         // we need to collect all of them and cancel in the reverse
2051         // order after that.
2052         var suspendedSenders = InlineList<Waiter>()
2053         var segment = lastSegment
2054         process_segments@ while (true) {
2055             for (index in SEGMENT_SIZE - 1 downTo 0) {
2056                 // Process the cell `segment[index]`.
2057                 val globalIndex = segment.id * SEGMENT_SIZE + index
2058                 // Update the cell state.
2059                 update_cell@ while (true) {
2060                     // Read the current state of the cell.
2061                     val state = segment.getState(index)
2062                     when {
2063                         // The cell is already processed by a receiver.
2064                         state === DONE_RCV -> break@process_segments
2065                         // The cell stores a buffered element.
2066                         state === BUFFERED -> {
2067                             // Is the cell already covered by a receiver?
2068                             if (globalIndex < receiversCounter) break@process_segments
2069                             // Update the cell state to `CHANNEL_CLOSED`.
2070                             if (segment.casState(index, state, CHANNEL_CLOSED)) {
2071                                 // If `onUndeliveredElement` lambda is non-null, call it.
2072                                 if (onUndeliveredElement != null) {
2073                                     val element = segment.getElement(index)
2074                                     undeliveredElementException = onUndeliveredElement.callUndeliveredElementCatchingException(element, undeliveredElementException)
2075                                 }
2076                                 // Clean the element field and inform the segment
2077                                 // that the slot is cleaned to avoid memory leaks.
2078                                 segment.cleanElement(index)
2079                                 segment.onSlotCleaned()
2080                                 break@update_cell
2081                             }
2082                         }
2083                         // The cell is empty.
2084                         state === IN_BUFFER || state === null -> {
2085                             // Update the cell state to `CHANNEL_CLOSED`.
2086                             if (segment.casState(index, state, CHANNEL_CLOSED)) {
2087                                 // Inform the segment that the slot is cleaned to avoid memory leaks.
2088                                 segment.onSlotCleaned()
2089                                 break@update_cell
2090                             }
2091                         }
2092                         // The cell stores a suspended waiter.
2093                         state is Waiter || state is WaiterEB -> {
2094                             // Is the cell already covered by a receiver?
2095                             if (globalIndex < receiversCounter) break@process_segments
2096                             // Obtain the sender.
2097                             val sender: Waiter = if (state is WaiterEB) state.waiter
2098                                                  else state as Waiter
2099                             // Update the cell state to `CHANNEL_CLOSED`.
2100                             if (segment.casState(index, state, CHANNEL_CLOSED)) {
2101                                 // If `onUndeliveredElement` lambda is non-null, call it.
2102                                 if (onUndeliveredElement != null) {
2103                                     val element = segment.getElement(index)
2104                                     undeliveredElementException = onUndeliveredElement.callUndeliveredElementCatchingException(element, undeliveredElementException)
2105                                 }
2106                                 // Save the sender for further cancellation.
2107                                 suspendedSenders += sender
2108                                 // Clean the element field and inform the segment
2109                                 // that the slot is cleaned to avoid memory leaks.
2110                                 segment.cleanElement(index)
2111                                 segment.onSlotCleaned()
2112                                 break@update_cell
2113                             }
2114                         }
2115                         // A concurrent receiver is resuming a suspended sender.
2116                         // As the cell is covered by a receiver, finish immediately.
2117                         state === RESUMING_BY_EB || state === RESUMING_BY_RCV -> break@process_segments
2118                         // A concurrent `expandBuffer()` is resuming a suspended sender.
2119                         // Wait in a spin-loop until the cell state changes.
2120                         state === RESUMING_BY_EB -> continue@update_cell
2121                         else -> break@update_cell
2122                     }
2123                 }
2124             }
2125             // Process the previous segment.
2126             segment = segment.prev ?: break
2127         }
2128         // Cancel suspended senders in their order of addition to this channel.
2129         suspendedSenders.forEachReversed { it.resumeSenderOnCancelledChannel() }
2130         // Throw `UndeliveredElementException` at the end if there was one.
2131         undeliveredElementException?.let { throw it }
2132     }
2133 
2134     /**
2135      * Cancels suspended `receive` requests from the end to the beginning,
2136      * also moving empty cells to the `CHANNEL_CLOSED` state.
2137      */
2138     private fun cancelSuspendedReceiveRequests(lastSegment: ChannelSegment<E>, sendersCounter: Long) {
2139         // To perform synchronization correctly, it is critical to
2140         // extract suspended requests in the reverse order,
2141         // from the end to the beginning.
2142         // However, according to the API, they should be cancelled
2143         // in the order of their suspension. Therefore, we need to
2144         // collect the suspended requests first, cancelling them
2145         // in the reverse order after that.
2146         var suspendedReceivers = InlineList<Waiter>()
2147         var segment: ChannelSegment<E>? = lastSegment
2148         process_segments@ while (segment != null) {
2149             for (index in SEGMENT_SIZE - 1 downTo 0) {
2150                 // Is the cell already covered by a sender? Finish immediately in this case.
2151                 if (segment.id * SEGMENT_SIZE + index < sendersCounter) break@process_segments
2152                 // Try to move the cell state to `CHANNEL_CLOSED`.
2153                 cell_update@ while (true) {
2154                     val state = segment.getState(index)
2155                     when {
2156                         state === null || state === IN_BUFFER -> {
2157                             if (segment.casState(index, state, CHANNEL_CLOSED)) {
2158                                 segment.onSlotCleaned()
2159                                 break@cell_update
2160                             }
2161                         }
2162                         state is WaiterEB -> {
2163                             if (segment.casState(index, state, CHANNEL_CLOSED)) {
2164                                 suspendedReceivers += state.waiter // save for cancellation.
2165                                 segment.onCancelledRequest(index = index, receiver = true)
2166                                 break@cell_update
2167                             }
2168                         }
2169                         state is Waiter -> {
2170                             if (segment.casState(index, state, CHANNEL_CLOSED)) {
2171                                 suspendedReceivers += state // save for cancellation.
2172                                 segment.onCancelledRequest(index = index, receiver = true)
2173                                 break@cell_update
2174                             }
2175                         }
2176                         else -> break@cell_update // nothing to cancel.
2177                     }
2178                 }
2179             }
2180             // Process the previous segment.
2181             segment = segment.prev
2182         }
2183         // Cancel the suspended requests in their order of addition to this channel.
2184         suspendedReceivers.forEachReversed { it.resumeReceiverOnClosedChannel() }
2185     }
2186 
2187     /**
2188      * Resumes this receiver because this channel is closed.
2189      * This function does not take any effect if the operation has already been resumed or cancelled.
2190      */
2191     private fun Waiter.resumeReceiverOnClosedChannel() = resumeWaiterOnClosedChannel(receiver = true)
2192 
2193     /**
2194      * Resumes this sender because this channel is cancelled.
2195      * This function does not take any effect if the operation has already been resumed or cancelled.
2196      */
2197     private fun Waiter.resumeSenderOnCancelledChannel() = resumeWaiterOnClosedChannel(receiver = false)
2198 
2199     private fun Waiter.resumeWaiterOnClosedChannel(receiver: Boolean) {
2200         when (this) {
2201             is SendBroadcast -> cont.resume(false)
2202             is CancellableContinuation<*> -> resumeWithException(if (receiver) receiveException else sendException)
2203             is ReceiveCatching<*> -> cont.resume(closed(closeCause))
2204             is BufferedChannel<*>.BufferedChannelIterator -> tryResumeHasNextOnClosedChannel()
2205             is SelectInstance<*> -> trySelect(this@BufferedChannel, CHANNEL_CLOSED)
2206             else -> error("Unexpected waiter: $this")
2207         }
2208     }
2209 
2210     @ExperimentalCoroutinesApi
2211     override val isClosedForSend: Boolean
2212         get() = sendersAndCloseStatus.value.isClosedForSend0
2213 
2214     private val Long.isClosedForSend0 get() =
2215         isClosed(this, isClosedForReceive = false)
2216 
2217     @ExperimentalCoroutinesApi
2218     override val isClosedForReceive: Boolean
2219         get() = sendersAndCloseStatus.value.isClosedForReceive0
2220 
2221     private val Long.isClosedForReceive0 get() =
2222         isClosed(this, isClosedForReceive = true)
2223 
2224     private fun isClosed(
2225         sendersAndCloseStatusCur: Long,
2226         isClosedForReceive: Boolean
2227     ) = when (sendersAndCloseStatusCur.sendersCloseStatus) {
2228         // This channel is active and has not been closed.
2229         CLOSE_STATUS_ACTIVE -> false
2230         // The cancellation procedure has been started but
2231         // not linearized yet, so this channel should be
2232         // considered as active.
2233         CLOSE_STATUS_CANCELLATION_STARTED -> false
2234         // This channel has been successfully closed.
2235         // Help to complete the closing procedure to
2236         // guarantee linearizability, and return `true`
2237         // for senders or the flag whether there still
2238         // exist elements to retrieve for receivers.
2239         CLOSE_STATUS_CLOSED -> {
2240             completeClose(sendersAndCloseStatusCur.sendersCounter)
2241             // When `isClosedForReceive` is `false`, always return `true`.
2242             // Otherwise, it is possible that the channel is closed but
2243             // still has elements to retrieve.
2244             if (isClosedForReceive) !hasElements() else true
2245         }
2246         // This channel has been successfully cancelled.
2247         // Help to complete the cancellation procedure to
2248         // guarantee linearizability and return `true`.
2249         CLOSE_STATUS_CANCELLED -> {
2250             completeCancel(sendersAndCloseStatusCur.sendersCounter)
2251             true
2252         }
2253         else -> error("unexpected close status: ${sendersAndCloseStatusCur.sendersCloseStatus}")
2254     }
2255 
2256     @ExperimentalCoroutinesApi
2257     override val isEmpty: Boolean get() {
2258         // This function should return `false` if
2259         // this channel is closed for `receive`.
2260         if (isClosedForReceive) return false
2261         // Does this channel has elements to retrieve?
2262         if (hasElements()) return false
2263         // This channel does not have elements to retrieve;
2264         // Check that it is still not closed for `receive`.
2265         return !isClosedForReceive
2266     }
2267 
2268     /**
2269      * Checks whether this channel contains elements to retrieve.
2270      * Unfortunately, simply comparing the counters is insufficient,
2271      * as some cells can be in the `INTERRUPTED` state due to cancellation.
2272      * This function tries to find the first "alive" element,
2273      * updating the `receivers` counter to skip empty cells.
2274      *
2275      * The implementation is similar to `receive()`.
2276      */
2277     internal fun hasElements(): Boolean {
2278         while (true) {
2279             // Read the segment before obtaining the `receivers` counter value.
2280             var segment = receiveSegment.value
2281             // Obtains the `receivers` and `senders` counter values.
2282             val r = receiversCounter
2283             val s = sendersCounter
2284             // Is there a chance that this channel has elements?
2285             if (s <= r) return false // no elements
2286             // The `r`-th cell is covered by a sender; check whether it contains an element.
2287             // First, try to find the required segment if the initially
2288             // obtained segment (in the beginning of this function) has lower id.
2289             val id = r / SEGMENT_SIZE
2290             if (segment.id != id) {
2291                 // Try to find the required segment.
2292                 segment = findSegmentReceive(id, segment) ?:
2293                     // The required segment has not been found. Either it has already
2294                     // been removed, or the underlying linked list is already closed
2295                     // for segment additions. In the latter case, the channel is closed
2296                     // and does not contain elements, so this operation returns `false`.
2297                     // Otherwise, if the required segment is removed, the operation restarts.
2298                     if (receiveSegment.value.id < id) return false else continue
2299             }
2300             segment.cleanPrev() // all the previous segments are no longer needed.
2301             // Does the `r`-th cell contain waiting sender or buffered element?
2302             val i = (r % SEGMENT_SIZE).toInt()
2303             if (isCellNonEmpty(segment, i, r)) return true
2304             // The cell is empty. Update `receivers` counter and try again.
2305             receivers.compareAndSet(r, r + 1) // if this CAS fails, the counter has already been updated.
2306         }
2307     }
2308 
2309     /**
2310      * Checks whether this cell contains a buffered element or a waiting sender,
2311      * returning `true` in this case. Otherwise, if this cell is empty
2312      * (due to waiter cancellation, cell poisoning, or channel closing),
2313      * this function returns `false`.
2314      *
2315      * Notably, this function must be called only if the cell is covered by a sender.
2316      */
2317     private fun isCellNonEmpty(
2318         segment: ChannelSegment<E>,
2319         index: Int,
2320         globalIndex: Long
2321     ): Boolean {
2322         // The logic is similar to `updateCellReceive` with the only difference
2323         // that this function neither changes the cell state nor retrieves the element.
2324         while (true) {
2325             // Read the current cell state.
2326             val state = segment.getState(index)
2327             when {
2328                 // The cell is empty but a sender is coming.
2329                 state === null || state === IN_BUFFER -> {
2330                     // Poison the cell to ensure correctness.
2331                     if (segment.casState(index, state, POISONED)) {
2332                         // When the cell becomes poisoned, it is essentially
2333                         // the same as storing an already cancelled receiver.
2334                         // Thus, the `expandBuffer()` procedure should be invoked.
2335                         expandBuffer()
2336                         return false
2337                     }
2338                 }
2339                 // The cell stores a buffered element.
2340                 state === BUFFERED -> return true
2341                 // The cell stores an interrupted sender.
2342                 state === INTERRUPTED_SEND -> return false
2343                 // This channel is already closed.
2344                 state === CHANNEL_CLOSED -> return false
2345                 // The cell is already processed
2346                 // by a concurrent receiver.
2347                 state === DONE_RCV -> return false
2348                 // The cell is already poisoned
2349                 // by a concurrent receiver.
2350                 state === POISONED -> return false
2351                 // A concurrent `expandBuffer()` is resuming
2352                 // a suspended sender. This function is eligible
2353                 // to linearize before the buffer expansion procedure.
2354                 state === RESUMING_BY_EB -> return true
2355                 // A concurrent receiver is resuming
2356                 // a suspended sender. The element
2357                 // is no longer available for retrieval.
2358                 state === RESUMING_BY_RCV -> return false
2359                 // The cell stores a suspended request.
2360                 // However, it is possible that this request
2361                 // is receiver if the cell is covered by both
2362                 // send and receive operations.
2363                 // In case the cell is already covered by
2364                 // a receiver, the element is no longer
2365                 // available for retrieval, and this function
2366                 // return `false`. Otherwise, it is guaranteed
2367                 // that the suspended request is sender, so
2368                 // this function returns `true`.
2369                 else -> return globalIndex == receiversCounter
2370             }
2371         }
2372     }
2373 
2374     // #######################
2375     // # Segments Management #
2376     // #######################
2377 
2378     /**
2379      * Finds the segment with the specified [id] starting by the [startFrom]
2380      * segment and following the [ChannelSegment.next] references. In case
2381      * the required segment has not been created yet, this function attempts
2382      * to add it to the underlying linked list. Finally, it updates [sendSegment]
2383      * to the found segment if its [ChannelSegment.id] is greater than the one
2384      * of the already stored segment.
2385      *
2386      * In case the requested segment is already removed, or if it should be allocated
2387      * but the linked list structure is closed for new segments addition, this function
2388      * returns `null`. The implementation also efficiently skips a sequence of removed
2389      * segments, updating the counter value in [sendersAndCloseStatus] correspondingly.
2390      */
2391     private fun findSegmentSend(id: Long, startFrom: ChannelSegment<E>): ChannelSegment<E>? {
2392         return sendSegment.findSegmentAndMoveForward(id, startFrom, createSegmentFunction()).let {
2393             if (it.isClosed) {
2394                 // The required segment has not been found and new segments
2395                 // cannot be added, as the linked listed in already added.
2396                 // This channel is already closed or cancelled; help to complete
2397                 // the closing or cancellation procedure.
2398                 completeCloseOrCancel()
2399                 // Clean the `prev` reference of the provided segment
2400                 // if all the previous cells are already covered by senders.
2401                 // It is important to clean the `prev` reference only in
2402                 // this case, as the closing/cancellation procedure may
2403                 // need correct value to traverse the linked list from right to left.
2404                 if (startFrom.id * SEGMENT_SIZE <  receiversCounter) startFrom.cleanPrev()
2405                 // As the required segment is not found and cannot be allocated, return `null`.
2406                 null
2407             } else {
2408                 // Get the found segment.
2409                 val segment = it.segment
2410                 // Is the required segment removed?
2411                 if (segment.id > id) {
2412                     // The required segment has been removed; `segment` is the first
2413                     // segment with `id` not lower than the required one.
2414                     // Skip the sequence of removed cells in O(1).
2415                     updateSendersCounterIfLower(segment.id * SEGMENT_SIZE)
2416                     // Clean the `prev` reference of the provided segment
2417                     // if all the previous cells are already covered by senders.
2418                     // It is important to clean the `prev` reference only in
2419                     // this case, as the closing/cancellation procedure may
2420                     // need correct value to traverse the linked list from right to left.
2421                     if (segment.id * SEGMENT_SIZE <  receiversCounter) segment.cleanPrev()
2422                     // As the required segment is not found and cannot be allocated, return `null`.
2423                     null
2424                 } else {
2425                     assert { segment.id == id }
2426                     // The required segment has been found; return it!
2427                     segment
2428                 }
2429             }
2430         }
2431     }
2432 
2433     /**
2434      * Finds the segment with the specified [id] starting by the [startFrom]
2435      * segment and following the [ChannelSegment.next] references. In case
2436      * the required segment has not been created yet, this function attempts
2437      * to add it to the underlying linked list. Finally, it updates [receiveSegment]
2438      * to the found segment if its [ChannelSegment.id] is greater than the one
2439      * of the already stored segment.
2440      *
2441      * In case the requested segment is already removed, or if it should be allocated
2442      * but the linked list structure is closed for new segments addition, this function
2443      * returns `null`. The implementation also efficiently skips a sequence of removed
2444      * segments, updating the [receivers] counter correspondingly.
2445      */
2446     private fun findSegmentReceive(id: Long, startFrom: ChannelSegment<E>): ChannelSegment<E>? =
2447         receiveSegment.findSegmentAndMoveForward(id, startFrom, createSegmentFunction()).let {
2448             if (it.isClosed) {
2449                 // The required segment has not been found and new segments
2450                 // cannot be added, as the linked listed in already added.
2451                 // This channel is already closed or cancelled; help to complete
2452                 // the closing or cancellation procedure.
2453                 completeCloseOrCancel()
2454                 // Clean the `prev` reference of the provided segment
2455                 // if all the previous cells are already covered by senders.
2456                 // It is important to clean the `prev` reference only in
2457                 // this case, as the closing/cancellation procedure may
2458                 // need correct value to traverse the linked list from right to left.
2459                 if (startFrom.id * SEGMENT_SIZE < sendersCounter) startFrom.cleanPrev()
2460                 // As the required segment is not found and cannot be allocated, return `null`.
2461                 null
2462             } else {
2463                 // Get the found segment.
2464                 val segment = it.segment
2465                 // Advance the `bufferEnd` segment if required.
2466                 if (!isRendezvousOrUnlimited && id <= bufferEndCounter / SEGMENT_SIZE) {
2467                     bufferEndSegment.moveForward(segment)
2468                 }
2469                 // Is the required segment removed?
2470                 if (segment.id > id) {
2471                     // The required segment has been removed; `segment` is the first
2472                     // segment with `id` not lower than the required one.
2473                     // Skip the sequence of removed cells in O(1).
2474                     updateReceiversCounterIfLower(segment.id * SEGMENT_SIZE)
2475                     // Clean the `prev` reference of the provided segment
2476                     // if all the previous cells are already covered by senders.
2477                     // It is important to clean the `prev` reference only in
2478                     // this case, as the closing/cancellation procedure may
2479                     // need correct value to traverse the linked list from right to left.
2480                     if (segment.id * SEGMENT_SIZE < sendersCounter) segment.cleanPrev()
2481                     // As the required segment is already removed, return `null`.
2482                     null
2483                 } else {
2484                     assert { segment.id == id }
2485                     // The required segment has been found; return it!
2486                     segment
2487                 }
2488             }
2489         }
2490 
2491     /**
2492      * Importantly, when this function does not find the requested segment,
2493      * it always updates the number of completed `expandBuffer()` attempts.
2494      */
2495     private fun findSegmentBufferEnd(id: Long, startFrom: ChannelSegment<E>, currentBufferEndCounter: Long): ChannelSegment<E>? =
2496         bufferEndSegment.findSegmentAndMoveForward(id, startFrom, createSegmentFunction()).let {
2497             if (it.isClosed) {
2498                 // The required segment has not been found and new segments
2499                 // cannot be added, as the linked listed in already added.
2500                 // This channel is already closed or cancelled; help to complete
2501                 // the closing or cancellation procedure.
2502                 completeCloseOrCancel()
2503                 // Update `bufferEndSegment` to the last segment
2504                 // in the linked list to avoid memory leaks.
2505                 moveSegmentBufferEndToSpecifiedOrLast(id, startFrom)
2506                 // When this function does not find the requested segment,
2507                 // it should update the number of completed `expandBuffer()` attempts.
2508                 incCompletedExpandBufferAttempts()
2509                 null
2510             } else {
2511                 // Get the found segment.
2512                 val segment = it.segment
2513                 // Is the required segment removed?
2514                 if (segment.id > id) {
2515                     // The required segment has been removed; `segment` is the first segment
2516                     // with `id` not lower than the required one.
2517                     // Try to skip the sequence of removed cells in O(1) by increasing the `bufferEnd` counter.
2518                     // Importantly, when this function does not find the requested segment,
2519                     // it should update the number of completed `expandBuffer()` attempts.
2520                     if (bufferEnd.compareAndSet(currentBufferEndCounter + 1, segment.id * SEGMENT_SIZE)) {
2521                         incCompletedExpandBufferAttempts(segment.id * SEGMENT_SIZE - currentBufferEndCounter)
2522                     } else {
2523                         incCompletedExpandBufferAttempts()
2524                     }
2525                     // As the required segment is already removed, return `null`.
2526                     null
2527                 } else {
2528                     assert { segment.id == id }
2529                     // The required segment has been found; return it!
2530                     segment
2531                 }
2532             }
2533         }
2534 
2535     /**
2536      * Updates [bufferEndSegment] to the one with the specified [id] or
2537      * to the last existing segment, if the required segment is not yet created.
2538      *
2539      * Unlike [findSegmentBufferEnd], this function does not allocate new segments.
2540      */
2541     private fun moveSegmentBufferEndToSpecifiedOrLast(id: Long, startFrom: ChannelSegment<E>) {
2542         // Start searching the required segment from the specified one.
2543         var segment: ChannelSegment<E> = startFrom
2544         while (segment.id < id) {
2545             segment = segment.next ?: break
2546         }
2547         // Skip all removed segments and try to update `bufferEndSegment`
2548         // to the first non-removed one. This part should succeed eventually,
2549         // as the tail segment is never removed.
2550         while (true) {
2551             while (segment.isRemoved) {
2552                 segment = segment.next ?: break
2553             }
2554             // Try to update `bufferEndSegment`. On failure,
2555             // the found segment is already removed, so it
2556             // should be skipped.
2557             if (bufferEndSegment.moveForward(segment)) return
2558         }
2559     }
2560 
2561     /**
     * Updates the `senders` counter if its value
     * is lower than the specified one.
2564      *
2565      * Senders use this function to efficiently skip
2566      * a sequence of cancelled receivers.
2567      */
2568     private fun updateSendersCounterIfLower(value: Long): Unit =
2569         sendersAndCloseStatus.loop { cur ->
2570             val curCounter = cur.sendersCounter
2571             if (curCounter >= value) return
2572             val update = constructSendersAndCloseStatus(curCounter, cur.sendersCloseStatus)
2573             if (sendersAndCloseStatus.compareAndSet(cur, update)) return
2574         }
2575 
2576     /**
     * Updates the `receivers` counter if its value
     * is lower than the specified one.
2579      *
2580      * Receivers use this function to efficiently skip
2581      * a sequence of cancelled senders.
2582      */
2583     private fun updateReceiversCounterIfLower(value: Long): Unit =
2584         receivers.loop { cur ->
2585             if (cur >= value) return
2586             if (receivers.compareAndSet(cur, value)) return
2587         }
2588 
2589     // ###################
2590     // # Debug Functions #
2591     // ###################
2592 
2593     @Suppress("ConvertTwoComparisonsToRangeCheck")
2594     override fun toString(): String {
2595         val sb = StringBuilder()
2596         // Append the close status
2597         when (sendersAndCloseStatus.value.sendersCloseStatus) {
2598             CLOSE_STATUS_CLOSED -> sb.append("closed,")
2599             CLOSE_STATUS_CANCELLED -> sb.append("cancelled,")
2600         }
2601         // Append the buffer capacity
2602         sb.append("capacity=$capacity,")
2603         // Append the data
2604         sb.append("data=[")
2605         val firstSegment = listOf(receiveSegment.value, sendSegment.value, bufferEndSegment.value)
2606             .filter { it !== NULL_SEGMENT }
2607             .minBy { it.id }
2608         val r = receiversCounter
2609         val s = sendersCounter
2610         var segment = firstSegment
2611         append_elements@ while (true) {
2612             process_cell@ for (i in 0 until SEGMENT_SIZE) {
2613                 val globalCellIndex = segment.id * SEGMENT_SIZE + i
2614                 if (globalCellIndex >= s && globalCellIndex >= r) break@append_elements
2615                 val cellState = segment.getState(i)
2616                 val element = segment.getElement(i)
2617                 val cellStateString = when (cellState) {
2618                     is CancellableContinuation<*> -> {
2619                         when {
2620                             globalCellIndex < r && globalCellIndex >= s -> "receive"
2621                             globalCellIndex < s && globalCellIndex >= r -> "send"
2622                             else -> "cont"
2623                         }
2624                     }
2625                     is SelectInstance<*> -> {
2626                         when {
2627                             globalCellIndex < r && globalCellIndex >= s -> "onReceive"
2628                             globalCellIndex < s && globalCellIndex >= r -> "onSend"
2629                             else -> "select"
2630                         }
2631                     }
2632                     is ReceiveCatching<*> -> "receiveCatching"
2633                     is SendBroadcast -> "sendBroadcast"
2634                     is WaiterEB -> "EB($cellState)"
2635                     RESUMING_BY_RCV, RESUMING_BY_EB -> "resuming_sender"
2636                     null, IN_BUFFER, DONE_RCV, POISONED, INTERRUPTED_RCV, INTERRUPTED_SEND, CHANNEL_CLOSED -> continue@process_cell
2637                     else -> cellState.toString() // leave it just in case something is missed.
2638                 }
2639                 if (element != null) {
2640                     sb.append("($cellStateString,$element),")
2641                 } else {
2642                     sb.append("$cellStateString,")
2643                 }
2644             }
2645             // Process the next segment if exists.
2646             segment = segment.next ?: break
2647         }
2648         if (sb.last() == ',') sb.deleteAt(sb.length - 1)
2649         sb.append("]")
2650         // The string representation is constructed.
2651         return sb.toString()
2652     }
2653 
2654     // Returns a debug representation of this channel,
2655     // which is actively used in Lincheck tests.
2656     internal fun toStringDebug(): String {
2657         val sb = StringBuilder()
2658         // Append the counter values and the close status
2659         sb.append("S=${sendersCounter},R=${receiversCounter},B=${bufferEndCounter},B'=${completedExpandBuffersAndPauseFlag.value},C=${sendersAndCloseStatus.value.sendersCloseStatus},")
2660         when (sendersAndCloseStatus.value.sendersCloseStatus) {
2661             CLOSE_STATUS_CANCELLATION_STARTED -> sb.append("CANCELLATION_STARTED,")
2662             CLOSE_STATUS_CLOSED -> sb.append("CLOSED,")
2663             CLOSE_STATUS_CANCELLED -> sb.append("CANCELLED,")
2664         }
2665         // Append the segment references
2666         sb.append("SEND_SEGM=${sendSegment.value.hexAddress},RCV_SEGM=${receiveSegment.value.hexAddress}")
2667         if (!isRendezvousOrUnlimited) sb.append(",EB_SEGM=${bufferEndSegment.value.hexAddress}")
2668         sb.append("  ") // add some space
2669         // Append the linked list of segments.
2670         val firstSegment = listOf(receiveSegment.value, sendSegment.value, bufferEndSegment.value)
2671             .filter { it !== NULL_SEGMENT }
2672             .minBy { it.id }
2673         var segment = firstSegment
2674         while (true) {
2675             sb.append("${segment.hexAddress}=[${if (segment.isRemoved) "*" else ""}${segment.id},prev=${segment.prev?.hexAddress},")
2676             repeat(SEGMENT_SIZE) { i ->
2677                 val cellState = segment.getState(i)
2678                 val element = segment.getElement(i)
2679                 val cellStateString = when (cellState) {
2680                     is CancellableContinuation<*> -> "cont"
2681                     is SelectInstance<*> -> "select"
2682                     is ReceiveCatching<*> -> "receiveCatching"
2683                     is SendBroadcast -> "send(broadcast)"
2684                     is WaiterEB -> "EB($cellState)"
2685                     else -> cellState.toString()
2686                 }
2687                 sb.append("[$i]=($cellStateString,$element),")
2688             }
2689             sb.append("next=${segment.next?.hexAddress}]  ")
2690             // Process the next segment if exists.
2691             segment = segment.next ?: break
2692         }
2693         // The string representation of this channel is now constructed!
2694         return sb.toString()
2695     }
2696 
2697 
    // This is an internal method for tests.
2699     fun checkSegmentStructureInvariants() {
2700         if (isRendezvousOrUnlimited) {
2701             check(bufferEndSegment.value === NULL_SEGMENT) {
2702                 "bufferEndSegment must be NULL_SEGMENT for rendezvous and unlimited channels; they do not manipulate it.\n" +
2703                     "Channel state: $this"
2704             }
2705         } else {
2706             check(receiveSegment.value.id <= bufferEndSegment.value.id) {
2707                 "bufferEndSegment should not have lower id than receiveSegment.\n" +
2708                     "Channel state: $this"
2709             }
2710         }
2711         val firstSegment = listOf(receiveSegment.value, sendSegment.value, bufferEndSegment.value)
2712             .filter { it !== NULL_SEGMENT }
2713             .minBy { it.id }
2714         check(firstSegment.prev == null) {
2715             "All processed segments should be unreachable from the data structure, but the `prev` link of the leftmost segment is non-null.\n" +
2716                 "Channel state: $this"
2717         }
2718         // Check that the doubly-linked list of segments does not
2719         // contain full-of-cancelled-cells segments.
2720         var segment = firstSegment
2721         while (segment.next != null) {
2722             // Note that the `prev` reference can be `null` if this channel is closed.
2723             check(segment.next!!.prev == null || segment.next!!.prev === segment) {
2724                 "The `segment.next.prev === segment` invariant is violated.\n" +
2725                     "Channel state: $this"
2726             }
2727             // Count the number of closed/interrupted cells
2728             // and check that all cells are in expected states.
2729             var interruptedOrClosedCells = 0
2730             for (i in 0 until SEGMENT_SIZE) {
2731                 when (val state = segment.getState(i)) {
2732                     BUFFERED -> {} // The cell stores a buffered element.
2733                     is Waiter -> {} // The cell stores a suspended request.
2734                     INTERRUPTED_RCV, INTERRUPTED_SEND, CHANNEL_CLOSED -> {
2735                         // The cell stored an interrupted request or indicates
2736                         // that this channel is already closed.
2737                         // Check that the element slot is cleaned and increment
2738                         // the number of cells in closed/interrupted state.
2739                         check(segment.getElement(i) == null)
2740                         interruptedOrClosedCells++
2741                     }
2742                     POISONED, DONE_RCV -> {
2743                         // The cell is successfully processed or poisoned.
2744                         // Check that the element slot is cleaned.
2745                         check(segment.getElement(i) == null)
2746                     }
2747                     // Other states are illegal after all running operations finish.
2748                     else -> error("Unexpected segment cell state: $state.\nChannel state: $this")
2749                 }
2750             }
2751             // Is this segment full of cancelled/closed cells?
2752             // If so, this segment should be removed from the
2753             // linked list if nether `receiveSegment`, nor
2754             // `sendSegment`, nor `bufferEndSegment` reference it.
2755             if (interruptedOrClosedCells == SEGMENT_SIZE) {
2756                 check(segment === receiveSegment.value || segment === sendSegment.value || segment === bufferEndSegment.value) {
2757                     "Logically removed segment is reachable.\nChannel state: $this"
2758                 }
2759             }
2760             // Process the next segment.
2761             segment = segment.next!!
2762         }
2763     }
2764 }
2765 
2766 /**
2767  * The channel is represented as a list of segments, which simulates an infinite array.
2768  * Each segment has its own [id], which increase from the beginning. These [id]s help
2769  * to update [BufferedChannel.sendSegment], [BufferedChannel.receiveSegment],
2770  * and [BufferedChannel.bufferEndSegment] correctly.
2771  */
internal class ChannelSegment<E>(
    id: Long,
    prev: ChannelSegment<E>?,
    channel: BufferedChannel<E>?,
    pointers: Int
) : Segment<ChannelSegment<E>>(id, prev, pointers) {
    private val _channel: BufferedChannel<E>? = channel
    val channel get() = _channel!! // always non-null except for `NULL_SEGMENT`

    // One flat array holds two registers per cell:
    // the element at `index * 2` and the state at `index * 2 + 1`.
    private val data = atomicArrayOfNulls<Any?>(SEGMENT_SIZE * 2)
    override val numberOfSlots: Int get() = SEGMENT_SIZE

    // ########################################
    // # Manipulation with the Element Fields #
    // ########################################

    /** Puts [element] into the element register of the cell [index]. */
    internal fun storeElement(index: Int, element: E) = setElementLazy(index, element)

    /** Reads the element register of the cell [index] without modifying it. */
    @Suppress("UNCHECKED_CAST")
    internal fun getElement(index: Int) = data[index * 2].value as E

    /** Reads the element register of the cell [index] and then clears it. */
    internal fun retrieveElement(index: Int): E {
        val element = getElement(index)
        cleanElement(index)
        return element
    }

    /** Clears the element register of the cell [index]. */
    internal fun cleanElement(index: Int) = setElementLazy(index, null)

    // All writes to element registers go through `lazySet`.
    private fun setElementLazy(index: Int, value: Any?) {
        data[index * 2].lazySet(value)
    }

    // ######################################
    // # Manipulation with the State Fields #
    // ######################################

    /** Reads the state register of the cell [index]. */
    internal fun getState(index: Int): Any? = data[index * 2 + 1].value

    /** Overwrites the state register of the cell [index] with [value]. */
    internal fun setState(index: Int, value: Any?) {
        data[index * 2 + 1].value = value
    }

    /** Replaces the state of the cell [index] with [to] if it is currently [from]. */
    internal fun casState(index: Int, from: Any?, to: Any?): Boolean =
        data[index * 2 + 1].compareAndSet(from, to)

    /** Stores [update] as the state of the cell [index] and returns the previous state. */
    internal fun getAndSetState(index: Int, update: Any?): Any? =
        data[index * 2 + 1].getAndSet(update)

    // ########################
    // # Cancellation Support #
    // ########################

    override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) {
        // Senders mark themselves by adding `SEGMENT_SIZE` to the cell index;
        // decode the marker and restore the real index.
        val isSender = index >= SEGMENT_SIZE
        val cellIndex = if (isSender) index - SEGMENT_SIZE else index
        // Read the element up-front: it may be needed for `onUndeliveredElement`.
        val element = getElement(cellIndex)
        // CAS-loop over the cell state.
        while (true) {
            val cur = getState(cellIndex)
            when {
                // The cell still stores the waiter: mark the cell as interrupted.
                cur is Waiter || cur is WaiterEB -> {
                    val interrupted = if (isSender) INTERRUPTED_SEND else INTERRUPTED_RCV
                    // The state has changed concurrently: re-read it and decide again.
                    if (!casState(cellIndex, cur, interrupted)) continue
                    // The waiter is cancelled. Clean the element slot and report
                    // the cancelled request; for receivers, `onCancelledRequest(..)`
                    // first waits for the `expandBuffer()` completion and only
                    // then invokes `onSlotCleaned()`.
                    cleanElement(cellIndex)
                    onCancelledRequest(cellIndex, !isSender)
                    // Only senders carry an element that may be left undelivered.
                    if (isSender) {
                        channel.onUndeliveredElement?.callUndeliveredElement(element, context)
                    }
                    return
                }
                // The cell is already marked as interrupted: clean the element
                // slot to avoid memory leaks and report the element if needed.
                cur === INTERRUPTED_SEND || cur === INTERRUPTED_RCV -> {
                    cleanElement(cellIndex)
                    if (isSender) {
                        channel.onUndeliveredElement?.callUndeliveredElement(element, context)
                    }
                    return
                }
                // An opposite operation is in the middle of resuming this request;
                // spin until it publishes the final state. It may turn out that the
                // request has been resumed (`DONE_RCV` or `BUFFERED`) and this
                // cancellation is caused by prompt cancellation.
                cur === RESUMING_BY_EB || cur === RESUMING_BY_RCV -> continue
                // Either the request was resumed successfully (so this is a prompt
                // cancellation to be ignored) or the cell indicates that the channel
                // is closed; in both cases there is nothing to do.
                cur === DONE_RCV || cur === BUFFERED || cur === CHANNEL_CLOSED -> return
                else -> error("unexpected state: $cur")
            }
        }
    }

    /**
     * Invokes `onSlotCleaned()` preceded by a `waitExpandBufferCompletion(..)` call
     * in case the cancelled request is receiver.
     */
    fun onCancelledRequest(index: Int, receiver: Boolean) {
        // The global cell index is the segment id times the segment size plus the local index.
        if (receiver) channel.waitExpandBufferCompletion(id * SEGMENT_SIZE + index)
        onSlotCleaned()
    }
}
2892 
// Workaround for an atomicfu + JVM_IR compiler bug that led to SMAP-related compiler crashes: KT-55983
internal fun <E> createSegmentFunction(): KFunction2<Long, ChannelSegment<E>, ChannelSegment<E>> {
    // Hand out a reference to the private `createSegment` factory.
    return ::createSegment
}
2895 
// Creates a segment with the given `id` that follows `prev`,
// belongs to the same channel as `prev`, and starts with zero pointers.
private fun <E> createSegment(id: Long, prev: ChannelSegment<E>): ChannelSegment<E> =
    ChannelSegment(id, prev, channel = prev.channel, pointers = 0)
// Placeholder segment that is not attached to any channel (its `channel` is `null`).
// `bufferEndSegment` holds it for rendezvous and unlimited channels.
private val NULL_SEGMENT = ChannelSegment<Any?>(
    id = -1,
    prev = null,
    channel = null,
    pointers = 0
)
2903 
/**
 * Number of cells in each segment.
 *
 * NOTE(review): presumably overridable via the
 * `kotlinx.coroutines.bufferedChannel.segmentSize` system property, with `32`
 * as the default; confirm against `systemProp`.
 */
@JvmField
internal val SEGMENT_SIZE: Int = systemProp("kotlinx.coroutines.bufferedChannel.segmentSize", 32)

/**
 * Number of iterations [BufferedChannel.waitExpandBufferCompletion] waits until the numbers
 * of started and completed [BufferedChannel.expandBuffer] calls coincide. Once the limit is
 * reached, [BufferedChannel.waitExpandBufferCompletion] blocks further
 * [BufferedChannel.expandBuffer]-s to avoid starvation.
 */
private val EXPAND_BUFFER_COMPLETION_WAIT_ITERATIONS = systemProp(
    "kotlinx.coroutines.bufferedChannel.expandBufferCompletionWaitIterations",
    10_000
)
2916 
2917 /**
2918  * Tries to resume this continuation with the specified
2919  * value. Returns `true` on success and `false` on failure.
2920  */
private fun <T> CancellableContinuation<T>.tryResume0(
    value: T,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Boolean {
    // A `null` token means that the resumption attempt has failed.
    val token = tryResume(value, null, onCancellation) ?: return false
    // The attempt succeeded; finish the resumption with the obtained token.
    completeResume(token)
    return true
}
2931 
2932 /*
2933   If the channel is rendezvous or unlimited, the `bufferEnd` counter
2934   should be initialized with the corresponding value below and never change.
2935   In this case, the `expandBuffer(..)` operation does nothing.
2936  */
private const val BUFFER_END_RENDEZVOUS = 0L // no buffer
private const val BUFFER_END_UNLIMITED = Long.MAX_VALUE // infinite buffer

// Maps the channel capacity to the initial value of the `bufferEnd` counter:
// the two special capacities get their dedicated constants, and any other
// capacity is used as is.
private fun initialBufferEnd(capacity: Int): Long =
    if (capacity == Channel.RENDEZVOUS) {
        BUFFER_END_RENDEZVOUS
    } else if (capacity == Channel.UNLIMITED) {
        BUFFER_END_UNLIMITED
    } else {
        capacity.toLong()
    }
2944 
2945 /*
2946   Cell states. The initial "empty" state is represented with `null`,
2947   and suspended operations are represented with [Waiter] instances.
2948  */
2949 
// An element is buffered in the cell.
@JvmField
internal val BUFFERED = Symbol("BUFFERED")
// Set by a concurrent `expandBuffer(..)` to tell the upcoming
// sender that it should buffer its element in this cell.
private val IN_BUFFER = Symbol("SHOULD_BUFFER")
// A receiver (hence the RCV suffix) is resuming the suspended
// sender stored in the cell; afterwards it moves the cell to
// `DONE_RCV` (on success) or `INTERRUPTED_SEND` (on failure).
private val RESUMING_BY_RCV = Symbol("S_RESUMING_BY_RCV")
// `expandBuffer(..)` (hence the EB suffix) is resuming the suspended
// sender stored in the cell; afterwards it moves the cell to
// `BUFFERED` (on success) or `INTERRUPTED_SEND` (on failure).
private val RESUMING_BY_EB = Symbol("RESUMING_BY_EB")
// A receiver that arrives at a cell already covered by a sender
// (according to the counters) while the cell is still in the
// `EMPTY` or `IN_BUFFER` state breaks the cell by moving it to `POISONED`.
private val POISONED = Symbol("POISONED")
// The element has been successfully transferred to a receiver.
private val DONE_RCV = Symbol("DONE_RCV")
// The sender that occupied the cell has been cancelled.
private val INTERRUPTED_SEND = Symbol("INTERRUPTED_SEND")
// The receiver that occupied the cell has been cancelled.
private val INTERRUPTED_RCV = Symbol("INTERRUPTED_RCV")
// The channel is closed.
internal val CHANNEL_CLOSED = Symbol("CHANNEL_CLOSED")
// Once a cell is covered by both a sender and a receiver (the `senders`
// and `receivers` counters are both greater than the cell number),
// `expandBuffer(..)` cannot tell which kind of operation is stored
// in the cell. It therefore wraps the waiter into this descriptor,
// which tells a possibly upcoming receiver to complete the
// `expandBuffer(..)` procedure if the wrapped waiter turns out
// to be a sender. Senders, in turn, ignore this information.
private class WaiterEB(@JvmField val waiter: Waiter) {
    override fun toString(): String = "WaiterEB($waiter)"
}
2991 
2992 
2993 
/**
 * Wrapper around the continuation of a suspended [BufferedChannel.receiveCatching] call.
 *
 * Storing this wrapper instead of the bare continuation makes a suspended
 * [BufferedChannel.receiveCatching] distinguishable from a suspended
 * [BufferedChannel.receive]. The [Waiter] interface is implemented
 * by delegation to the wrapped [cont].
 */
private class ReceiveCatching<E>(@JvmField val cont: CancellableContinuationImpl<ChannelResult<E>>) : Waiter by cont
3002 
/*
  Marker results of [BufferedChannel.updateCellReceive].
  When the receiver makes a rendezvous with a waiting sender or
  retrieves a buffered element, [BufferedChannel.updateCellReceive]
  returns that element itself rather than one of these markers.
 */
private val SUSPEND: Symbol = Symbol("SUSPEND")
private val SUSPEND_NO_WAITER: Symbol = Symbol("SUSPEND_NO_WAITER")
private val FAILED: Symbol = Symbol("FAILED")
3012 
/*
  Result codes returned by [BufferedChannel.updateCellSend].
 */
private const val RESULT_RENDEZVOUS: Int = 0
private const val RESULT_BUFFERED: Int = 1
private const val RESULT_SUSPEND: Int = 2
private const val RESULT_SUSPEND_NO_WAITER: Int = 3
private const val RESULT_CLOSED: Int = 4
private const val RESULT_FAILED: Int = 5
3022 
/**
 * Marker stored in [BufferedChannel.BufferedChannelIterator.receiveResult]
 * when there is no pre-received result.
 */
private val NO_RECEIVE_RESULT: Symbol = Symbol("NO_RECEIVE_RESULT")
3028 
/*
  [BufferedChannel.invokeOnClose] may run concurrently with
  channel closing, so the two have to be synchronized.
  The markers below are used for this synchronization.
 */
private val CLOSE_HANDLER_CLOSED: Symbol = Symbol("CLOSE_HANDLER_CLOSED")
private val CLOSE_HANDLER_INVOKED: Symbol = Symbol("CLOSE_HANDLER_INVOKED")
3036 
/**
 * Marker stored in [BufferedChannel._closeCause] while no closing cause is set.
 * When the channel is closed or cancelled without an exception,
 * this [NO_CLOSE_CAUSE] marker should be replaced with `null`.
 */
private val NO_CLOSE_CAUSE: Symbol = Symbol("NO_CLOSE_CAUSE")
3043 
/*
  Channel close statuses and the allowed transitions between them:
    +--------+   +----------------------+   +-----------+
    | ACTIVE |-->| CANCELLATION_STARTED |-->| CANCELLED |
    +--------+   +----------------------+   +-----------+
        |                                         ^
        |             +--------+                  |
        +------------>| CLOSED |------------------+
                      +--------+
  The intermediate `CANCELLATION_STARTED` status is required
  to synchronize concurrent closing and cancellation.
 */
private const val CLOSE_STATUS_ACTIVE: Int = 0
private const val CLOSE_STATUS_CANCELLATION_STARTED: Int = 1
private const val CLOSE_STATUS_CLOSED: Int = 2
private const val CLOSE_STATUS_CANCELLED: Int = 3
3060 
/*
  To save space and reduce the number of reads in sending operations,
  the `senders` counter and the channel close status share a single
  64-bit register: the counter occupies the bits below
  `SENDERS_CLOSE_STATUS_SHIFT`, and the close status is stored
  starting from that bit. The declarations below encapsulate
  the required bit arithmetics.
 */
private const val SENDERS_CLOSE_STATUS_SHIFT: Int = 60
private const val SENDERS_COUNTER_MASK: Long = (1L shl SENDERS_CLOSE_STATUS_SHIFT) - 1

// Extracts the `senders` counter part of the combined value.
private inline val Long.sendersCounter: Long
    get() = this and SENDERS_COUNTER_MASK

// Extracts the close status part of the combined value.
private inline val Long.sendersCloseStatus: Int
    get() = (this shr SENDERS_CLOSE_STATUS_SHIFT).toInt()

// Packs the counter and the close status into a single value.
private fun constructSendersAndCloseStatus(counter: Long, closeStatus: Int): Long {
    val shiftedStatus = closeStatus.toLong() shl SENDERS_CLOSE_STATUS_SHIFT
    return shiftedStatus + counter
}
3073 
/*
  The 64-bit `completedExpandBuffersAndPauseFlag` counter stores
  the number of completed `expandBuffer()` attempts together with
  a special flag that pauses progress to avoid starvation in
  `waitExpandBufferCompletion(..)`. The declarations below
  encapsulate the required bit arithmetics.
 */
private const val EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT: Long = 1L shl 62
private const val EB_COMPLETED_COUNTER_MASK: Long = EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT - 1

// Extracts the counter part of the combined value.
private inline val Long.ebCompletedCounter: Long
    get() = this and EB_COMPLETED_COUNTER_MASK

// Checks whether the pause flag is set in the combined value.
private inline val Long.ebPauseExpandBuffers: Boolean
    get() = (this and EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT) != 0L

// Packs the counter and the pause flag into a single value.
private fun constructEBCompletedAndPauseFlag(counter: Long, pauseEB: Boolean): Long {
    val pauseBit = if (pauseEB) EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT else 0L
    return pauseBit + counter
}
3086