<lambda>null1 @file:Suppress("PrivatePropertyName")
2
3 package kotlinx.coroutines.channels
4
5 import kotlinx.atomicfu.*
6 import kotlinx.coroutines.*
7 import kotlinx.coroutines.channels.ChannelResult.Companion.closed
8 import kotlinx.coroutines.channels.ChannelResult.Companion.failure
9 import kotlinx.coroutines.channels.ChannelResult.Companion.success
10 import kotlinx.coroutines.flow.internal.*
11 import kotlinx.coroutines.internal.*
12 import kotlinx.coroutines.selects.*
13 import kotlinx.coroutines.selects.TrySelectDetailedResult.*
14 import kotlin.contracts.*
15 import kotlin.coroutines.*
16 import kotlin.js.*
17 import kotlin.jvm.*
18 import kotlin.math.*
19 import kotlin.random.*
20 import kotlin.reflect.*
21
22 /**
23 * The buffered channel implementation, which also serves as a rendezvous channel when the capacity is zero.
 * The high-level structure is based on a conceptually infinite array for storing elements and waiting requests,
25 * separate counters of [send] and [receive] invocations that were ever performed, and an additional counter
26 * that indicates the end of the logical buffer by counting the number of array cells it ever contained.
27 * The key idea is that both [send] and [receive] start by incrementing their counters, assigning the array cell
28 * referenced by the counter. In case of rendezvous channels, the operation either suspends and stores its continuation
29 * in the cell or makes a rendezvous with the opposite request. Each cell can be processed by exactly one [send] and
30 * one [receive]. As for buffered channels, [send]-s can also add elements without suspension if the logical buffer
31 * contains the cell, while the [receive] operation updates the end of the buffer when its synchronization finishes.
32 *
33 * Please see the ["Fast and Scalable Channels in Kotlin Coroutines"](https://arxiv.org/abs/2211.04986)
34 * paper by Nikita Koval, Roman Elizarov, and Dan Alistarh for the detailed algorithm description.
35 */
36 internal open class BufferedChannel<E>(
37 /**
38 * Channel capacity; `Channel.RENDEZVOUS` for rendezvous channel
39 * and `Channel.UNLIMITED` for unlimited capacity.
40 */
41 private val capacity: Int,
42 @JvmField
43 internal val onUndeliveredElement: OnUndeliveredElement<E>? = null
44 ) : Channel<E> {
init {
    // Reject negative capacities up front; the message reports the offending value.
    require(capacity >= 0) { "Invalid channel capacity: $capacity, should be >=0" }
    // This implementation has second `init`.
}
49
50 // Maintenance note: use `Buffered1ChannelLincheckTest` to check hypotheses.
51
/*
  The counters indicate the total numbers of send, receive, and buffer expansion calls
  ever performed. The counters are incremented in the beginning of the corresponding
  operation; thus, acquiring a unique (for the operation type) cell to process.
  The segment references point to the last working segment for each operation type.

  Notably, the counter for send is combined with the channel closing status
  for synchronization simplicity and performance reasons.

  The logical end of the buffer is initialized with the channel capacity.
  If the channel is rendezvous or unlimited, the counter equals `BUFFER_END_RENDEZVOUS`
  or `BUFFER_END_UNLIMITED`, respectively, and never updates. The `bufferEndSegment`
  points to a special `NULL_SEGMENT` in this case.
 */
// The `send` counter packed together with the close status in a single `Long`.
private val sendersAndCloseStatus = atomic(0L)
// The `receive` counter.
private val receivers = atomic(0L)
// The logical end of the buffer; the initial value is derived from `capacity`.
private val bufferEnd = atomic(initialBufferEnd(capacity))

// Read-only views of the counters above; `sendersCounter` extracts
// the counter part from the packed `sendersAndCloseStatus` value.
internal val sendersCounter: Long get() = sendersAndCloseStatus.value.sendersCounter
internal val receiversCounter: Long get() = receivers.value
private val bufferEndCounter: Long get() = bufferEnd.value
73
/*
  Additionally to the counters above, we need an extra one that
  tracks the number of cells processed by `expandBuffer()`.
  When a receiver aborts, the corresponding cell might be
  physically removed from the data structure to avoid memory
  leaks, while it still can be unprocessed by `expandBuffer()`.
  In this case, `expandBuffer()` cannot know whether the
  removed cell contained sender or receiver and, therefore,
  cannot proceed. To solve the race, we ensure that cells
  corresponding to cancelled receivers cannot be physically
  removed until the cell is processed.
  This additional counter enables the synchronization.
 */
// Starts from the initial `bufferEnd` value, as read through `bufferEndCounter`.
private val completedExpandBuffersAndPauseFlag = atomic(bufferEndCounter)
88
// `true` when the buffer end counter holds one of the two special markers:
// `BUFFER_END_RENDEZVOUS` or `BUFFER_END_UNLIMITED`. The counter is read once.
private val isRendezvousOrUnlimited: Boolean
    get() {
        val end = bufferEndCounter
        return end == BUFFER_END_RENDEZVOUS || end == BUFFER_END_UNLIMITED
    }
91
// References to the segments the send, receive, and buffer expansion
// operations currently work with; all three are assigned in the `init` block below.
private val sendSegment: AtomicRef<ChannelSegment<E>>
private val receiveSegment: AtomicRef<ChannelSegment<E>>
private val bufferEndSegment: AtomicRef<ChannelSegment<E>>

init {
    // The very first segment (id = 0, no predecessor) is shared by all the references;
    // `this` is passed to the segment before the construction completes, hence the suppression.
    @Suppress("LeakingThis")
    val firstSegment = ChannelSegment(id = 0, prev = null, channel = this, pointers = 3)
    sendSegment = atomic(firstSegment)
    receiveSegment = atomic(firstSegment)
    // If this channel is rendezvous or has unlimited capacity, the algorithm never
    // invokes the buffer expansion procedure, and the corresponding segment reference
    // points to a special `NULL_SEGMENT` one and never updates.
    @Suppress("UNCHECKED_CAST")
    bufferEndSegment = atomic(if (isRendezvousOrUnlimited) (NULL_SEGMENT as ChannelSegment<E>) else firstSegment)
}
107
108 // #########################
109 // ## The send operations ##
110 // #########################
111
/**
 * Sends [element] through [sendImpl] without allocating a waiter up front.
 * When the algorithm decides to suspend, the work is handed over to
 * [sendOnNoWaiterSuspend]; when the channel is observed closed, to [onClosedSend].
 */
override suspend fun send(element: E): Unit =
    sendImpl( // <-- this is an inline function
        element = element,
        // No continuation is created eagerly; if suspension is
        // required, `onNoWaiterSuspend` below creates one.
        waiter = null,
        // Nothing else to do once the element has been
        // buffered or handed over to a receiver.
        onRendezvousOrBuffered = {},
        // Without a waiter, the suspension branch must be unreachable.
        onSuspend = { _, _ -> assert { false } },
        // Closed channel: invoke the `onUndeliveredElement(..)`
        // handler (if any) and fail with an exception.
        onClosed = { onClosedSend(element) },
        // Suspension is required: delegate to the function that creates
        // the continuation. The tail-call optimization is applied here.
        onNoWaiterSuspend = { cellSegment, cellIndex, pendingElement, globalIndex ->
            sendOnNoWaiterSuspend(cellSegment, cellIndex, pendingElement, globalIndex)
        }
    )
132
// Completes a `send(e)` attempt on a closed channel: resumes with `sendException`,
// or, if the `onUndeliveredElement` handler itself failed, with that failure
// carrying `sendException` as suppressed for better diagnostics.
// NB: return type could've been Nothing, but it breaks TCO
private suspend fun onClosedSend(element: E): Unit = suspendCancellableCoroutine { continuation ->
    val handlerFailure = onUndeliveredElement?.callUndeliveredElementCatchingException(element)
    if (handlerFailure == null) {
        // No handler, or the handler completed normally.
        continuation.resumeWithStackTrace(sendException)
    } else {
        // The handler crashed; report its failure and attach the send exception.
        handlerFailure.addSuppressed(sendException)
        continuation.resumeWithStackTrace(handlerFailure)
    }
}
143
// Slow path of `send(e)`: creates a (reusable) cancellable continuation and
// retries the cell update with it installed as the waiter.
private suspend fun sendOnNoWaiterSuspend(
    /* The segment and the index in it
       identify the working cell. */
    segment: ChannelSegment<E>,
    index: Int,
    /** The element to be inserted. */
    element: E,
    /** The global index of the cell. */
    s: Long
) = suspendCancellableCoroutineReusable { continuation ->
    sendImplOnNoWaiter( // <-- this is an inline function
        segment = segment, index = index, element = element, s = s,
        // The freshly created continuation becomes the waiter.
        waiter = continuation,
        // The element has been buffered or passed to a receiver:
        // resume the continuation and finish. In case of prompt
        // cancellation, it is guaranteed that the element
        // has been already buffered or passed to receiver.
        onRendezvousOrBuffered = { continuation.resume(Unit) },
        // Closed channel: call `onUndeliveredElement(..)` and complete
        // the continuation with the corresponding exception.
        onClosed = { onClosedSendOnNoWaiterSuspend(element, continuation) },
    )
}
168
// Registers the cancellation handler of a suspending sender for the given cell.
private fun Waiter.prepareSenderForSuspension(
    /* The segment and the index in it
       identify the working cell. */
    segment: ChannelSegment<E>,
    index: Int
) {
    // Senders are told apart from receivers on cancellation by
    // shifting the cell index by `SEGMENT_SIZE`.
    val senderMarkedIndex = index + SEGMENT_SIZE
    invokeOnCancellation(segment, senderMarkedIndex)
}
180
// Closed-channel completion for the suspending `send(e)` path: the undelivered-element
// handler (if any) is invoked first, then the continuation fails with `sendException`.
private fun onClosedSendOnNoWaiterSuspend(element: E, cont: CancellableContinuation<Unit>) {
    onUndeliveredElement?.callUndeliveredElement(element, cont.context)
    val closedFailure = recoverStackTrace(sendException, cont)
    cont.resumeWithException(closedFailure)
}
185
/**
 * Non-suspending send attempt: returns a failure when a plain `send(e)` would
 * suspend, a closed result when the channel is closed, and success otherwise.
 */
override fun trySend(element: E): ChannelResult<Unit> {
    // Bail out early if the plain `send(e)` operation would suspend right now.
    val statusSnapshot = sendersAndCloseStatus.value
    if (shouldSendSuspend(statusSnapshot)) return failure()
    // Either there are waiting receivers, or the channel is closed.
    // The rest mirrors the plain `send(e)`, except that `INTERRUPTED_SEND`
    // is installed whenever the operation decides to suspend.
    return sendImpl( // <-- this is an inline function
        element = element,
        // An already interrupted sender is stored on suspension.
        waiter = INTERRUPTED_SEND,
        // The element has been buffered or a rendezvous happened.
        onRendezvousOrBuffered = { success(Unit) },
        // The `INTERRUPTED_SEND` token has been installed, so this
        // `trySend(e)` fails. According to the contract, the
        // [onUndeliveredElement] handler is not called here.
        onSuspend = { cellSegment, _ ->
            cellSegment.onSlotCleaned()
            failure()
        },
        // Report the closed channel to the caller.
        onClosed = { closed(sendException) }
    )
}
212
/**
 * A special `send(e)` flavour that reports the outcome as a `Boolean`:
 * `true` once the element has been sent, `false` when the channel is closed.
 *
 * The [onUndeliveredElement] feature is unsupported here, so the element
 * may stay undelivered if the coroutine is cancelled.
 */
internal open suspend fun sendBroadcast(element: E): Boolean = suspendCancellableCoroutine { continuation ->
    check(onUndeliveredElement == null) {
        "the `onUndeliveredElement` feature is unsupported for `sendBroadcast(e)`"
    }
    sendImpl(
        element = element,
        // The continuation is wrapped so that it is stored as a dedicated waiter type.
        waiter = SendBroadcast(continuation),
        onRendezvousOrBuffered = { continuation.resume(true) },
        // Nothing to do on suspension: the waiter is already installed.
        onSuspend = { _, _ -> },
        onClosed = { continuation.resume(false) }
    )
}
233
/**
 * Specifies waiting [sendBroadcast] operation.
 *
 * Wraps the continuation [cont] of the suspended caller; the [Waiter] interface
 * is implemented by delegation to [cont], cast to [CancellableContinuationImpl].
 */
private class SendBroadcast(
    val cont: CancellableContinuation<Boolean>
) : Waiter by cont as CancellableContinuationImpl<Boolean>
240
/**
 * Abstract send implementation.
 *
 * Each loop iteration reserves a cell by incrementing the senders counter, locates
 * the segment that owns the cell, and updates the cell via [updateCellSend]. The result
 * code selects which of the provided lambdas produces the return value. The loop restarts
 * with a new cell when the required segment cannot be found on a non-closed channel,
 * or when the cell update reports `RESULT_FAILED`.
 */
private inline fun <R> sendImpl(
    /* The element to be sent. */
    element: E,
    /* The waiter to be stored in case of suspension,
       or `null` if the waiter is not created yet.
       In the latter case, when the algorithm decides
       to suspend, [onNoWaiterSuspend] is called. */
    waiter: Any?,
    /* This lambda is invoked when the element has been
       buffered or a rendezvous with a receiver happens. */
    onRendezvousOrBuffered: () -> R,
    /* This lambda is called when the operation suspends in the
       cell specified by the segment and the index in it. */
    onSuspend: (segm: ChannelSegment<E>, i: Int) -> R,
    /* This lambda is called when the channel
       is observed in the closed state. */
    onClosed: () -> R,
    /* This lambda is called when the operation decides
       to suspend, but the waiter is not provided (equals `null`).
       It should create a waiter and delegate to `sendImplOnNoWaiter`.
       The default implementation fails with an error. */
    onNoWaiterSuspend: (
        segm: ChannelSegment<E>,
        i: Int,
        element: E,
        s: Long
    ) -> R = { _, _, _, _ -> error("unexpected") }
): R {
    // Read the segment reference before the counter increment;
    // it is crucial to be able to find the required segment later.
    var segment = sendSegment.value
    while (true) {
        // Atomically increment the `senders` counter and obtain the
        // value right before the increment along with the close status.
        val sendersAndCloseStatusCur = sendersAndCloseStatus.getAndIncrement()
        val s = sendersAndCloseStatusCur.sendersCounter
        // Is this channel already closed? Keep the information.
        val closed = sendersAndCloseStatusCur.isClosedForSend0
        // Count the required segment id and the cell index in it.
        val id = s / SEGMENT_SIZE
        val i = (s % SEGMENT_SIZE).toInt()
        // Try to find the required segment if the initially obtained
        // one (in the beginning of this function) has lower id.
        if (segment.id != id) {
            // Find the required segment.
            segment = findSegmentSend(id, segment) ?:
                // The required segment has not been found.
                // Finish immediately if this channel is closed,
                // restarting the operation otherwise.
                // In the latter case, the required segment was full
                // of interrupted waiters and, therefore, removed
                // physically to avoid memory leaks.
                if (closed) {
                    return onClosed()
                } else {
                    continue
                }
        }
        // Update the cell according to the algorithm. Importantly, when
        // the channel is already closed, storing a waiter is illegal, so
        // the algorithm stores the `INTERRUPTED_SEND` token in this case.
        when (updateCellSend(segment, i, element, s, waiter, closed)) {
            RESULT_RENDEZVOUS -> {
                // A rendezvous with a receiver has happened.
                // The previous segments are no longer needed
                // for the upcoming requests, so the algorithm
                // resets the link to the previous segment.
                segment.cleanPrev()
                return onRendezvousOrBuffered()
            }
            RESULT_BUFFERED -> {
                // The element has been buffered.
                return onRendezvousOrBuffered()
            }
            RESULT_SUSPEND -> {
                // The operation has decided to suspend and installed the
                // specified waiter. If the channel was already closed,
                // and the `INTERRUPTED_SEND` token has been installed as a waiter,
                // this request finishes with the `onClosed()` action.
                if (closed) {
                    segment.onSlotCleaned()
                    return onClosed()
                }
                // Only real `Waiter` instances get a cancellation handler;
                // other values passed as `waiter` are skipped by the safe cast.
                (waiter as? Waiter)?.prepareSenderForSuspension(segment, i)
                return onSuspend(segment, i)
            }
            RESULT_CLOSED -> {
                // This channel is closed.
                // In case this segment is already or going to be
                // processed by a receiver, ensure that all the
                // previous segments are unreachable.
                if (s < receiversCounter) segment.cleanPrev()
                return onClosed()
            }
            RESULT_FAILED -> {
                // Either the cell stores an interrupted receiver,
                // or it was poisoned by a concurrent receiver.
                // In both cases, all the previous segments are already processed,
                // so the link to them is reset and the operation restarts.
                segment.cleanPrev()
                continue
            }
            RESULT_SUSPEND_NO_WAITER -> {
                // The operation has decided to suspend,
                // but no waiter has been provided.
                return onNoWaiterSuspend(segment, i, element, s)
            }
        }
    }
}
352
// Sends the element without ever failing on a full buffer: when the buffer
// overflows, the oldest elements are dropped until the new cell fits.
// Note: this function is temporarily moved from ConflatedBufferedChannel to BufferedChannel class, because of this issue: KT-65554.
// For now, an inline function, which invokes atomic operations, may only be called within a parent class.
protected fun trySendDropOldest(element: E): ChannelResult<Unit> =
    sendImpl( // <-- this is an inline function
        element = element,
        // The element is placed into the logical buffer even when
        // the channel is full; the `onSuspend` callback below
        // then extracts the first (oldest) element.
        waiter = BUFFERED,
        // A rendezvous has happened or the element has been buffered.
        onRendezvousOrBuffered = { success(Unit) },
        // The algorithm decided to suspend, so the element was added
        // to an overflowed buffer; drop the oldest elements until
        // the cell of this element is covered by the buffer.
        onSuspend = { cellSegment, cellIndex ->
            val globalIndex = cellSegment.id * SEGMENT_SIZE + cellIndex
            dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(globalIndex)
            success(Unit)
        },
        // Report the closed channel to the caller.
        onClosed = { closed(sendException) }
    )
375
// Second attempt of a send operation on the same cell, now with a real waiter.
// The branches mirror the ones of `sendImpl(..)`; see it for the detailed comments.
private inline fun sendImplOnNoWaiter(
    /* The segment and the index in it
       identify the working cell. */
    segment: ChannelSegment<E>,
    index: Int,
    /* The element to be sent. */
    element: E,
    /* The global index of the cell. */
    s: Long,
    /* The waiter to be stored in case of suspension. */
    waiter: Waiter,
    /* Invoked when the element has been buffered
       or a rendezvous with a receiver happens. */
    onRendezvousOrBuffered: () -> Unit,
    /* Invoked when the channel is
       observed in the closed state. */
    onClosed: () -> Unit,
) {
    // The channel is treated as non-closed here (`closed = false`).
    when (updateCellSend(segment, index, element, s, waiter, false)) {
        RESULT_RENDEZVOUS -> {
            segment.cleanPrev()
            onRendezvousOrBuffered()
        }
        RESULT_BUFFERED -> onRendezvousOrBuffered()
        // The waiter is installed; only the cancellation handler is left to register.
        RESULT_SUSPEND -> waiter.prepareSenderForSuspension(segment, index)
        RESULT_CLOSED -> {
            if (s < receiversCounter) segment.cleanPrev()
            onClosed()
        }
        RESULT_FAILED -> {
            // The cell cannot be used; restart the whole operation
            // from the beginning, keeping the same waiter.
            segment.cleanPrev()
            sendImpl(
                element = element,
                waiter = waiter,
                onRendezvousOrBuffered = onRendezvousOrBuffered,
                onSuspend = { _, _ -> },
                onClosed = onClosed,
            )
        }
        else -> error("unexpected")
    }
}
425
// Fast path of the cell update performed by send operations. It stores the element,
// handles the empty-cell and waiting-receiver states with a single attempt, and
// falls back to `updateCellSendSlow(..)` for a closed channel, for any other cell
// state, and for lost CAS races. Returns one of the `RESULT_*` codes.
private fun updateCellSend(
    /* The working cell is specified by
       the segment and the index in it. */
    segment: ChannelSegment<E>,
    index: Int,
    /* The element to be sent. */
    element: E,
    /* The global index of the cell. */
    s: Long,
    /* The waiter to be stored in case of suspension. */
    waiter: Any?,
    /* Whether the channel was observed closed by the caller. */
    closed: Boolean
): Int {
    // This is a fast-path of `updateCellSendSlow(..)`.
    //
    // First, the algorithm stores the element,
    // performing the synchronization after that.
    // This way, receivers safely retrieve the
    // element, following the safe publication pattern.
    segment.storeElement(index, element)
    // The closed case is handled entirely by the slow path.
    if (closed) return updateCellSendSlow(segment, index, element, s, waiter, closed)
    // Read the current cell state.
    val state = segment.getState(index)
    when {
        // The cell is empty.
        state === null -> {
            // If the element should be buffered, or a rendezvous should happen
            // while the receiver is still coming, try to buffer the element.
            // Otherwise, try to store the specified waiter in the cell.
            if (bufferOrRendezvousSend(s)) {
                // Move the cell state to `BUFFERED`.
                if (segment.casState(index, null, BUFFERED)) {
                    // The element has been successfully buffered, finish.
                    return RESULT_BUFFERED
                }
            } else {
                // This `send(e)` operation should suspend.
                // (The closed-channel case, where `INTERRUPTED_SEND` is
                // installed instead, never reaches this point: it has
                // already been delegated to the slow path above.)
                if (waiter == null) {
                    // The waiter is not specified; return the corresponding result.
                    return RESULT_SUSPEND_NO_WAITER
                } else {
                    // Try to install the waiter.
                    if (segment.casState(index, null, waiter)) return RESULT_SUSPEND
                }
            }
            // A CAS above has failed; fall through to the slow path below.
        }
        // A waiting receiver is stored in the cell.
        state is Waiter -> {
            // As the element will be passed directly to the waiter,
            // the algorithm cleans the element slot in the cell.
            segment.cleanElement(index)
            // Try to make a rendezvous with the suspended receiver.
            return if (state.tryResumeReceiver(element)) {
                // Rendezvous! Move the cell state to `DONE_RCV` and finish.
                segment.setState(index, DONE_RCV)
                onReceiveDequeued()
                RESULT_RENDEZVOUS
            } else {
                // The resumption has failed. Update the cell state correspondingly
                // and clean the element field. It is also possible for a concurrent
                // cancellation handler to update the cell state; we can safely
                // ignore these updates.
                if (segment.getAndSetState(index, INTERRUPTED_RCV) !== INTERRUPTED_RCV) {
                    segment.onCancelledRequest(index, true)
                }
                RESULT_FAILED
            }
        }
    }
    // Any other state, or a lost race: let the slow path handle the cell.
    return updateCellSendSlow(segment, index, element, s, waiter, closed)
}
500
/**
 * Updates the working cell of an abstract send operation.
 *
 * Re-reads the cell state in a loop until one of the transitions succeeds,
 * and returns the corresponding `RESULT_*` code. The element is expected
 * to be already stored in the cell by the caller (see [updateCellSend]).
 */
private fun updateCellSendSlow(
    /* The working cell is specified by
       the segment and the index in it. */
    segment: ChannelSegment<E>,
    index: Int,
    /* The element to be sent. */
    element: E,
    /* The global index of the cell. */
    s: Long,
    /* The waiter to be stored in case of suspension. */
    waiter: Any?,
    /* Whether the channel was observed closed by the caller. */
    closed: Boolean
): Int {
    // Then, the cell state should be updated according to
    // its state machine; see the paper mentioned in the very
    // beginning for the cell life-cycle and the algorithm details.
    while (true) {
        // Read the current cell state.
        val state = segment.getState(index)
        when {
            // The cell is empty.
            state === null -> {
                // If the element should be buffered, or a rendezvous should happen
                // while the receiver is still coming, try to buffer the element.
                // Otherwise, try to store the specified waiter in the cell.
                if (bufferOrRendezvousSend(s) && !closed) {
                    // Move the cell state to `BUFFERED`.
                    if (segment.casState(index, null, BUFFERED)) {
                        // The element has been successfully buffered, finish.
                        return RESULT_BUFFERED
                    }
                } else {
                    // This `send(e)` operation should suspend.
                    // However, in case the channel has already
                    // been observed closed, `INTERRUPTED_SEND`
                    // is installed instead.
                    when {
                        // The channel is closed
                        closed -> if (segment.casState(index, null, INTERRUPTED_SEND)) {
                            segment.onCancelledRequest(index, false)
                            return RESULT_CLOSED
                        }
                        // The waiter is not specified; return the corresponding result.
                        waiter == null -> return RESULT_SUSPEND_NO_WAITER
                        // Try to install the waiter.
                        else -> if (segment.casState(index, null, waiter)) return RESULT_SUSPEND
                    }
                }
                // A failed CAS falls through and the loop re-reads the state.
            }
            // This cell is in the logical buffer.
            state === IN_BUFFER -> {
                // Try to buffer the element.
                if (segment.casState(index, state, BUFFERED)) {
                    // The element has been successfully buffered, finish.
                    return RESULT_BUFFERED
                }
            }
            // The cell stores a cancelled receiver.
            state === INTERRUPTED_RCV -> {
                // Clean the element slot to avoid memory leaks and finish.
                segment.cleanElement(index)
                return RESULT_FAILED
            }
            // The cell is poisoned by a concurrent receive.
            state === POISONED -> {
                // Clean the element slot to avoid memory leaks and finish.
                segment.cleanElement(index)
                return RESULT_FAILED
            }
            // The channel is already closed.
            state === CHANNEL_CLOSED -> {
                // Clean the element slot to avoid memory leaks,
                // ensure that the closing/cancellation procedure
                // has been completed, and finish.
                segment.cleanElement(index)
                completeCloseOrCancel()
                return RESULT_CLOSED
            }
            // A waiting receiver is stored in the cell.
            else -> {
                assert { state is Waiter || state is WaiterEB }
                // As the element will be passed directly to the waiter,
                // the algorithm cleans the element slot in the cell.
                segment.cleanElement(index)
                // Unwrap the waiting receiver from `WaiterEB` if needed.
                // As a receiver is stored in the cell, the buffer expansion
                // procedure would finish, so senders simply ignore the "EB" marker.
                val receiver = if (state is WaiterEB) state.waiter else state
                // Try to make a rendezvous with the suspended receiver.
                return if (receiver.tryResumeReceiver(element)) {
                    // Rendezvous! Move the cell state to `DONE_RCV` and finish.
                    segment.setState(index, DONE_RCV)
                    onReceiveDequeued()
                    RESULT_RENDEZVOUS
                } else {
                    // The resumption has failed. Update the cell state correspondingly
                    // and clean the element field. It is also possible for a concurrent
                    // `expandBuffer()` or the cancellation handler to update the cell state;
                    // we can safely ignore these updates as senders do not help `expandBuffer()`.
                    if (segment.getAndSetState(index, INTERRUPTED_RCV) !== INTERRUPTED_RCV) {
                        segment.onCancelledRequest(index, true)
                    }
                    RESULT_FAILED
                }
            }
        }
    }
}
612
/**
 * Checks whether a [send] invocation is bound to suspend if it is called
 * with the specified [curSendersAndCloseStatus] value; the receivers and
 * buffer end counters are read at the moment of the call.
 *
 * When the channel is already closed for sending, the function returns `false`,
 * as [send] does not suspend in this case. Otherwise, [send] suspends exactly when
 * [bufferOrRendezvousSend] reports that the working cell of the potential [send]
 * can neither be buffered nor matched with a receiver: the cell index is not
 * below the buffer end, and it is not below the receivers counter plus the capacity.
 */
@JsName("shouldSendSuspend0")
private fun shouldSendSuspend(curSendersAndCloseStatus: Long): Boolean =
    // A closed channel never makes `send` suspend; otherwise, suspension is
    // required iff neither buffering nor a rendezvous is possible.
    !curSendersAndCloseStatus.isClosedForSend0 &&
        !bufferOrRendezvousSend(curSendersAndCloseStatus.sendersCounter)
631
/**
 * Tells whether the [send] working with the cell [curSenders] should
 * put its element into the cell without suspension.
 */
private fun bufferOrRendezvousSend(curSenders: Long): Boolean {
    // The cell is covered by the logical buffer.
    if (curSenders < bufferEndCounter) return true
    // Otherwise, check the cell against the receivers counter shifted by the capacity.
    return curSenders < receiversCounter + capacity
}
638
/**
 * Checks whether a [send] invocation is bound to suspend if it is called
 * with the current counter and close status values. See [shouldSendSuspend] for details.
 *
 * Note that the answer is imprecise in case of rendezvous channels: this function
 * can return `false` when a [send] invocation is bound to suspend. Specifically,
 * the counter of `receive()` operations may indicate that there is a waiting receiver,
 * while it has already been cancelled, so the potential rendezvous is bound to fail.
 */
internal open fun shouldSendSuspend(): Boolean {
    val statusSnapshot = sendersAndCloseStatus.value
    return shouldSendSuspend(statusSnapshot)
}
649
/**
 * Attempts to complete this waiting receiver with [element] as the result.
 * The way of resumption depends on the kind of the waiter stored in the cell.
 * Returns `true` if the receiver has been resumed, and `false` otherwise.
 */
@Suppress("UNCHECKED_CAST")
private fun Any.tryResumeReceiver(element: E): Boolean = when (this) {
    // `onReceiveXXX` select clause
    is SelectInstance<*> -> trySelect(this@BufferedChannel, element)
    // A continuation wrapped into `ReceiveCatching`; it is resumed with a successful result.
    is ReceiveCatching<*> -> {
        val catching = this as ReceiveCatching<E>
        catching.cont.tryResume0(
            success(element),
            onUndeliveredElement?.bindCancellationFun(element, catching.cont.context)
        )
    }
    // A waiting channel iterator
    is BufferedChannel<*>.BufferedChannelIterator ->
        (this as BufferedChannel<E>.BufferedChannelIterator).tryResumeHasNext(element)
    // `receive()`
    is CancellableContinuation<*> -> {
        val continuation = this as CancellableContinuation<E>
        continuation.tryResume0(
            element,
            onUndeliveredElement?.bindCancellationFun(element, continuation.context)
        )
    }
    else -> error("Unexpected receiver type: $this")
}
673
674 // ##########################
675 // # The receive operations #
676 // ##########################
677
/**
 * This function is invoked when a receiver is added as a waiter in this channel.
 * The default implementation does nothing; subclasses may override it.
 */
protected open fun onReceiveEnqueued() {}

/**
 * This function is invoked when a waiting receiver is no longer stored in this channel;
 * independently on whether it is caused by rendezvous, cancellation, or channel closing.
 * The default implementation does nothing; subclasses may override it.
 */
protected open fun onReceiveDequeued() {}
688
/**
 * Receives an element through `receiveImpl(..)` without allocating a waiter up front.
 * When the algorithm decides to suspend, the work is handed over to
 * [receiveOnNoWaiterSuspend]; when the channel is closed, an exception is thrown.
 */
override suspend fun receive(): E =
    receiveImpl( // <-- this is an inline function
        // No continuation is created eagerly; if suspension is
        // required, `onNoWaiterSuspend` below creates one.
        waiter = null,
        // An element has been taken from the buffer or obtained
        // via a rendezvous with a suspended sender: return it.
        onElementRetrieved = { received ->
            return received
        },
        // Without a waiter, the suspension branch must be unreachable.
        onSuspend = { _, _, _ -> error("unexpected") },
        // The channel is already closed: fail with an exception.
        onClosed = { throw recoverStackTrace(receiveException) },
        // Suspension is required: delegate to the `suspend` function
        // that creates a continuation.
        // The tail-call optimization is applied here.
        onNoWaiterSuspend = { cellSegment, cellIndex, globalIndex ->
            receiveOnNoWaiterSuspend(cellSegment, cellIndex, globalIndex)
        }
    )
710
// Slow path of `receive()`: creates a (reusable) cancellable continuation and
// retries the cell update with it installed as the waiter.
private suspend fun receiveOnNoWaiterSuspend(
    /* The segment and the index in it
       identify the working cell. */
    segment: ChannelSegment<E>,
    index: Int,
    /* The global index of the cell. */
    r: Long
) = suspendCancellableCoroutineReusable { continuation ->
    receiveImplOnNoWaiter( // <-- this is an inline function
        segment = segment, index = index, r = r,
        // The freshly created continuation becomes the waiter.
        waiter = continuation,
        // An element has been retrieved: resume the continuation with it.
        // Importantly, the receiver coroutine may be cancelled after it is
        // successfully resumed but not dispatched yet; when `onUndeliveredElement`
        // is specified, the bound cancellation function covers that case.
        onElementRetrieved = { element ->
            continuation.resume(
                element,
                onUndeliveredElement?.bindCancellationFun(element, continuation.context)
            )
        },
        onClosed = { onClosedReceiveOnNoWaiterSuspend(continuation) },
    )
}
737
// Finalizes the suspension of a receiver in the cell specified by the segment and
// the index in it: notifies the `onReceiveEnqueued()` hook and registers the
// cancellation handler. Unlike senders, receivers pass the plain cell index.
private fun Waiter.prepareReceiverForSuspension(segment: ChannelSegment<E>, index: Int) {
    onReceiveEnqueued()
    invokeOnCancellation(segment, index)
}
742
// Closed-channel completion for the suspending `receive()` path:
// the continuation fails with `receiveException`.
private fun onClosedReceiveOnNoWaiterSuspend(cont: CancellableContinuation<E>) {
    val closedFailure = receiveException
    cont.resumeWithException(closedFailure)
}
746
/*
   Same scheme as the plain `receive()`, except that the outcome is wrapped
   into a `ChannelResult`: a retrieved element becomes a successful result,
   and a channel closed for receiving becomes a closed result instead of
   an explicitly thrown exception. Please refer to the plain `receive()`
   implementation for the comments.
 */
override suspend fun receiveCatching(): ChannelResult<E> =
    receiveImpl( // <-- this is an inline function
        waiter = null,
        onElementRetrieved = { received ->
            success(received)
        },
        onSuspend = { _, _, _ -> error("unexpected") },
        onClosed = { closed(closeCause) },
        onNoWaiterSuspend = { cellSegment, cellIndex, globalIndex ->
            receiveCatchingOnNoWaiterSuspend(cellSegment, cellIndex, globalIndex)
        }
    )
764
// Slow path of `receiveCatching()`: creates a (reusable) cancellable continuation,
// wraps it into `ReceiveCatching`, and retries the cell update with that waiter.
private suspend fun receiveCatchingOnNoWaiterSuspend(
    /* The segment and the index in it
       identify the working cell. */
    segment: ChannelSegment<E>,
    index: Int,
    /* The global index of the cell. */
    r: Long
) = suspendCancellableCoroutineReusable { continuation ->
    // The wrapper marks the waiter as a `receiveCatching()` one.
    val waiter = ReceiveCatching(continuation as CancellableContinuationImpl<ChannelResult<E>>)
    receiveImplOnNoWaiter(
        segment = segment, index = index, r = r,
        waiter = waiter,
        // An element has been retrieved: resume with a successful result.
        onElementRetrieved = { element ->
            continuation.resume(
                success(element),
                onUndeliveredElement?.bindCancellationFun(element, continuation.context)
            )
        },
        onClosed = { onClosedReceiveCatchingOnNoWaiterSuspend(continuation) }
    )
}
780
// Closed-channel completion for the suspending `receiveCatching()` path:
// the continuation is resumed normally with a closed result carrying `closeCause`.
private fun onClosedReceiveCatchingOnNoWaiterSuspend(cont: CancellableContinuation<ChannelResult<E>>) {
    val closedResult: ChannelResult<E> = closed(closeCause)
    cont.resume(closedResult)
}
784
/**
 * Non-suspending receive attempt: returns a closed result when the channel is
 * closed for receiving, a failure when there is nothing to retrieve, and the
 * retrieved element otherwise.
 */
override fun tryReceive(): ChannelResult<E> {
    // The `receivers` counter must be read first, before the senders one.
    val receiversSnapshot = receivers.value
    val statusSnapshot = sendersAndCloseStatus.value
    // Closed for receive? Report it right away.
    if (statusSnapshot.isClosedForReceive0) return closed(closeCause)
    // Fail if the plain `receive()` operation would suspend:
    // no sender has reserved the cell this receive would work with.
    if (receiversSnapshot >= statusSnapshot.sendersCounter) return failure()
    // Let's try to retrieve an element!
    // The logic follows the plain `receive()`, except that `INTERRUPTED_RCV`
    // is stored when the operation decides to suspend, which allows using
    // the unconditional `Fetch-and-Add` instruction. Storing `INTERRUPTED_RCV`
    // instead of an actual waiter on suspension (a.k.a. "no elements to
    // retrieve") can be seen as a short-cut of "suspending and cancelling
    // immediately".
    return receiveImpl( // <-- this is an inline function
        // An already interrupted receiver is stored on suspension.
        waiter = INTERRUPTED_RCV,
        // An element has been successfully retrieved.
        onElementRetrieved = { received -> success(received) },
        // The `INTERRUPTED_RCV` token has been installed,
        // so this `tryReceive()` must fail.
        onSuspend = { cellSegment, _, cellGlobalIndex ->
            // Emulate a "cancelled" receive: as no real cancellation happens,
            // `waitExpandBufferCompletion` has to be invoked manually.
            waitExpandBufferCompletion(cellGlobalIndex)
            cellSegment.onSlotCleaned()
            failure()
        },
        // Report the closed channel to the caller.
        onClosed = { closed(closeCause) }
    )
}
822
/**
 * Keeps discarding the first element of this channel until the cell with
 * the index [globalCellIndex] becomes a part of the logical buffer. This is
 * a key procedure for the _conflated_ channel implementation, see
 * [ConflatedBufferedChannel] with the [BufferOverflow.DROP_OLDEST]
 * strategy on buffer overflowing.
 */
protected fun dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(globalCellIndex: Long) {
    assert { isConflatedDropOldest }
    // The segment reference has to be read before the counter is advanced;
    // otherwise, the required segment may be impossible to find later.
    var segment = receiveSegment.value
    while (true) {
        // Finish once the cell is already in the buffer, or is going to be
        // moved there shortly by a `receive()` that has already started.
        val receiversNow = this.receivers.value
        if (globalCellIndex < max(receiversNow + capacity, bufferEndCounter)) return
        // The cell is still outside the buffer: claim the first element,
        // retrying if the `receivers` counter has changed in the meantime.
        if (!this.receivers.compareAndSet(receiversNow, receiversNow + 1)) continue
        // Compute the id of the required segment and the cell index in it.
        val segmentId = receiversNow / SEGMENT_SIZE
        val cellIndex = (receiversNow % SEGMENT_SIZE).toInt()
        if (segment.id != segmentId) {
            // The segment has not been found, so the operation restarts. The channel
            // may already be closed for receiving, with the linked list of segments
            // closed as well; then this loop finishes eventually after the `receivers`
            // counter is incremented enough times. The "closed for receiving" check
            // (performed in `receive`) cannot be done here, as closing itself
            // may call this function while helping to complete.
            segment = findSegmentReceive(segmentId, segment) ?: continue
        }
        // Update the cell according to the cell life-cycle.
        val outcome = updateCellReceive(segment, cellIndex, receiversNow, null)
        if (outcome === FAILED) {
            // The cell is poisoned; go for the next one. The `prev` pointer of
            // the working segment is reset as well to avoid memory leaks.
            if (receiversNow < sendersCounter) segment.cleanPrev()
        } else {
            // A buffered element has been extracted from the cell.
            // Drop the reference to the previous segment and report
            // the element as undelivered.
            segment.cleanPrev()
            @Suppress("UNCHECKED_CAST")
            val undeliveredFailure = onUndeliveredElement?.callUndeliveredElementCatchingException(outcome as E)
            if (undeliveredFailure != null) throw undeliveredFailure
        }
    }
}
877
/**
 * Abstract receive implementation.
 *
 * @param waiter the waiter to be stored in the cell in case of suspension,
 * or `null` if the waiter is not created yet. In the latter case,
 * [onNoWaiterSuspend] is called if the algorithm decides to suspend.
 * @param onElementRetrieved invoked when an element has been successfully
 * retrieved, either from the buffer or by making a rendezvous with
 * a suspended sender.
 * @param onSuspend invoked when the operation suspends in the cell specified
 * by the segment, the index in it, and the global index.
 * @param onClosed invoked when the channel is observed in the closed state
 * and no waiting sender is found, which means that it is closed for receiving.
 * @param onNoWaiterSuspend invoked when the operation decides to suspend but
 * [waiter] is `null`. It should create a waiter and delegate
 * to `receiveImplOnNoWaiter`.
 */
private inline fun <R> receiveImpl(
    waiter: Any?,
    onElementRetrieved: (element: E) -> R,
    onSuspend: (segm: ChannelSegment<E>, i: Int, r: Long) -> R,
    onClosed: () -> R,
    onNoWaiterSuspend: (
        segm: ChannelSegment<E>,
        i: Int,
        r: Long
    ) -> R = { _, _, _ -> error("unexpected") }
): R {
    // The segment reference has to be read before the counter increment;
    // otherwise, the required segment may be impossible to find later.
    var segment = receiveSegment.value
    while (true) {
        // Like `send(e)`, `receive()` starts with checking
        // whether the channel is already closed for receiving.
        if (isClosedForReceive) return onClosed()
        // Atomically increment the `receivers` counter,
        // obtaining the value right before the increment.
        val globalIndex = this.receivers.getAndIncrement()
        // Compute the id of the required segment and the cell index in it.
        val segmentId = globalIndex / SEGMENT_SIZE
        val cellIndex = (globalIndex % SEGMENT_SIZE).toInt()
        if (segment.id != segmentId) {
            // The required segment may be missing because the channel is already
            // closed for receiving, and the linked list of segments is closed as well.
            // In that case, the check at the beginning of the loop ends the operation.
            segment = findSegmentReceive(segmentId, segment) ?: continue
        }
        // Update the cell according to the cell life-cycle.
        val outcome = updateCellReceive(segment, cellIndex, globalIndex, waiter)
        when {
            outcome === SUSPEND -> {
                // The specified waiter has been stored in the cell.
                (waiter as? Waiter)?.prepareReceiverForSuspension(segment, cellIndex)
                return onSuspend(segment, cellIndex, globalIndex)
            }
            outcome === SUSPEND_NO_WAITER -> {
                // The operation should suspend, but there is no waiter to store.
                return onNoWaiterSuspend(segment, cellIndex, globalIndex)
            }
            outcome === FAILED -> {
                // The rendezvous attempt has failed: either the opposite request
                // is already cancelled, or the cell is poisoned. The `prev` pointer
                // of the working segment is reset to avoid memory leaks, and
                // the loop proceeds to the next iteration.
                if (globalIndex < sendersCounter) segment.cleanPrev()
            }
            else -> {
                // Either a buffered element has been retrieved, or a rendezvous
                // with a waiting sender has happened. Clean the reference
                // to the previous segment before finishing.
                segment.cleanPrev()
                @Suppress("UNCHECKED_CAST")
                val element = outcome as E
                return onElementRetrieved(element)
            }
        }
    }
}
965
/**
 * Continues a receive that has decided to suspend without a waiter: re-processes
 * the same cell with the now available [waiter], and falls back to
 * the full [receiveImpl] procedure if the cell update fails.
 *
 * @param segment the segment of the working cell.
 * @param index the index of the working cell in [segment].
 * @param r the global index of the cell.
 * @param waiter the waiter to be stored in case of suspension.
 * @param onElementRetrieved invoked when an element has been successfully
 * retrieved, either from the buffer or by making a rendezvous with
 * a suspended sender.
 * @param onClosed invoked when the channel is observed in the closed state
 * and no waiting sender is found, which means that it is closed for receiving.
 */
private inline fun receiveImplOnNoWaiter(
    segment: ChannelSegment<E>,
    index: Int,
    r: Long,
    waiter: Waiter,
    onElementRetrieved: (element: E) -> Unit,
    onClosed: () -> Unit
) {
    // See `receiveImpl(..)` for the detailed comments on each outcome.
    val outcome = updateCellReceive(segment, index, r, waiter)
    if (outcome === SUSPEND) {
        // The waiter is in the cell now.
        waiter.prepareReceiverForSuspension(segment, index)
        return
    }
    if (outcome === FAILED) {
        // This cell cannot be used: restart from the very beginning,
        // this time with the waiter at hand.
        if (r < sendersCounter) segment.cleanPrev()
        receiveImpl(
            waiter = waiter,
            onElementRetrieved = onElementRetrieved,
            onSuspend = { _, _, _ -> },
            onClosed = onClosed
        )
        return
    }
    // An element has been retrieved from the cell.
    segment.cleanPrev()
    @Suppress("UNCHECKED_CAST")
    val element = outcome as E
    onElementRetrieved(element)
}
1008
/**
 * The fast path of [updateCellReceiveSlow]. Handles an empty cell that this
 * receiver can suspend in and a cell with a buffered element; every other
 * situation, including a lost CAS, is delegated to the slow path.
 *
 * @param segment the segment of the working cell.
 * @param index the index of the working cell in [segment].
 * @param r the global index of the cell.
 * @param waiter the waiter to be stored in case of suspension.
 */
private fun updateCellReceive(
    segment: ChannelSegment<E>,
    index: Int,
    r: Long,
    waiter: Any?,
): Any? {
    val cellState = segment.getState(index)
    if (cellState === null) {
        // The cell is empty. If it is not covered by a sender yet,
        // this `receive()` operation should suspend in it.
        val sendersNow = sendersAndCloseStatus.value.sendersCounter
        if (r >= sendersNow) {
            // No waiter to install: return the corresponding result.
            if (waiter === null) return SUSPEND_NO_WAITER
            if (segment.casState(index, cellState, waiter)) {
                // The waiter has been installed successfully;
                // invoke the `expandBuffer()` procedure and finish.
                expandBuffer()
                return SUSPEND
            }
        }
    } else if (cellState === BUFFERED) {
        // The cell stores a buffered element; try to claim it.
        if (segment.casState(index, cellState, DONE_RCV)) {
            // Expand the buffer and retrieve the element.
            expandBuffer()
            return segment.retrieveElement(index)
        }
    }
    return updateCellReceiveSlow(segment, index, r, waiter)
}
1056
/**
 * Updates the working cell on behalf of a receiver according to the cell
 * state machine; see the paper mentioned in the very beginning of this file
 * for the algorithm details.
 *
 * Returns `SUSPEND` if [waiter] has been installed, `SUSPEND_NO_WAITER` if
 * the operation should suspend but [waiter] is `null`, `FAILED` if the cell
 * cannot be used and the operation should restart, or the retrieved element.
 *
 * @param segment the segment of the working cell.
 * @param index the index of the working cell in [segment].
 * @param r the global index of the cell.
 * @param waiter the waiter to be stored in case of suspension.
 */
private fun updateCellReceiveSlow(
    segment: ChannelSegment<E>,
    index: Int,
    r: Long,
    waiter: Any?,
): Any? {
    while (true) {
        // Read the current cell state.
        val cellState = segment.getState(index)
        when {
            // The cell is empty.
            cellState === null || cellState === IN_BUFFER -> {
                val sendersNow = sendersAndCloseStatus.value.sendersCounter
                if (r < sendersNow) {
                    // The cell is already covered by a sender, so a rendezvous
                    // must happen. The receiver does not wait for the element or
                    // the suspended sender to appear; it poisons the cell instead.
                    if (segment.casState(index, cellState, POISONED)) {
                        // Poisoning is essentially the same as storing an already
                        // cancelled receiver, so `expandBuffer()` should be invoked.
                        expandBuffer()
                        return FAILED
                    }
                } else {
                    // This `receive()` operation should suspend.
                    // No waiter to install: return the corresponding result.
                    if (waiter === null) return SUSPEND_NO_WAITER
                    if (segment.casState(index, cellState, waiter)) {
                        // The waiter has been installed successfully;
                        // invoke the `expandBuffer()` procedure and finish.
                        expandBuffer()
                        return SUSPEND
                    }
                }
            }
            // The cell stores a buffered element; try to claim it.
            cellState === BUFFERED -> if (segment.casState(index, cellState, DONE_RCV)) {
                // Expand the buffer and retrieve the element.
                expandBuffer()
                return segment.retrieveElement(index)
            }
            // The cell stores an interrupted sender, or is already poisoned
            // by a concurrent `hasElements` call. Restart in both cases.
            cellState === INTERRUPTED_SEND || cellState === POISONED -> return FAILED
            // This channel is already closed.
            cellState === CHANNEL_CLOSED -> {
                // Even though the channel is closed, `expandBuffer()` must
                // still be called to keep `waitExpandBufferCompletion()` correct.
                expandBuffer()
                return FAILED
            }
            // A concurrent `expandBuffer()` is resuming a suspended sender.
            // Spin until that attempt completes: the cell state must change
            // to either `BUFFERED` or `INTERRUPTED_SEND`.
            cellState === RESUMING_BY_EB -> continue
            // The cell stores a suspended sender; try to resume it.
            else -> {
                // To synchronize with `expandBuffer()`, the cell is first moved to
                // the intermediate `RESUMING_BY_RCV` state, which is then replaced
                // with either `DONE_RCV` (on success) or `INTERRUPTED_SEND` (on failure).
                if (!segment.casState(index, cellState, RESUMING_BY_RCV)) continue
                // Has a concurrent `expandBuffer()` delegated its completion?
                val delegatedByExpandBuffer = cellState is WaiterEB
                // Unwrap the sender if needed and try to resume it.
                val sender = if (cellState is WaiterEB) cellState.waiter else cellState
                if (sender.tryResumeSender(segment, index)) {
                    // The sender has been resumed successfully: update the cell
                    // state, expand the buffer, and return the stored element.
                    // A delegated `expandBuffer()` needs no extra action here,
                    // as the sender is resumed.
                    segment.setState(index, DONE_RCV)
                    expandBuffer()
                    return segment.retrieveElement(index)
                }
                // The resumption has failed; update the cell correspondingly.
                // A delegated `expandBuffer()` should skip this cell, so
                // `expandBuffer()` is called once again in that case.
                segment.setState(index, INTERRUPTED_SEND)
                segment.onCancelledRequest(index, false)
                if (delegatedByExpandBuffer) expandBuffer()
                return FAILED
            }
        }
    }
}
1169
/**
 * Tries to resume the suspended sender represented by this waiter, which occupies
 * the cell [index] of [segment]. Returns `true` if the resumption succeeded.
 */
private fun Any.tryResumeSender(segment: ChannelSegment<E>, index: Int): Boolean = when (this) {
    // A suspended `send(e)` operation.
    is CancellableContinuation<*> -> {
        @Suppress("UNCHECKED_CAST")
        val sendContinuation = this as CancellableContinuation<Unit>
        sendContinuation.tryResume0(Unit)
    }
    // A `select` clause registered for sending.
    is SelectInstance<*> -> {
        val selectImpl = this as SelectImplementation<*>
        val selectOutcome = selectImpl.trySelectDetailed(clauseObject = this@BufferedChannel, result = Unit)
        // If this `select` clause should be re-registered,
        // clean the element slot to avoid memory leaks.
        if (selectOutcome === REREGISTER) segment.cleanElement(index)
        // The sender is resumed only if the selection has succeeded.
        selectOutcome === SUCCESSFUL
    }
    // A suspended `sendBroadcast(e)` operation.
    is SendBroadcast -> cont.tryResume0(true)
    else -> error("Unexpected waiter: $this")
}
1188
1189 // ################################
1190 // # The expandBuffer() procedure #
1191 // ################################
1192
/**
 * Moves the logical end of the buffer one cell forward, retrying with the next
 * cell whenever the current one cannot be added to the buffer. Every attempt is
 * accounted via [incCompletedExpandBufferAttempts].
 */
private fun expandBuffer() {
    // Rendezvous and unlimited channels need no buffer expansion.
    if (isRendezvousOrUnlimited) return
    // Read the current segment of the `expandBuffer()` procedure.
    var segment = bufferEndSegment.value
    while (true) {
        // Increment the logical end of the buffer;
        // the `endIndex`-th cell is going to be added to it.
        val endIndex = bufferEnd.getAndIncrement()
        val segmentId = endIndex / SEGMENT_SIZE
        // Read the `senders` counter after the increment. If the cell is not covered
        // by a sender yet, the `send(e)` invocation that will work with it detects
        // that the cell is already in the buffer by comparing with the `bufferEnd`
        // counter, so there is nothing to update in the cell itself.
        if (sendersCounter <= endIndex) {
            // `bufferEndSegment` may reference an outdated segment;
            // move it forward to avoid memory leaks.
            if (segment.id < segmentId && segment.next != null)
                moveSegmentBufferEndToSpecifiedOrLast(segmentId, segment)
            // Count this `expandBuffer()` as completed and finish.
            incCompletedExpandBufferAttempts()
            return
        }
        // Find the required segment, creating new ones if needed, when
        // `bufferEndSegment` is outdated or the segment is already removed.
        if (segment.id != segmentId) {
            // Restart if the required segment is removed, or the linked list of
            // segments is already closed and the required one will never be created.
            // Note that `findSegmentBufferEnd(..)` updates the number of
            // completed `expandBuffer()` attempts in this case.
            segment = findSegmentBufferEnd(segmentId, segment, endIndex) ?: continue
        }
        // Try to add the cell to the logical buffer,
        // updating its state according to the state machine.
        val cellIndex = (endIndex % SEGMENT_SIZE).toInt()
        val addedToBuffer = updateCellExpandBuffer(segment, cellIndex, endIndex)
        // The attempt is counted as completed regardless of its outcome.
        // The counter could be incremented earlier, right after the segment
        // is obtained, but that would be counter-intuitive.
        incCompletedExpandBufferAttempts()
        // Finish on success; otherwise, go for the next cell.
        if (addedToBuffer) return
    }
}
1256
/**
 * The fast path of [updateCellExpandBufferSlow]: handles a cell that stores
 * a suspended sender, delegating every other case (including a lost CAS)
 * to the slow path. Returns `true` if the cell has been added to the buffer.
 *
 * @param segment the segment of the working cell.
 * @param index the index of the working cell in [segment].
 * @param b the global index of the cell.
 */
private fun updateCellExpandBuffer(
    segment: ChannelSegment<E>,
    index: Int,
    b: Long
): Boolean {
    val cellState = segment.getState(index)
    // A waiter in the cell is usually a sender, but a concurrent receiver may be
    // suspended there as well. The waiter is known to be a sender when the cell
    // is not covered by a receiver, that is, when `b >= receivers`.
    // To synchronize with a concurrent `receive()`, the cell is first moved to
    // the intermediate `RESUMING_BY_EB` state.
    if (cellState is Waiter && b >= receivers.value && segment.casState(index, cellState, RESUMING_BY_EB)) {
        if (cellState.tryResumeSender(segment, index)) {
            // The sender has been resumed successfully;
            // move the cell to the logical buffer and finish.
            segment.setState(index, BUFFERED)
            return true
        }
        // The resumption has failed.
        segment.setState(index, INTERRUPTED_SEND)
        segment.onCancelledRequest(index, false)
        return false
    }
    return updateCellExpandBufferSlow(segment, index, b)
}
1300
/**
 * Updates the working cell on behalf of `expandBuffer()` according to the cell
 * state machine; see the paper mentioned in the very beginning of this file for
 * the cell life-cycle and the algorithm details. Returns `true` if the procedure
 * may finish at this cell, and `false` if the cell should be skipped.
 *
 * @param segment the segment of the working cell.
 * @param index the index of the working cell in [segment].
 * @param b the global index of the cell.
 */
private fun updateCellExpandBufferSlow(
    segment: ChannelSegment<E>,
    index: Int,
    b: Long
): Boolean {
    while (true) {
        // Read the current cell state.
        val state = segment.getState(index)
        when {
            // A suspended waiter: either a sender or a receiver.
            state is Waiter -> {
                // A waiter in the cell is usually a sender, but a concurrent receiver
                // may be suspended there as well. The waiter is known to be a sender
                // only when the cell is not covered by a receiver.
                if (b < receivers.value) {
                    // It is impossible to tell whether a sender or a receiver is
                    // suspended in the cell. To make progress, `expandBuffer()`
                    // delegates its completion to an upcoming pairwise request by
                    // atomically wrapping the waiter in `WaiterEB`. If a sender is
                    // stored, the upcoming receiver calls `expandBuffer()` when
                    // the sender resumption fails, effectively skipping this cell.
                    // If a receiver is stored, this `expandBuffer()` must finish,
                    // so senders ignore the `WaiterEB` wrapper.
                    if (segment.casState(index, state, WaiterEB(waiter = state))) return true
                } else if (segment.casState(index, state, RESUMING_BY_EB)) {
                    // The cell stores a suspended sender, and it has been moved to
                    // the intermediate `RESUMING_BY_EB` state to synchronize with
                    // a concurrent `receive()`. Try to resume the sender.
                    if (state.tryResumeSender(segment, index)) {
                        // The sender has been resumed successfully;
                        // move the cell to the logical buffer and finish.
                        segment.setState(index, BUFFERED)
                        return true
                    }
                    // The resumption has failed.
                    segment.setState(index, INTERRUPTED_SEND)
                    segment.onCancelledRequest(index, false)
                    return false
                }
            }
            // The cell stores an interrupted sender; skip it.
            state === INTERRUPTED_SEND -> return false
            // The cell is empty, a concurrent sender is coming.
            state === null -> {
                // The special `IN_BUFFER` state informs that sender
                // that the cell is already a part of the buffer.
                if (segment.casState(index, state, IN_BUFFER)) return true
            }
            // Nothing to do when the cell is already a part of the buffer,
            // is already processed by a receiver, or the channel is closed
            // (all the following cells are in the same state then).
            state === BUFFERED ||
                state === POISONED || state === DONE_RCV || state === INTERRUPTED_RCV ||
                state === CHANNEL_CLOSED -> return true
            // A concurrent receiver is resuming the suspended sender.
            // Spin until it changes the cell state to either
            // `DONE_RCV` or `INTERRUPTED_SEND`.
            state === RESUMING_BY_RCV -> continue // spin wait
            else -> error("Unexpected cell state: $state")
        }
    }
}
1383
/**
 * Adds [nAttempts] to the counter of completed [expandBuffer] invocations.
 * [waitExpandBufferCompletion] waits until the counters of started and completed
 * [expandBuffer] calls coincide and become greater than or equal to the specified
 * value; to stay starvation-free, it may set a flag that pauses further progress.
 * When the flag is observed here, the caller spins until it is unset.
 */
private fun incCompletedExpandBufferAttempts(nAttempts: Long = 1) {
    // Account the completed `expandBuffer()` calls.
    val updated = completedExpandBuffersAndPauseFlag.addAndGet(nAttempts)
    // Nothing else to do unless further `expandBuffer()`-s are paused.
    if (!updated.ebPauseExpandBuffers) return
    // Wait in a spin-loop until the flag is unset.
    @Suppress("ControlFlowWithEmptyBody")
    while (completedExpandBuffersAndPauseFlag.value.ebPauseExpandBuffers) {}
}
1403
/**
 * Spins until the [expandBuffer] call that should process the cell with
 * the index [globalIndex] is completed. In essence, it waits until the numbers
 * of started ([bufferEnd]) and completed ([completedExpandBuffersAndPauseFlag])
 * [expandBuffer] attempts coincide and become greater than [globalIndex].
 * To avoid starvation, this function may set a flag that pauses further progress.
 */
internal fun waitExpandBufferCompletion(globalIndex: Long) {
    // `expandBuffer()` is not used by rendezvous and unlimited channels.
    if (isRendezvousOrUnlimited) return
    // Spin until the number of started buffer expansion calls exceeds the cell index.
    @Suppress("ControlFlowWithEmptyBody")
    while (bufferEndCounter <= globalIndex) {}
    // The `expandBuffer()` call that should process the required cell has been
    // started by now. First, wait in a bounded spin-loop until the numbers of
    // started and completed buffer expansion calls coincide.
    for (attempt in 1..EXPAND_BUFFER_COMPLETION_WAIT_ITERATIONS) {
        val started = bufferEndCounter
        val completed = completedExpandBuffersAndPauseFlag.value.ebCompletedCounter
        // A precise match is the only way to ensure that **the matching
        // expandBuffer()** has completed. The number of started calls is
        // re-read to make sure the snapshot is correct.
        if (started == completed && started == bufferEndCounter) return
    }
    // The bounded wait did not help: pause further
    // `expandBuffer()` calls to avoid starvation.
    completedExpandBuffersAndPauseFlag.update {
        constructEBCompletedAndPauseFlag(it.ebCompletedCounter, true)
    }
    // Now spin without a bound until the counters coincide.
    while (true) {
        val started = bufferEndCounter
        // Read the number of completed calls along with the pause flag.
        val completedAndFlag = completedExpandBuffersAndPauseFlag.value
        val completed = completedAndFlag.ebCompletedCounter
        val paused = completedAndFlag.ebPauseExpandBuffers
        // The number of started calls is re-read to make sure the snapshot is correct.
        if (started == completed && started == bufferEndCounter) {
            // Unset the flag that pauses the progress, and finish.
            completedExpandBuffersAndPauseFlag.update {
                constructEBCompletedAndPauseFlag(it.ebCompletedCounter, false)
            }
            return
        }
        // A concurrent caller of this function may have unset
        // the pause flag; set it back in this case.
        if (!paused) {
            completedExpandBuffersAndPauseFlag.compareAndSet(
                completedAndFlag,
                constructEBCompletedAndPauseFlag(completed, true)
            )
        }
    }
}
1471
1472
1473 // #######################
1474 // ## Select Expression ##
1475 // #######################
1476
/**
 * The `select` clause for sending: registers via [registerSelectForSend]
 * and post-processes the selection result via [processResultSelectSend].
 */
@Suppress("UNCHECKED_CAST")
override val onSend: SelectClause2<E, BufferedChannel<E>>
    get() {
        val register = BufferedChannel<*>::registerSelectForSend as RegistrationFunction
        val processResult = BufferedChannel<*>::processResultSelectSend as ProcessResultFunction
        return SelectClause2Impl(
            clauseObject = this@BufferedChannel,
            regFunc = register,
            processResFunc = processResult
        )
    }
1484
/**
 * Registers [select] as a sender of [element] in this channel;
 * serves as the registration function of the [onSend] clause.
 */
@Suppress("UNCHECKED_CAST")
protected open fun registerSelectForSend(select: SelectInstance<*>, element: Any?) =
    sendImpl( // <-- this is an inline function
        element = element as E,
        // The `select` instance itself is the waiter to be stored on suspension.
        waiter = select,
        // The send has completed without suspension: select this clause during registration.
        onRendezvousOrBuffered = { select.selectInRegistrationPhase(Unit) },
        // No extra action is taken here on suspension.
        onSuspend = { _, _ -> },
        onClosed = { onClosedSelectOnSend(element, select) }
    )
1494
1495
/**
 * The `onClosed` handler of [registerSelectForSend]: reports [element] as
 * undelivered (when `onUndeliveredElement` is set) and selects the clause
 * in the registration phase with the `CHANNEL_CLOSED` marker as the result.
 */
private fun onClosedSelectOnSend(element: E, select: SelectInstance<*>) {
    onUndeliveredElement?.callUndeliveredElement(element, select.context)
    // `processResultSelectSend` turns this marker into `sendException`.
    select.selectInRegistrationPhase(CHANNEL_CLOSED)
}
1500
/**
 * Post-processes the result of the [onSend] clause: throws [sendException] when
 * the `CHANNEL_CLOSED` marker was selected, and returns this channel otherwise.
 */
@Suppress("UNUSED_PARAMETER", "RedundantNullableReturnType")
private fun processResultSelectSend(ignoredParam: Any?, selectResult: Any?): Any? {
    if (selectResult === CHANNEL_CLOSED) throw sendException
    return this
}
1505
/**
 * The `select` clause for receiving: registers via [registerSelectForReceive]
 * and post-processes the selection result via [processResultSelectReceive].
 */
@Suppress("UNCHECKED_CAST")
override val onReceive: SelectClause1<E>
    get() {
        val register = BufferedChannel<*>::registerSelectForReceive as RegistrationFunction
        val processResult = BufferedChannel<*>::processResultSelectReceive as ProcessResultFunction
        return SelectClause1Impl(
            clauseObject = this@BufferedChannel,
            regFunc = register,
            processResFunc = processResult,
            onCancellationConstructor = onUndeliveredElementReceiveCancellationConstructor
        )
    }
1514
/**
 * The `select` clause for receiving a [ChannelResult]: registers via
 * [registerSelectForReceive] and post-processes the selection result
 * via [processResultSelectReceiveCatching].
 */
@Suppress("UNCHECKED_CAST")
override val onReceiveCatching: SelectClause1<ChannelResult<E>>
    get() {
        val register = BufferedChannel<*>::registerSelectForReceive as RegistrationFunction
        val processResult = BufferedChannel<*>::processResultSelectReceiveCatching as ProcessResultFunction
        return SelectClause1Impl(
            clauseObject = this@BufferedChannel,
            regFunc = register,
            processResFunc = processResult,
            onCancellationConstructor = onUndeliveredElementReceiveCancellationConstructor
        )
    }
1523
/**
 * The `select` clause for receiving an element or `null`: registers via
 * [registerSelectForReceive] and post-processes the selection result
 * via [processResultSelectReceiveOrNull].
 */
@Suppress("OVERRIDE_DEPRECATION", "UNCHECKED_CAST")
override val onReceiveOrNull: SelectClause1<E?>
    get() {
        val register = BufferedChannel<*>::registerSelectForReceive as RegistrationFunction
        val processResult = BufferedChannel<*>::processResultSelectReceiveOrNull as ProcessResultFunction
        return SelectClause1Impl(
            clauseObject = this@BufferedChannel,
            regFunc = register,
            processResFunc = processResult,
            onCancellationConstructor = onUndeliveredElementReceiveCancellationConstructor
        )
    }
1532
/**
 * Registers [select] as a receiver in this channel; serves as the registration
 * function of the [onReceive], [onReceiveCatching], and [onReceiveOrNull] clauses.
 */
@Suppress("UNUSED_PARAMETER")
private fun registerSelectForReceive(select: SelectInstance<*>, ignoredParam: Any?) =
    receiveImpl( // <-- this is an inline function
        // The `select` instance itself is the waiter to be stored on suspension.
        waiter = select,
        // An element is available right away: select this clause with it during registration.
        onElementRetrieved = { elem -> select.selectInRegistrationPhase(elem) },
        // No extra action is taken here on suspension.
        onSuspend = { _, _, _ -> },
        onClosed = { onClosedSelectOnReceive(select) }
    )
1541
/**
 * Completes the registration of a receive-like clause when the channel is found closed:
 * [select] is completed in its registration phase with the [CHANNEL_CLOSED] token.
 */
private fun onClosedSelectOnReceive(select: SelectInstance<*>) {
    val closedToken = CHANNEL_CLOSED
    select.selectInRegistrationPhase(closedToken)
}
1545
/**
 * Converts the internal result of an `onReceive` clause into its public result:
 * throws [receiveException] when [selectResult] is the [CHANNEL_CLOSED] token,
 * and returns [selectResult] unchanged otherwise. The first parameter is not used.
 */
@Suppress("UNUSED_PARAMETER")
private fun processResultSelectReceive(ignoredParam: Any?, selectResult: Any?): Any? {
    if (selectResult === CHANNEL_CLOSED) throw receiveException
    return selectResult
}
1550
/**
 * Converts the internal result of an `onReceiveOrNull` clause into its public result.
 *
 * Any result other than the [CHANNEL_CLOSED] token is returned as is. On the closed
 * token, `null` is returned when there is no close cause, and [receiveException] is
 * thrown otherwise. The first parameter is not used.
 */
@Suppress("UNUSED_PARAMETER")
private fun processResultSelectReceiveOrNull(ignoredParam: Any?, selectResult: Any?): Any? {
    if (selectResult !== CHANNEL_CLOSED) return selectResult
    if (closeCause != null) throw receiveException
    return null
}
1557
/**
 * Converts the internal result of an `onReceiveCatching` clause into a [ChannelResult]:
 * a closed result carrying [closeCause] for the [CHANNEL_CLOSED] token, and
 * a successful result wrapping the element otherwise. The first parameter is not used.
 */
@Suppress("UNCHECKED_CAST", "UNUSED_PARAMETER", "RedundantNullableReturnType")
private fun processResultSelectReceiveCatching(ignoredParam: Any?, selectResult: Any?): Any? =
    when {
        selectResult === CHANNEL_CLOSED -> closed(closeCause)
        else -> success(selectResult as E)
    }
1562
/**
 * Builds the cancellation handler passed to the receive-like `select` clauses,
 * or `null` when no `onUndeliveredElement` handler is installed.
 *
 * The produced handler calls `onUndeliveredElement` with the clause's internal result
 * and the context of the select instance, unless that result is the [CHANNEL_CLOSED] token.
 */
@Suppress("UNCHECKED_CAST")
private val onUndeliveredElementReceiveCancellationConstructor: OnCancellationConstructor? =
    onUndeliveredElement?.let { undeliveredHandler ->
        { select: SelectInstance<*>, _: Any?, element: Any? ->
            { if (element !== CHANNEL_CLOSED) undeliveredHandler.callUndeliveredElement(element as E, select.context) }
        }
    }
1569
1570 // ######################
1571 // ## Iterator Support ##
1572 // ######################
1573
/** Returns a new [BufferedChannelIterator] bound to this channel. */
override fun iterator(): ChannelIterator<E> {
    return BufferedChannelIterator()
}
1575
/**
 * The key idea is that an iterator is a special receiver type,
 * which should be resumed differently to [receive] and [onReceive]
 * operations, but can be served as a waiter in a way similar to
 * [CancellableContinuation] and [SelectInstance].
 *
 * Roughly, [hasNext] is a [receive] sibling, while [next] simply
 * returns the already retrieved element. From the implementation
 * side, [receiveResult] stores the element retrieved by [hasNext]
 * (or a special [CHANNEL_CLOSED] token if the channel is closed).
 *
 * The [invokeOnCancellation] function receives the segment and the index
 * in it that specify the location of the stored iterator, and forwards
 * them to the continuation of the suspended [hasNext] call.
 *
 * To resume the suspended [hasNext] call, a special [tryResumeHasNext]
 * function should be used in a way similar to [CancellableContinuation.tryResume]
 * and [SelectInstance.trySelect]. When the channel becomes closed,
 * [tryResumeHasNextOnClosedChannel] should be used instead.
 */
private inner class BufferedChannelIterator : ChannelIterator<E>, Waiter {
    /**
     * Stores the element retrieved by [hasNext] or
     * a special [CHANNEL_CLOSED] token if this channel is closed.
     * If [hasNext] has not been invoked yet, or the element has already
     * been consumed by [next], [NO_RECEIVE_RESULT] is stored.
     */
    private var receiveResult: Any? = NO_RECEIVE_RESULT

    /**
     * When [hasNext] suspends, this field stores the corresponding
     * continuation. The [tryResumeHasNext] and [tryResumeHasNextOnClosedChannel]
     * functions resume this continuation when the [hasNext] invocation should complete.
     *
     * This property is subject to a benign data race:
     * it is nulled-out on both completion and cancellation paths that
     * could happen concurrently.
     */
    @BenignDataRace
    private var continuation: CancellableContinuationImpl<Boolean>? = null

    /**
     * `hasNext()` is just a special receive operation.
     *
     * The call is idempotent: if a previous `hasNext()` has already retrieved
     * an element that [next] has not consumed yet, `true` is returned without
     * touching the channel. Retrieving a second element here would overwrite
     * [receiveResult] and silently lose the first one.
     * When the stored result is [CHANNEL_CLOSED], the receive procedure is
     * run again, so the closed-channel outcome is recomputed.
     */
    override suspend fun hasNext(): Boolean =
        if (this.receiveResult !== NO_RECEIVE_RESULT && this.receiveResult !== CHANNEL_CLOSED) {
            // An element is already waiting for `next()`; do not retrieve another one.
            true
        } else receiveImpl( // <-- this is an inline function
            // Do not create a continuation until it is required;
            // it is created later via [onNoWaiterSuspend], if needed.
            waiter = null,
            // Store the received element in `receiveResult` on successful
            // retrieval from the buffer or rendezvous with a suspended sender.
            onElementRetrieved = { element ->
                this.receiveResult = element
                true
            },
            // As no waiter is provided, suspension is impossible.
            onSuspend = { _, _, _ -> error("unreachable") },
            // Return `false` or throw an exception if the channel is already closed.
            onClosed = { onClosedHasNext() },
            // If `hasNext()` decides to suspend, the corresponding
            // `suspend` function that creates a continuation is called.
            // The tail-call optimization is applied here.
            onNoWaiterSuspend = { segm, i, r -> return hasNextOnNoWaiterSuspend(segm, i, r) }
        )

    /**
     * Records that the channel is closed and completes `hasNext()`:
     * returns `false` when there is no close cause, and throws the
     * cause (with a recovered stack trace) otherwise.
     */
    private fun onClosedHasNext(): Boolean {
        this.receiveResult = CHANNEL_CLOSED
        val cause = closeCause ?: return false
        throw recoverStackTrace(cause)
    }

    /**
     * The slow path of [hasNext]: creates a continuation, stores it in
     * [continuation], and continues the receive procedure on the given cell
     * with this iterator as the waiter.
     */
    private suspend fun hasNextOnNoWaiterSuspend(
        /* The working cell is specified by
        the segment and the index in it. */
        segment: ChannelSegment<E>,
        index: Int,
        /* The global index of the cell. */
        r: Long
    ): Boolean = suspendCancellableCoroutineReusable { cont ->
        this.continuation = cont
        receiveImplOnNoWaiter( // <-- this is an inline function
            segment = segment, index = index, r = r,
            waiter = this, // store this iterator as a waiter
            // In case of successful element retrieval, store
            // it in `receiveResult` and resume the continuation.
            // Importantly, the receiver coroutine may be cancelled
            // after it is successfully resumed but not dispatched yet.
            // In case `onUndeliveredElement` is present, we must
            // invoke it in the latter case.
            onElementRetrieved = { element ->
                this.receiveResult = element
                this.continuation = null
                cont.resume(true, onUndeliveredElement?.bindCancellationFun(element, cont.context))
            },
            onClosed = { onClosedHasNextNoWaiterSuspend() }
        )
    }

    /**
     * Forwards the cell location to the stored continuation, if there is one.
     */
    override fun invokeOnCancellation(segment: Segment<*>, index: Int) {
        this.continuation?.invokeOnCancellation(segment, index)
    }

    /**
     * Completes the suspended-path `hasNext()` when the channel is found closed.
     */
    private fun onClosedHasNextNoWaiterSuspend() {
        // Read the current continuation and clean
        // the corresponding field to avoid memory leaks.
        val cont = this.continuation!!
        this.continuation = null
        // Update the `hasNext()` internal result.
        this.receiveResult = CHANNEL_CLOSED
        // If this channel was closed without exception,
        // `hasNext()` should return `false`; otherwise,
        // it throws the closing exception.
        val cause = closeCause
        if (cause == null) {
            cont.resume(false)
        } else {
            cont.resumeWithException(recoverStackTrace(cause, cont))
        }
    }

    /**
     * Returns the element retrieved by the preceding [hasNext] call and
     * resets the stored result, so the next [hasNext] retrieves a new element.
     *
     * Fails with [IllegalStateException] if no result is stored, and throws
     * [receiveException] if the stored result is the [CHANNEL_CLOSED] token.
     */
    @Suppress("UNCHECKED_CAST")
    override fun next(): E {
        // Read the already received result, or [NO_RECEIVE_RESULT] if [hasNext] has not been invoked yet.
        val result = receiveResult
        check(result !== NO_RECEIVE_RESULT) { "`hasNext()` has not been invoked" }
        receiveResult = NO_RECEIVE_RESULT
        // Is this channel closed?
        if (result === CHANNEL_CLOSED) throw recoverStackTrace(receiveException)
        // Return the element.
        return result as E
    }

    /**
     * Tries to resume the suspended [hasNext] call with the specified [element],
     * returning whether the resumption succeeded.
     */
    fun tryResumeHasNext(element: E): Boolean {
        // Read the current continuation and clean
        // the corresponding field to avoid memory leaks.
        val cont = this.continuation!!
        this.continuation = null
        // Store the retrieved element in `receiveResult`.
        this.receiveResult = element
        // Try to resume this `hasNext()`. Importantly, the receiver coroutine
        // may be cancelled after it is successfully resumed but not dispatched yet.
        // In case `onUndeliveredElement` is specified, we need to invoke it in the latter case.
        return cont.tryResume0(true, onUndeliveredElement?.bindCancellationFun(element, cont.context))
    }

    /**
     * Resumes the suspended [hasNext] call because the channel is closed.
     */
    fun tryResumeHasNextOnClosedChannel() {
        /*
         * Read the current continuation of the suspended `hasNext()` call and clean the corresponding field.
         * While this nulling out is not strictly necessary, it eliminates memory leaks (through the continuation)
         * if the channel iterator accidentally remains GC-reachable after the channel is closed.
         */
        val cont = this.continuation!!
        this.continuation = null
        // Update the `hasNext()` internal result.
        this.receiveResult = CHANNEL_CLOSED
        // If this channel was closed without exception,
        // `hasNext()` should return `false`; otherwise,
        // it throws the closing exception.
        val cause = closeCause
        if (cause == null) {
            cont.resume(false)
        } else {
            cont.resumeWithException(recoverStackTrace(cause, cont))
        }
    }
}
1743
1744 // ##############################
1745 // ## Closing and Cancellation ##
1746 // ##############################
1747
/**
 * Holds the cause this channel was closed or cancelled with.
 * The initial value is the [NO_CLOSE_CAUSE] marker; [closeOrCancelImpl] replaces
 * the marker via a compare-and-set, so the cause can be installed only once.
 */
private val _closeCause = atomic<Any?>(NO_CLOSE_CAUSE)

/**
 * The installed close cause, or `null` if the channel was closed without one.
 * Should be read only if this channel is closed or cancelled: the stored
 * value is cast to `Throwable?` unconditionally.
 */
protected val closeCause: Throwable?
    get() = _closeCause.value as Throwable?
1755
/**
 * The exception for failed `send`-like operations: [closeCause] when it is non-null,
 * or a new [ClosedSendChannelException] with the default message otherwise.
 */
protected val sendException: Throwable
    get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
1758
/**
 * The exception for failed `receive`-like operations: [closeCause] when it is non-null,
 * or a new [ClosedReceiveChannelException] with the default message otherwise.
 */
private val receiveException: Throwable
    get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
1761
/**
  Stores the close handler installed by [invokeOnClose].
  To synchronize [invokeOnClose] and [close], two additional
  marker states, [CLOSE_HANDLER_INVOKED] and [CLOSE_HANDLER_CLOSED],
  are used. The resulting state diagram is presented below.

    +------+  install handler  +---------+   close(..)   +---------+
    | null |------------------>| handler |-------------->| INVOKED |
    +------+                   +---------+               +---------+
       |                                                      ^
       |  close(..)   +--------+      invokeOnClose(..)       |
       +------------->| CLOSED |------------------------------+
                      +--------+

  In the `INVOKED` state, the handler has either been called by the closing
  operation or, when it was installed after the channel had been closed,
  by [invokeOnClose] itself.
 */
private val closeHandler = atomic<Any?>(null)
1777
/**
 * A hook invoked by [closeOrCancelImpl] after the closing or cancellation
 * procedure has been completed, right before the close handler is considered.
 *
 * It is called by every [closeOrCancelImpl] invocation, including the ones that
 * did not install the close cause; therefore, implementations must be idempotent.
 * The default implementation does nothing.
 */
protected open fun onClosedIdempotent() {}
1783
/**
 * Closes this channel with the specified [cause] by delegating to [closeOrCancelImpl]
 * in the non-cancelling mode, and returns its result.
 */
override fun close(cause: Throwable?): Boolean {
    return closeOrCancelImpl(cause, cancel = false)
}
1786
/** Cancels this channel with the specified [cause]; returns the result of [cancelImpl]. */
@Suppress("OVERRIDE_DEPRECATION")
final override fun cancel(cause: Throwable?): Boolean {
    return cancelImpl(cause)
}

/** Cancels this channel without an explicit cause; the result of [cancelImpl] is ignored. */
@Suppress("OVERRIDE_DEPRECATION")
final override fun cancel() {
    cancelImpl(null)
}

/** Cancels this channel with the specified [cause]; the result of [cancelImpl] is ignored. */
final override fun cancel(cause: CancellationException?) {
    cancelImpl(cause)
}
1794
/**
 * The common implementation of all `cancel` overloads. A `null` [cause] is replaced
 * with a new [CancellationException]; then [closeOrCancelImpl] is invoked in
 * the cancelling mode and its result is returned.
 */
internal open fun cancelImpl(cause: Throwable?): Boolean {
    val effectiveCause = cause ?: CancellationException("Channel was cancelled")
    return closeOrCancelImpl(effectiveCause, cancel = true)
}
1797
/**
 * The common implementation of [close] and [cancel].
 *
 * The steps, in order:
 * 1. For `cancel(..)`, mark that the cancellation has been started.
 * 2. Try to install [cause]; only the invocation that succeeds returns `true`,
 *    while all other [close] and [cancel] calls return `false`.
 * 3. Mark the channel as closed or cancelled, depending on [cancel].
 * 4. Complete the closing or cancellation procedure via [completeCloseOrCancel].
 * 5. Call [onClosedIdempotent] (on every invocation) and, if this invocation
 *    has installed the cause, invoke the close handler.
 */
protected open fun closeOrCancelImpl(cause: Throwable?, cancel: Boolean): Boolean {
    // Setting the "cancellation started" bit first is crucial for linearizability
    // when concurrent `close(..)` and `isClosedFor[Send,Receive]` operations
    // help this `cancel(..)`.
    if (cancel) markCancellationStarted()
    // Only one invocation can replace the `NO_CLOSE_CAUSE` marker.
    val closedByThisOperation = _closeCause.compareAndSet(NO_CLOSE_CAUSE, cause)
    // Update the close status according to the operation type.
    if (cancel) markCancelled() else markClosed()
    // Finish the closing or cancellation procedure.
    completeCloseOrCancel()
    // The idempotent hook runs for every invocation ...
    onClosedIdempotent()
    // ... while the close handler is invoked only by the installing one.
    if (closedByThisOperation) invokeCloseHandler()
    return closedByThisOperation
}
1834
/**
 * Moves [closeHandler] to its post-close state and calls the handler that
 * was installed before, if any, passing [closeCause] to it.
 */
private fun invokeCloseHandler() {
    val installed = closeHandler.getAndUpdate { current ->
        // With no handler installed, leave the `CLOSED` marker for a concurrent
        // or later `invokeOnClose`; otherwise, replace the handler with
        // the `INVOKED` marker to avoid memory leaks.
        if (current === null) CLOSE_HANDLER_CLOSED else CLOSE_HANDLER_INVOKED
    }
    // No handler was installed, finish.
    if (installed == null) return
    @Suppress("UNCHECKED_CAST")
    val handler = installed as (cause: Throwable?) -> Unit
    handler(closeCause)
}
1856
/**
 * Installs the close [handler].
 *
 * If no handler is installed and the channel has not been closed yet, the handler
 * is stored. If the channel is already closed and no handler has been invoked,
 * [handler] is called right away with [closeCause]. In all other cases
 * an [IllegalStateException] is thrown, so at most one handler is ever called.
 */
override fun invokeOnClose(handler: (cause: Throwable?) -> Unit) {
    // Fast path: no handler is installed and the channel is not closed yet.
    if (closeHandler.compareAndSet(null, handler)) return
    // Either another handler is already set, or this channel is closed.
    closeHandler.loop { cur ->
        if (cur === CLOSE_HANDLER_INVOKED) {
            error("Another handler was already registered and successfully invoked")
        }
        if (cur !== CLOSE_HANDLER_CLOSED) {
            error("Another handler is already registered: $cur")
        }
        // The channel is closed and no handler has been called yet.
        // Moving the state from `CLOSED` to `INVOKED` guarantees that
        // at most one handler can be called; retry if the update fails.
        if (closeHandler.compareAndSet(CLOSE_HANDLER_CLOSED, CLOSE_HANDLER_INVOKED)) {
            handler(closeCause)
            return
        }
    }
}
1884
/**
 * Marks this channel as closed, keeping the senders counter intact.
 *
 * If the status is [CLOSE_STATUS_CANCELLATION_STARTED], the channel is marked
 * as cancelled instead. If the channel is already marked as closed or cancelled,
 * the status is left unchanged.
 *
 * All operations that notice this channel in the closed state
 * must help to complete the closing via [completeCloseOrCancel].
 */
private fun markClosed() {
    sendersAndCloseStatus.update { packed ->
        val newStatus = when (packed.sendersCloseStatus) {
            // The channel is neither closed nor cancelled.
            CLOSE_STATUS_ACTIVE -> CLOSE_STATUS_CLOSED
            // The channel is going to be cancelled.
            CLOSE_STATUS_CANCELLATION_STARTED -> CLOSE_STATUS_CANCELLED
            // The channel is already marked as closed or cancelled.
            else -> return
        }
        constructSendersAndCloseStatus(packed.sendersCounter, newStatus)
    }
}
1904
/**
 * Marks this channel as cancelled: the close status is set to
 * [CLOSE_STATUS_CANCELLED] regardless of its current value, while
 * the senders counter is kept intact.
 *
 * All operations that notice this channel in the cancelled state
 * must help to complete the cancellation via [completeCloseOrCancel].
 */
private fun markCancelled() {
    sendersAndCloseStatus.update { packed ->
        constructSendersAndCloseStatus(packed.sendersCounter, CLOSE_STATUS_CANCELLED)
    }
}
1915
/**
 * Records that the cancellation procedure has been started by moving the close
 * status from [CLOSE_STATUS_ACTIVE] to [CLOSE_STATUS_CANCELLATION_STARTED].
 * Any other status is left unchanged.
 *
 * With this status set, other operations that help to complete the procedure
 * move the channel to the cancelled state (see [markClosed]).
 */
private fun markCancellationStarted() {
    sendersAndCloseStatus.update { packed ->
        // This channel is already closed or cancelled.
        if (packed.sendersCloseStatus != CLOSE_STATUS_ACTIVE) return
        constructSendersAndCloseStatus(packed.sendersCounter, CLOSE_STATUS_CANCELLATION_STARTED)
    }
}
1928
/**
 * Completes the started [close] or [cancel] procedure.
 *
 * The [isClosedForSend] property is read only for its side effect: in this class,
 * its getter goes through [isClosed], which calls [completeClose] or [completeCancel]
 * when the channel is marked as closed or cancelled. The returned value is ignored.
 */
private fun completeCloseOrCancel() {
    isClosedForSend // must finish the started close/cancel if one is detected.
}
1935
/**
 * Enables the additional closing steps that [completeClose] performs for conflated
 * channels with the DROP_OLDEST strategy. Returns `false` by default.
 */
protected open val isConflatedDropOldest: Boolean
    get() = false
1937
/**
 * Completes the channel closing procedure and returns the last segment
 * of the underlying linked list (it is needed by [completeCancel]).
 */
private fun completeClose(sendersCur: Long): ChannelSegment<E> =
    // Close the linked list for further segment addition,
    // obtaining the last segment in the data structure.
    closeLinkedList().also { tail ->
        // In the conflated channel implementation (with the DROP_OLDEST
        // elements conflation strategy), it is critical to mark all empty
        // cells as closed to prevent in-progress `send(e)`-s, which have not
        // put their elements yet, from completing after this channel is closed.
        // Otherwise, it is possible for a `send(e)` to put an element when
        // the buffer is already full, while a concurrent receiver may extract
        // the oldest element. When the channel is not closed, we can linearize
        // this `receive()` before the `send(e)`, but after the channel is closed,
        // `send(e)` must fail. Marking all unprocessed cells as `CLOSED` solves the issue.
        if (isConflatedDropOldest) {
            val lastBufferedCellGlobalIndex = markAllEmptyCellsAsClosed(tail)
            val hasBufferedElement = lastBufferedCellGlobalIndex != -1L
            if (hasBufferedElement) {
                dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(lastBufferedCellGlobalIndex)
            }
        }
        // Resume waiting `receive()` requests,
        // informing them that the channel is closed.
        cancelSuspendedReceiveRequests(tail, sendersCur)
    }
1966
/**
 * Completes the channel cancellation procedure: first, the closing procedure is
 * completed via [completeClose]; then the last segment it returns is passed to
 * [removeUnprocessedElements], which cancels suspended `send(e)` requests and
 * removes buffered elements in the reverse order.
 */
private fun completeCancel(sendersCur: Long) {
    removeUnprocessedElements(completeClose(sendersCur))
}
1978
/**
 * Closes the underlying linked list of segments for further segment addition
 * and returns the result of closing the chosen segment.
 *
 * The segment with the greatest id among the ones referenced by
 * [bufferEndSegment], [sendSegment], and [receiveSegment] is used as the starting point.
 */
private fun closeLinkedList(): ChannelSegment<E> {
    // The references are read in the same order: buffer end, send, receive.
    var tail = bufferEndSegment.value
    val sendTail = sendSegment.value
    if (sendTail.id > tail.id) tail = sendTail
    val receiveTail = receiveSegment.value
    if (receiveTail.id > tail.id) tail = receiveTail
    // Close the linked list of segments for new segment addition
    // and return the last segment in the linked list.
    return tail.close()
}
1991
/**
 * Marks all empty cells, in the `null` and [IN_BUFFER] states, as closed.
 * The cells are processed from right to left, starting from the last cell of
 * [lastSegment]; the traversal finishes as soon as the processed cell is already
 * covered by `receive()` or contains a buffered element ([BUFFERED] state).
 *
 * Returns the global index of the last buffered element,
 * or `-1` if no buffered element has been found.
 */
private fun markAllEmptyCellsAsClosed(lastSegment: ChannelSegment<E>): Long {
    var current = lastSegment
    while (true) {
        for (cell in SEGMENT_SIZE - 1 downTo 0) {
            val cellGlobalIndex = current.id * SEGMENT_SIZE + cell
            // Is this cell already covered by `receive()`?
            if (cellGlobalIndex < receiversCounter) return -1L
            // Update the cell, retrying if the state changes concurrently.
            while (true) {
                val state = current.getState(cell)
                // The cell stores a buffered element.
                if (state === BUFFERED) return cellGlobalIndex
                val isEmptyCell = state === null || state === IN_BUFFER
                // Skip this cell if it is not empty and does not store a buffered element.
                if (!isEmptyCell) break
                // Inform a possibly upcoming sender that this channel is already closed.
                if (current.casState(cell, state, CHANNEL_CLOSED)) {
                    current.onSlotCleaned()
                    break
                }
            }
        }
        // Move to the previous segment, finishing if the linked list ends.
        current = current.prev ?: return -1L
    }
}
2032
/**
 * Cancels suspended `send(e)` requests and removes buffered elements
 * starting from the last cell in the specified [lastSegment] (it must
 * be the physical tail of the underlying linked list) and updating
 * the cells in reverse order.
 *
 * If the `onUndeliveredElement` handler is set, it is called for every removed
 * element. The first exception thrown by the handler is rethrown at the end,
 * after all collected senders have been resumed.
 *
 * NOTE(review): both `RESUMING_BY_RCV` and `RESUMING_BY_EB` states finish the
 * whole traversal. A separate branch that was meant to spin-wait on
 * `RESUMING_BY_EB` could never be selected, as the preceding branch already
 * matched that state, so it has been removed without changing the behavior.
 * Confirm whether finishing (rather than waiting for the concurrent
 * `expandBuffer()`) is the intended reaction to `RESUMING_BY_EB`.
 */
private fun removeUnprocessedElements(lastSegment: ChannelSegment<E>) {
    // Read the `onUndeliveredElement` lambda at once. In case it
    // throws an exception, this exception is handled and stored in
    // the variable below. If multiple exceptions are thrown, the first
    // one is stored in the variable, while the others are suppressed.
    val onUndeliveredElement = onUndeliveredElement
    var undeliveredElementException: UndeliveredElementException? = null // first cancel exception, others suppressed
    // To perform synchronization correctly, it is critical to
    // process the cells in reverse order, from right to left.
    // However, according to the API, suspended senders should
    // be cancelled in the order of their suspension. Therefore,
    // we need to collect all of them and cancel in the reverse
    // order after that.
    var suspendedSenders = InlineList<Waiter>()
    var segment = lastSegment
    process_segments@ while (true) {
        for (index in SEGMENT_SIZE - 1 downTo 0) {
            // Process the cell `segment[index]`.
            val globalIndex = segment.id * SEGMENT_SIZE + index
            // Update the cell state.
            update_cell@ while (true) {
                // Read the current state of the cell.
                val state = segment.getState(index)
                when {
                    // The cell is already processed by a receiver.
                    state === DONE_RCV -> break@process_segments
                    // The cell stores a buffered element.
                    state === BUFFERED -> {
                        // Is the cell already covered by a receiver?
                        if (globalIndex < receiversCounter) break@process_segments
                        // Update the cell state to `CHANNEL_CLOSED`.
                        if (segment.casState(index, state, CHANNEL_CLOSED)) {
                            // If `onUndeliveredElement` lambda is non-null, call it.
                            if (onUndeliveredElement != null) {
                                val element = segment.getElement(index)
                                undeliveredElementException = onUndeliveredElement.callUndeliveredElementCatchingException(element, undeliveredElementException)
                            }
                            // Clean the element field and inform the segment
                            // that the slot is cleaned to avoid memory leaks.
                            segment.cleanElement(index)
                            segment.onSlotCleaned()
                            break@update_cell
                        }
                    }
                    // The cell is empty.
                    state === IN_BUFFER || state === null -> {
                        // Update the cell state to `CHANNEL_CLOSED`.
                        if (segment.casState(index, state, CHANNEL_CLOSED)) {
                            // Inform the segment that the slot is cleaned to avoid memory leaks.
                            segment.onSlotCleaned()
                            break@update_cell
                        }
                    }
                    // The cell stores a suspended waiter.
                    state is Waiter || state is WaiterEB -> {
                        // Is the cell already covered by a receiver?
                        if (globalIndex < receiversCounter) break@process_segments
                        // Obtain the sender.
                        val sender: Waiter = if (state is WaiterEB) state.waiter
                                             else state as Waiter
                        // Update the cell state to `CHANNEL_CLOSED`.
                        if (segment.casState(index, state, CHANNEL_CLOSED)) {
                            // If `onUndeliveredElement` lambda is non-null, call it.
                            if (onUndeliveredElement != null) {
                                val element = segment.getElement(index)
                                undeliveredElementException = onUndeliveredElement.callUndeliveredElementCatchingException(element, undeliveredElementException)
                            }
                            // Save the sender for further cancellation.
                            suspendedSenders += sender
                            // Clean the element field and inform the segment
                            // that the slot is cleaned to avoid memory leaks.
                            segment.cleanElement(index)
                            segment.onSlotCleaned()
                            break@update_cell
                        }
                    }
                    // A suspended sender in this cell is being resumed concurrently
                    // (either by a receiver or by `expandBuffer()`); finish immediately.
                    state === RESUMING_BY_EB || state === RESUMING_BY_RCV -> break@process_segments
                    // Any other state requires no action; move to the next cell.
                    else -> break@update_cell
                }
            }
        }
        // Process the previous segment.
        segment = segment.prev ?: break
    }
    // Cancel suspended senders in their order of addition to this channel.
    suspendedSenders.forEachReversed { it.resumeSenderOnCancelledChannel() }
    // Throw `UndeliveredElementException` at the end if there was one.
    undeliveredElementException?.let { throw it }
}
2133
/**
 * Cancels suspended `receive` requests from the end to the beginning,
 * also moving empty cells to the `CHANNEL_CLOSED` state.
 *
 * The traversal starts from the last cell of [lastSegment] and finishes at
 * the first cell whose global index is below [sendersCounter], or when
 * the linked list of segments ends.
 */
private fun cancelSuspendedReceiveRequests(lastSegment: ChannelSegment<E>, sendersCounter: Long) {
    // To perform synchronization correctly, it is critical to extract
    // suspended requests in the reverse order, from the end to the beginning.
    // However, according to the API, they should be cancelled in the order
    // of their suspension. Therefore, the requests are collected first
    // and resumed in the reverse order of collection after that.
    var suspendedReceivers = InlineList<Waiter>()
    var segment = lastSegment
    process_segments@ while (true) {
        for (index in SEGMENT_SIZE - 1 downTo 0) {
            // Is the cell already covered by a sender? Finish immediately in this case.
            if (segment.id * SEGMENT_SIZE + index < sendersCounter) break@process_segments
            // Try to move the cell state to `CHANNEL_CLOSED`.
            cell_update@ while (true) {
                val state = segment.getState(index)
                // The waiter stored in the cell (directly or wrapped), if any.
                val waiter: Waiter? = when (state) {
                    is WaiterEB -> state.waiter
                    is Waiter -> state
                    else -> null
                }
                when {
                    state === null || state === IN_BUFFER -> {
                        if (segment.casState(index, state, CHANNEL_CLOSED)) {
                            segment.onSlotCleaned()
                            break@cell_update
                        }
                    }
                    waiter != null -> {
                        if (segment.casState(index, state, CHANNEL_CLOSED)) {
                            suspendedReceivers += waiter // save for cancellation.
                            segment.onCancelledRequest(index = index, receiver = true)
                            break@cell_update
                        }
                    }
                    else -> break@cell_update // nothing to cancel.
                }
            }
        }
        // Process the previous segment, finishing if there is none.
        segment = segment.prev ?: break@process_segments
    }
    // Cancel the suspended requests in their order of addition to this channel.
    suspendedReceivers.forEachReversed { it.resumeReceiverOnClosedChannel() }
}
2186
/**
 * Resumes this receiver because this channel is closed;
 * delegates to [resumeWaiterOnClosedChannel] in the receiver mode.
 * This function does not take any effect if the operation has already been resumed or cancelled.
 */
private fun Waiter.resumeReceiverOnClosedChannel() {
    resumeWaiterOnClosedChannel(receiver = true)
}
2192
/**
 * Resumes this sender because this channel is cancelled;
 * delegates to [resumeWaiterOnClosedChannel] in the sender mode.
 * This function does not take any effect if the operation has already been resumed or cancelled.
 */
private fun Waiter.resumeSenderOnCancelledChannel() {
    resumeWaiterOnClosedChannel(receiver = false)
}
2198
/**
 * Resumes this waiter because the channel is closed or cancelled,
 * choosing the way of resumption by the waiter type.
 *
 * The [receiver] flag selects between [receiveException] and [sendException]
 * for plain continuations. An unknown waiter type results in [IllegalStateException].
 */
private fun Waiter.resumeWaiterOnClosedChannel(receiver: Boolean) {
    when (this) {
        // The wrapped continuation is resumed with `false`.
        is SendBroadcast -> cont.resume(false)
        // A plain continuation fails with the exception matching the operation kind.
        is CancellableContinuation<*> -> resumeWithException(if (receiver) receiveException else sendException)
        // The wrapped continuation is resumed with a closed result carrying the close cause.
        is ReceiveCatching<*> -> cont.resume(closed(closeCause))
        // The iterator completes its suspended `hasNext()` on its own.
        is BufferedChannel<*>.BufferedChannelIterator -> tryResumeHasNextOnClosedChannel()
        // The select instance is offered the `CHANNEL_CLOSED` token; the outcome is ignored.
        is SelectInstance<*> -> trySelect(this@BufferedChannel, CHANNEL_CLOSED)
        else -> error("Unexpected waiter: $this")
    }
}
2209
/**
 * Whether this channel is closed for `send`, computed from the current
 * [sendersAndCloseStatus] value via [isClosedForSend0].
 */
@ExperimentalCoroutinesApi
override val isClosedForSend: Boolean
    get() {
        val statusSnapshot = sendersAndCloseStatus.value
        return statusSnapshot.isClosedForSend0
    }
2213
/**
 * Evaluates [isClosed] in the `send` mode for this [sendersAndCloseStatus] value.
 */
private val Long.isClosedForSend0: Boolean
    get() = isClosed(this, isClosedForReceive = false)
2216
/**
 * Whether this channel is closed for `receive`, computed from the current
 * [sendersAndCloseStatus] value via [isClosedForReceive0].
 */
@ExperimentalCoroutinesApi
override val isClosedForReceive: Boolean
    get() {
        val statusSnapshot = sendersAndCloseStatus.value
        return statusSnapshot.isClosedForReceive0
    }
2220
/**
 * Evaluates [isClosed] in the `receive` mode for this [sendersAndCloseStatus] value.
 */
private val Long.isClosedForReceive0: Boolean
    get() = isClosed(this, isClosedForReceive = true)
2223
/**
 * Decides whether the channel is closed according to the close status
 * packed into [sendersAndCloseStatusCur], helping to complete a detected
 * closing or cancellation procedure along the way.
 *
 * With [isClosedForReceive] set, a closed (but not cancelled) channel
 * is reported as closed only when it has no elements to retrieve.
 */
private fun isClosed(
    sendersAndCloseStatusCur: Long,
    isClosedForReceive: Boolean
): Boolean {
    val closeStatus = sendersAndCloseStatusCur.sendersCloseStatus
    return when (closeStatus) {
        // Either this channel is active and has not been closed, or
        // the cancellation procedure has been started but not linearized
        // yet, so this channel should be considered as active.
        CLOSE_STATUS_ACTIVE, CLOSE_STATUS_CANCELLATION_STARTED -> false
        // This channel has been successfully closed.
        // Help to complete the closing procedure to guarantee linearizability.
        CLOSE_STATUS_CLOSED -> {
            completeClose(sendersAndCloseStatusCur.sendersCounter)
            // For senders, the answer is always `true`. For receivers,
            // the channel may be closed but still have elements to retrieve.
            !isClosedForReceive || !hasElements()
        }
        // This channel has been successfully cancelled.
        // Help to complete the cancellation procedure to
        // guarantee linearizability and return `true`.
        CLOSE_STATUS_CANCELLED -> {
            completeCancel(sendersAndCloseStatusCur.sendersCounter)
            true
        }
        else -> error("unexpected close status: ${sendersAndCloseStatusCur.sendersCloseStatus}")
    }
}
2255
/**
 * Whether this channel has no elements to retrieve while not being closed for `receive`.
 */
@ExperimentalCoroutinesApi
override val isEmpty: Boolean
    get() = when {
        // This property is `false` if this channel is closed for `receive`.
        isClosedForReceive -> false
        // Does this channel have elements to retrieve?
        hasElements() -> false
        // This channel does not have elements to retrieve;
        // check that it is still not closed for `receive`.
        else -> !isClosedForReceive
    }
2267
/**
 * Checks whether this channel contains elements to retrieve.
 * Simply comparing the counters is insufficient, as a cell covered
 * by a sender may turn out to be empty. This function looks for
 * the first non-empty cell, advancing the `receivers` counter
 * to skip the empty ones.
 *
 * The implementation is similar to `receive()`.
 */
internal fun hasElements(): Boolean {
    while (true) {
        // Read the segment before obtaining the `receivers` counter value.
        var segment = receiveSegment.value
        // Obtain the `receivers` and `senders` counter values, in this order.
        val receiversCur = receiversCounter
        val sendersCur = sendersCounter
        // Is there a chance that this channel has elements?
        if (sendersCur <= receiversCur) return false // no elements
        // The cell is covered by a sender; check whether it contains an element.
        // First, find the required segment if the one read above has another id.
        val segmentId = receiversCur / SEGMENT_SIZE
        if (segment.id != segmentId) {
            val found = findSegmentReceive(segmentId, segment)
            if (found == null) {
                // The required segment has not been found. Either it has already
                // been removed, or the underlying linked list is already closed
                // for segment additions. In the latter case, the channel is closed
                // and does not contain elements, so this operation returns `false`.
                // Otherwise, if the required segment is removed, the operation restarts.
                if (receiveSegment.value.id < segmentId) return false
                continue
            }
            segment = found
        }
        segment.cleanPrev() // all the previous segments are no longer needed.
        // Does the cell contain a waiting sender or a buffered element?
        val cellIndex = (receiversCur % SEGMENT_SIZE).toInt()
        if (isCellNonEmpty(segment, cellIndex, receiversCur)) return true
        // The cell is empty. Update the `receivers` counter and try again;
        // if this CAS fails, the counter has already been updated.
        receivers.compareAndSet(receiversCur, receiversCur + 1)
    }
}
2308
/**
 * Checks whether this cell contains a buffered element or a waiting sender,
 * returning `true` in this case. Otherwise, if this cell is empty
 * (due to waiter cancellation, cell poisoning, or channel closing),
 * this function returns `false`.
 *
 * Notably, this function must be called only if the cell is covered by a sender.
 *
 * @param segment the segment that contains the cell.
 * @param index the index of the cell inside [segment].
 * @param globalIndex the global index of the cell; it is compared with the
 * `receivers` counter when the cell stores a suspended request.
 */
private fun isCellNonEmpty(
    segment: ChannelSegment<E>,
    index: Int,
    globalIndex: Long
): Boolean {
    // The logic is similar to `updateCellReceive` with the only difference
    // that this function neither retrieves the element nor changes the cell
    // state, except for poisoning a still-empty cell.
    while (true) {
        // Read the current cell state.
        val state = segment.getState(index)
        when {
            // The cell is empty but a sender is coming.
            state === null || state === IN_BUFFER -> {
                // Poison the cell to ensure correctness.
                // If the CAS fails, the state has changed concurrently,
                // so the loop re-reads it and tries again.
                if (segment.casState(index, state, POISONED)) {
                    // When the cell becomes poisoned, it is essentially
                    // the same as storing an already cancelled receiver.
                    // Thus, the `expandBuffer()` procedure should be invoked.
                    expandBuffer()
                    return false
                }
            }
            // The cell stores a buffered element.
            state === BUFFERED -> return true
            // The cell stores an interrupted sender.
            state === INTERRUPTED_SEND -> return false
            // This channel is already closed.
            state === CHANNEL_CLOSED -> return false
            // The cell is already processed
            // by a concurrent receiver.
            state === DONE_RCV -> return false
            // The cell is already poisoned
            // by a concurrent receiver.
            state === POISONED -> return false
            // A concurrent `expandBuffer()` is resuming
            // a suspended sender. This function is eligible
            // to linearize before the buffer expansion procedure.
            state === RESUMING_BY_EB -> return true
            // A concurrent receiver is resuming
            // a suspended sender. The element
            // is no longer available for retrieval.
            state === RESUMING_BY_RCV -> return false
            // The cell stores a suspended request.
            // However, it is possible that this request
            // is a receiver if the cell is covered by both
            // send and receive operations.
            // In case the cell is already covered by
            // a receiver, the element is no longer
            // available for retrieval, and this function
            // returns `false`. Otherwise, it is guaranteed
            // that the suspended request is a sender, so
            // this function returns `true`.
            else -> return globalIndex == receiversCounter
        }
    }
}
2373
2374 // #######################
2375 // # Segments Management #
2376 // #######################
2377
/**
 * Looks up the segment with the given [id], walking the linked list from
 * [startFrom] via [ChannelSegment.next] and trying to allocate the segment
 * when it does not exist yet. On the way, [sendSegment] is moved forward
 * to the located segment when that segment has a greater [ChannelSegment.id].
 *
 * The result is `null` in two situations: the list is closed for new segments,
 * so the requested one cannot be allocated, or the requested segment has already
 * been removed. In the latter case the whole run of removed segments is skipped
 * by passing the first cell index of the located segment to
 * [updateSendersCounterIfLower].
 */
private fun findSegmentSend(id: Long, startFrom: ChannelSegment<E>): ChannelSegment<E>? {
    val lookup = sendSegment.findSegmentAndMoveForward(id, startFrom, createSegmentFunction())
    if (lookup.isClosed) {
        // The segment is missing and cannot be allocated because the linked
        // list is closed: the channel is closed or cancelled, so help to
        // finish that procedure first.
        completeCloseOrCancel()
        // Drop the `prev` link of the starting segment only when its first cell
        // is already below the `receivers` counter; otherwise, the
        // closing/cancellation procedure may still need the link to walk
        // the list from right to left.
        if (startFrom.id * SEGMENT_SIZE < receiversCounter) startFrom.cleanPrev()
        return null
    }
    val found = lookup.segment
    if (found.id <= id) {
        // Exactly the requested segment has been located.
        assert { found.id == id }
        return found
    }
    // The requested segment has been removed, and `found` is the first
    // segment whose id is not lower than the requested one.
    // Skip the removed cells in O(1).
    updateSendersCounterIfLower(found.id * SEGMENT_SIZE)
    // The same `prev`-cleaning rule as above, applied to the located segment.
    if (found.id * SEGMENT_SIZE < receiversCounter) found.cleanPrev()
    return null
}
2432
/**
 * Finds the segment with the specified [id] starting by the [startFrom]
 * segment and following the [ChannelSegment.next] references. In case
 * the required segment has not been created yet, this function attempts
 * to add it to the underlying linked list. Finally, it updates [receiveSegment]
 * to the found segment if its [ChannelSegment.id] is greater than the one
 * of the already stored segment.
 *
 * In case the requested segment is already removed, or if it should be allocated
 * but the linked list structure is closed for new segments addition, this function
 * returns `null`. The implementation also efficiently skips a sequence of removed
 * segments, updating the [receivers] counter correspondingly.
 *
 * For channels that are neither rendezvous nor unlimited, this function also moves
 * [bufferEndSegment] forward to the found segment when [id] does not exceed
 * the segment id derived from the `bufferEnd` counter.
 */
private fun findSegmentReceive(id: Long, startFrom: ChannelSegment<E>): ChannelSegment<E>? =
    receiveSegment.findSegmentAndMoveForward(id, startFrom, createSegmentFunction()).let {
        if (it.isClosed) {
            // The required segment has not been found and new segments
            // cannot be added, as the linked list is already closed.
            // This channel is already closed or cancelled; help to complete
            // the closing or cancellation procedure.
            completeCloseOrCancel()
            // Clean the `prev` reference of the provided segment
            // if all the previous cells are already covered by senders.
            // It is important to clean the `prev` reference only in
            // this case, as the closing/cancellation procedure may
            // need correct value to traverse the linked list from right to left.
            if (startFrom.id * SEGMENT_SIZE < sendersCounter) startFrom.cleanPrev()
            // As the required segment is not found and cannot be allocated, return `null`.
            null
        } else {
            // Get the found segment.
            val segment = it.segment
            // Advance the `bufferEnd` segment if required.
            if (!isRendezvousOrUnlimited && id <= bufferEndCounter / SEGMENT_SIZE) {
                bufferEndSegment.moveForward(segment)
            }
            // Is the required segment removed?
            if (segment.id > id) {
                // The required segment has been removed; `segment` is the first
                // segment with `id` not lower than the required one.
                // Skip the sequence of removed cells in O(1).
                updateReceiversCounterIfLower(segment.id * SEGMENT_SIZE)
                // Clean the `prev` reference of the found segment
                // if all the previous cells are already covered by senders.
                // It is important to clean the `prev` reference only in
                // this case, as the closing/cancellation procedure may
                // need correct value to traverse the linked list from right to left.
                if (segment.id * SEGMENT_SIZE < sendersCounter) segment.cleanPrev()
                // As the required segment is already removed, return `null`.
                null
            } else {
                assert { segment.id == id }
                // The required segment has been found; return it!
                segment
            }
        }
    }
2490
/**
 * Finds the segment with the specified [id] for the buffer-end pointer, starting
 * from [startFrom] and moving [bufferEndSegment] forward via `findSegmentAndMoveForward`.
 * Returns the segment when it has exactly the requested [id], and `null` when
 * the linked list is closed or the requested segment is already removed.
 *
 * Importantly, when this function does not find the requested segment,
 * it always updates the number of completed `expandBuffer()` attempts.
 *
 * @param currentBufferEndCounter the `bufferEnd` counter value observed by the caller;
 * it is used to skip a sequence of removed cells with a single CAS on [bufferEnd].
 */
private fun findSegmentBufferEnd(id: Long, startFrom: ChannelSegment<E>, currentBufferEndCounter: Long): ChannelSegment<E>? =
    bufferEndSegment.findSegmentAndMoveForward(id, startFrom, createSegmentFunction()).let {
        if (it.isClosed) {
            // The required segment has not been found and new segments
            // cannot be added, as the linked list is already closed.
            // This channel is already closed or cancelled; help to complete
            // the closing or cancellation procedure.
            completeCloseOrCancel()
            // Update `bufferEndSegment` to the last segment
            // in the linked list to avoid memory leaks.
            moveSegmentBufferEndToSpecifiedOrLast(id, startFrom)
            // When this function does not find the requested segment,
            // it should update the number of completed `expandBuffer()` attempts.
            incCompletedExpandBufferAttempts()
            null
        } else {
            // Get the found segment.
            val segment = it.segment
            // Is the required segment removed?
            if (segment.id > id) {
                // The required segment has been removed; `segment` is the first segment
                // with `id` not lower than the required one.
                // Try to skip the sequence of removed cells in O(1) by increasing the `bufferEnd` counter.
                // Importantly, when this function does not find the requested segment,
                // it should update the number of completed `expandBuffer()` attempts.
                if (bufferEnd.compareAndSet(currentBufferEndCounter + 1, segment.id * SEGMENT_SIZE)) {
                    // The counter has jumped over all the skipped cells at once,
                    // so the number of completed attempts increases by the same distance.
                    incCompletedExpandBufferAttempts(segment.id * SEGMENT_SIZE - currentBufferEndCounter)
                } else {
                    // The CAS has failed; only the current attempt is counted as completed.
                    incCompletedExpandBufferAttempts()
                }
                // As the required segment is already removed, return `null`.
                null
            } else {
                assert { segment.id == id }
                // The required segment has been found; return it!
                segment
            }
        }
    }
2534
/**
 * Updates [bufferEndSegment] to the one with the specified [id] or
 * to the last existing segment, if the required segment is not yet created.
 *
 * Unlike [findSegmentBufferEnd], this function does not allocate new segments.
 *
 * @param id the id of the segment that [bufferEndSegment] should reference.
 * @param startFrom the segment from which the traversal over `next` references starts.
 */
private fun moveSegmentBufferEndToSpecifiedOrLast(id: Long, startFrom: ChannelSegment<E>) {
    // Start searching the required segment from the specified one.
    var segment: ChannelSegment<E> = startFrom
    while (segment.id < id) {
        // Stop at the last existing segment if the required one is not created.
        segment = segment.next ?: break
    }
    // Skip all removed segments and try to update `bufferEndSegment`
    // to the first non-removed one. This part should succeed eventually,
    // as the tail segment is never removed.
    while (true) {
        while (segment.isRemoved) {
            // This `break` leaves only the inner loop.
            segment = segment.next ?: break
        }
        // Try to update `bufferEndSegment`. On failure,
        // the found segment is already removed, so it
        // should be skipped.
        if (bufferEndSegment.moveForward(segment)) return
    }
}
2560
/**
 * Updates the `senders` counter if its value
 * is lower than the specified one.
 *
 * Senders use this function to efficiently skip
 * a sequence of cancelled receivers.
 *
 * The counter shares one atomic word with the close status, so the new
 * word is built from [value] and the close status read in the same
 * iteration; the close status is therefore left unchanged.
 *
 * @param value the minimal value the `senders` counter should have after this call.
 */
private fun updateSendersCounterIfLower(value: Long): Unit =
    sendersAndCloseStatus.loop { cur ->
        val curCounter = cur.sendersCounter
        // The counter is already at least as large as requested; nothing to do.
        if (curCounter >= value) return
        // Install the requested counter value. Building the update from `curCounter`
        // would yield a word equal to `cur`, turning the CAS below into a no-op
        // that never advances the counter.
        val update = constructSendersAndCloseStatus(value, cur.sendersCloseStatus)
        // On CAS failure the word has changed concurrently; re-read and retry.
        if (sendersAndCloseStatus.compareAndSet(cur, update)) return
    }
2575
/**
 * Raises the `receivers` counter to [value] unless it is already
 * equal to or greater than [value].
 *
 * Receivers call this to jump over a run of cancelled senders
 * in a single step.
 */
private fun updateReceiversCounterIfLower(value: Long) {
    while (true) {
        val observed = receivers.value
        // Finish when the counter is already large enough or when this
        // thread installs the new value; otherwise, re-read and retry.
        if (observed >= value || receivers.compareAndSet(observed, value)) return
    }
}
2588
2589 // ###################
2590 // # Debug Functions #
2591 // ###################
2592
/**
 * Builds a compact string representation of this channel: the close status
 * (only when closed or cancelled), the capacity, and the cells that store
 * either a buffered element or a waiting request.
 */
@Suppress("ConvertTwoComparisonsToRangeCheck")
override fun toString(): String {
    val sb = StringBuilder()
    // Append the close status
    when (sendersAndCloseStatus.value.sendersCloseStatus) {
        CLOSE_STATUS_CLOSED -> sb.append("closed,")
        CLOSE_STATUS_CANCELLED -> sb.append("cancelled,")
    }
    // Append the buffer capacity
    sb.append("capacity=$capacity,")
    // Append the data
    sb.append("data=[")
    // Start from the leftmost segment referenced by any of the three
    // segment pointers, ignoring the `NULL_SEGMENT` placeholder.
    val firstSegment = listOf(receiveSegment.value, sendSegment.value, bufferEndSegment.value)
        .filter { it !== NULL_SEGMENT }
        .minBy { it.id }
    val r = receiversCounter
    val s = sendersCounter
    var segment = firstSegment
    append_elements@ while (true) {
        process_cell@ for (i in 0 until SEGMENT_SIZE) {
            val globalCellIndex = segment.id * SEGMENT_SIZE + i
            // Stop at the first cell that is covered neither by a sender nor by a receiver.
            if (globalCellIndex >= s && globalCellIndex >= r) break@append_elements
            val cellState = segment.getState(i)
            val element = segment.getElement(i)
            val cellStateString = when (cellState) {
                is CancellableContinuation<*> -> {
                    // Use the counters to tell which kind of operation is suspended in the cell.
                    when {
                        globalCellIndex < r && globalCellIndex >= s -> "receive"
                        globalCellIndex < s && globalCellIndex >= r -> "send"
                        else -> "cont"
                    }
                }
                is SelectInstance<*> -> {
                    when {
                        globalCellIndex < r && globalCellIndex >= s -> "onReceive"
                        globalCellIndex < s && globalCellIndex >= r -> "onSend"
                        else -> "select"
                    }
                }
                is ReceiveCatching<*> -> "receiveCatching"
                is SendBroadcast -> "sendBroadcast"
                is WaiterEB -> "EB($cellState)"
                RESUMING_BY_RCV, RESUMING_BY_EB -> "resuming_sender"
                // These states are not printed at all.
                null, IN_BUFFER, DONE_RCV, POISONED, INTERRUPTED_RCV, INTERRUPTED_SEND, CHANNEL_CLOSED -> continue@process_cell
                else -> cellState.toString() // leave it just in case something is missed.
            }
            if (element != null) {
                sb.append("($cellStateString,$element),")
            } else {
                sb.append("$cellStateString,")
            }
        }
        // Process the next segment if exists.
        segment = segment.next ?: break
    }
    // Drop the trailing comma, if any, before closing the data section.
    if (sb.last() == ',') sb.deleteAt(sb.length - 1)
    sb.append("]")
    // The string representation is constructed.
    return sb.toString()
}
2653
// Produces a verbose dump of the channel internals (counters, close status,
// segment pointers, and every cell of every reachable segment).
// It is actively used in Lincheck tests.
internal fun toStringDebug(): String {
    val out = StringBuilder()
    // Counters and the numeric close status.
    out.append("S=${sendersCounter},")
    out.append("R=${receiversCounter},")
    out.append("B=${bufferEndCounter},")
    out.append("B'=${completedExpandBuffersAndPauseFlag.value},")
    out.append("C=${sendersAndCloseStatus.value.sendersCloseStatus},")
    // Human-readable close status; nothing is printed for any other status.
    val closeStatusLabel = when (sendersAndCloseStatus.value.sendersCloseStatus) {
        CLOSE_STATUS_CANCELLATION_STARTED -> "CANCELLATION_STARTED,"
        CLOSE_STATUS_CLOSED -> "CLOSED,"
        CLOSE_STATUS_CANCELLED -> "CANCELLED,"
        else -> null
    }
    if (closeStatusLabel != null) out.append(closeStatusLabel)
    // Segment pointers.
    out.append("SEND_SEGM=${sendSegment.value.hexAddress},")
    out.append("RCV_SEGM=${receiveSegment.value.hexAddress}")
    if (!isRendezvousOrUnlimited) {
        out.append(",EB_SEGM=${bufferEndSegment.value.hexAddress}")
    }
    out.append(" ") // add some space
    // Walk the linked list starting from the leftmost referenced segment.
    val leftmost = listOf(receiveSegment.value, sendSegment.value, bufferEndSegment.value)
        .filter { it !== NULL_SEGMENT }
        .minBy { it.id }
    var current: ChannelSegment<E>? = leftmost
    while (current != null) {
        val removedMark = if (current.isRemoved) "*" else ""
        out.append("${current.hexAddress}=[$removedMark${current.id},prev=${current.prev?.hexAddress},")
        for (i in 0 until SEGMENT_SIZE) {
            val cellState = current.getState(i)
            val element = current.getElement(i)
            val cellStateString = when (cellState) {
                is CancellableContinuation<*> -> "cont"
                is SelectInstance<*> -> "select"
                is ReceiveCatching<*> -> "receiveCatching"
                is SendBroadcast -> "send(broadcast)"
                is WaiterEB -> "EB($cellState)"
                else -> cellState.toString()
            }
            out.append("[$i]=($cellStateString,$element),")
        }
        out.append("next=${current.next?.hexAddress}] ")
        // Move on to the next segment; the loop ends when there is none.
        current = current.next
    }
    // The dump is complete.
    return out.toString()
}
2696
2697
// This is an internal method for tests.
// It verifies the structural invariants of the segment list and fails with
// `IllegalStateException` (via `check`/`error`) on the first violated one.
// The cell-state check treats transient states as illegal, so it is meant
// to be called after all running operations finish.
fun checkSegmentStructureInvariants() {
    if (isRendezvousOrUnlimited) {
        check(bufferEndSegment.value === NULL_SEGMENT) {
            "bufferEndSegment must be NULL_SEGMENT for rendezvous and unlimited channels; they do not manipulate it.\n" +
                "Channel state: $this"
        }
    } else {
        check(receiveSegment.value.id <= bufferEndSegment.value.id) {
            "bufferEndSegment should not have lower id than receiveSegment.\n" +
                "Channel state: $this"
        }
    }
    // The leftmost segment referenced by any of the segment pointers
    // must not keep a link to already processed segments.
    val firstSegment = listOf(receiveSegment.value, sendSegment.value, bufferEndSegment.value)
        .filter { it !== NULL_SEGMENT }
        .minBy { it.id }
    check(firstSegment.prev == null) {
        "All processed segments should be unreachable from the data structure, but the `prev` link of the leftmost segment is non-null.\n" +
            "Channel state: $this"
    }
    // Check that the doubly-linked list of segments does not
    // contain full-of-cancelled-cells segments.
    // Note that the loop stops at the last segment without inspecting its cells.
    var segment = firstSegment
    while (segment.next != null) {
        // Note that the `prev` reference can be `null` if this channel is closed.
        check(segment.next!!.prev == null || segment.next!!.prev === segment) {
            "The `segment.next.prev === segment` invariant is violated.\n" +
                "Channel state: $this"
        }
        // Count the number of closed/interrupted cells
        // and check that all cells are in expected states.
        var interruptedOrClosedCells = 0
        for (i in 0 until SEGMENT_SIZE) {
            when (val state = segment.getState(i)) {
                BUFFERED -> {} // The cell stores a buffered element.
                is Waiter -> {} // The cell stores a suspended request.
                INTERRUPTED_RCV, INTERRUPTED_SEND, CHANNEL_CLOSED -> {
                    // The cell stored an interrupted request or indicates
                    // that this channel is already closed.
                    // Check that the element slot is cleaned and increment
                    // the number of cells in closed/interrupted state.
                    check(segment.getElement(i) == null)
                    interruptedOrClosedCells++
                }
                POISONED, DONE_RCV -> {
                    // The cell is successfully processed or poisoned.
                    // Check that the element slot is cleaned.
                    check(segment.getElement(i) == null)
                }
                // Other states are illegal after all running operations finish.
                else -> error("Unexpected segment cell state: $state.\nChannel state: $this")
            }
        }
        // Is this segment full of cancelled/closed cells?
        // If so, this segment should be removed from the
        // linked list if neither `receiveSegment`, nor
        // `sendSegment`, nor `bufferEndSegment` reference it.
        if (interruptedOrClosedCells == SEGMENT_SIZE) {
            check(segment === receiveSegment.value || segment === sendSegment.value || segment === bufferEndSegment.value) {
                "Logically removed segment is reachable.\nChannel state: $this"
            }
        }
        // Process the next segment.
        segment = segment.next!!
    }
}
2764 }
2765
/**
 * The channel is represented as a list of segments, which simulates an infinite array.
 * Each segment has its own [id], which increase from the beginning. These [id]s help
 * to update [BufferedChannel.sendSegment], [BufferedChannel.receiveSegment],
 * and [BufferedChannel.bufferEndSegment] correctly.
 *
 * Every cell occupies two consecutive registers of the backing atomic array:
 * the element at `index * 2` and the cell state at `index * 2 + 1`.
 */
internal class ChannelSegment<E>(id: Long, prev: ChannelSegment<E>?, channel: BufferedChannel<E>?, pointers: Int) : Segment<ChannelSegment<E>>(id, prev, pointers) {
    private val _channel: BufferedChannel<E>? = channel
    val channel get() = _channel!! // always non-null except for `NULL_SEGMENT`

    private val data = atomicArrayOfNulls<Any?>(SEGMENT_SIZE * 2) // 2 registers per slot: state + element
    override val numberOfSlots: Int get() = SEGMENT_SIZE

    // ########################################
    // # Manipulation with the Element Fields #
    // ########################################

    /** Stores [element] in the element register of the cell [index]. */
    internal fun storeElement(index: Int, element: E) {
        setElementLazy(index, element)
    }

    /** Reads the element register of the cell [index] without modifying it. */
    @Suppress("UNCHECKED_CAST")
    internal fun getElement(index: Int) = data[index * 2].value as E

    /** Reads the element of the cell [index] and then cleans the element register. */
    internal fun retrieveElement(index: Int): E = getElement(index).also { cleanElement(index) }

    /** Resets the element register of the cell [index] to `null`. */
    internal fun cleanElement(index: Int) {
        setElementLazy(index, null)
    }

    // Writes the element register with `lazySet`.
    private fun setElementLazy(index: Int, value: Any?) {
        data[index * 2].lazySet(value)
    }

    // ######################################
    // # Manipulation with the State Fields #
    // ######################################

    /** Reads the state register of the cell [index]. */
    internal fun getState(index: Int): Any? = data[index * 2 + 1].value

    /** Writes [value] to the state register of the cell [index]. */
    internal fun setState(index: Int, value: Any?) {
        data[index * 2 + 1].value = value
    }

    /** Atomically replaces the state of the cell [index] with [to] if it is [from]. */
    internal fun casState(index: Int, from: Any?, to: Any?) = data[index * 2 + 1].compareAndSet(from, to)

    /** Atomically replaces the state of the cell [index] with [update], returning the previous state. */
    internal fun getAndSetState(index: Int, update: Any?) = data[index * 2 + 1].getAndSet(update)


    // ########################
    // # Cancellation Support #
    // ########################

    /**
     * Handles cancellation of the request stored in the cell encoded by [index].
     * Indices not lower than [SEGMENT_SIZE] denote cancelled senders.
     * For a cancelled sender, `onUndeliveredElement` of the channel (if present)
     * is invoked with the element read from the cell and the provided [context].
     * The [cause] parameter is not used by this implementation.
     */
    override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) {
        // To distinguish cancelled senders and receivers, senders equip the index value with
        // an additional marker, adding `SEGMENT_SIZE` to the value.
        val isSender = index >= SEGMENT_SIZE
        // Unwrap the index.
        @Suppress("NAME_SHADOWING") val index = if (isSender) index - SEGMENT_SIZE else index
        // Read the element, which may be needed further to call `onUndeliveredElement`.
        val element = getElement(index)
        // Update the cell state.
        while (true) {
            // CAS-loop
            // Read the current state of the cell.
            val cur = getState(index)
            when {
                // The cell stores a waiter.
                cur is Waiter || cur is WaiterEB -> {
                    // The cancelled request is either send or receive.
                    // Update the cell state correspondingly.
                    val update = if (isSender) INTERRUPTED_SEND else INTERRUPTED_RCV
                    if (casState(index, cur, update)) {
                        // The waiter has been successfully cancelled.
                        // Clean the element slot and invoke `onSlotCleaned()`,
                        // which may cause deleting the whole segment from the linked list.
                        // In case the cancelled request is receiver, it is critical to ensure
                        // that the `expandBuffer()` attempt that processes this cell is completed,
                        // so `onCancelledRequest(..)` waits for its completion before invoking `onSlotCleaned()`.
                        cleanElement(index)
                        onCancelledRequest(index, !isSender)
                        // Call `onUndeliveredElement` if needed.
                        if (isSender) {
                            channel.onUndeliveredElement?.callUndeliveredElement(element, context)
                        }
                        return
                    }
                }
                // The cell already indicates that the operation is cancelled.
                cur === INTERRUPTED_SEND || cur === INTERRUPTED_RCV -> {
                    // Clean the element slot to avoid memory leaks,
                    // invoke `onUndeliveredElement` if needed, and finish
                    cleanElement(index)
                    // Call `onUndeliveredElement` if needed.
                    if (isSender) {
                        channel.onUndeliveredElement?.callUndeliveredElement(element, context)
                    }
                    return
                }
                // An opposite operation is resuming this request;
                // wait until the cell state updates.
                // It is possible that an opposite operation has already
                // resumed this request, which will result in updating
                // the cell state to `DONE_RCV` or `BUFFERED`, while the
                // current cancellation is caused by prompt cancellation.
                cur === RESUMING_BY_EB || cur === RESUMING_BY_RCV -> continue
                // This request was successfully resumed, so this cancellation
                // is caused by the prompt cancellation feature and should be ignored.
                cur === DONE_RCV || cur === BUFFERED -> return
                // The cell state indicates that the channel is closed;
                // this cancellation should be ignored.
                cur === CHANNEL_CLOSED -> return
                else -> error("unexpected state: $cur")
            }
        }
    }

    /**
     * Invokes `onSlotCleaned()` preceded by a `waitExpandBufferCompletion(..)` call
     * in case the cancelled request is receiver.
     *
     * @param index the index of the cell inside this segment.
     * @param receiver `true` if the cancelled request is a receiver.
     */
    fun onCancelledRequest(index: Int, receiver: Boolean) {
        if (receiver) channel.waitExpandBufferCompletion(id * SEGMENT_SIZE + index)
        onSlotCleaned()
    }
}
2892
// WA for atomicfu + JVM_IR compiler bug that lead to SMAP-related compiler crashes: KT-55983
// Returns a reference to `createSegment`; it is passed to the
// `findSegmentAndMoveForward` calls in `BufferedChannel`.
internal fun <E> createSegmentFunction(): KFunction2<Long, ChannelSegment<E>, ChannelSegment<E>> = ::createSegment
2895
// Creates a segment with the given `id` that follows `prev`, belongs to the
// same channel as `prev`, and starts with zero pointers referencing it.
private fun <E> createSegment(id: Long, prev: ChannelSegment<E>): ChannelSegment<E> {
    val owner = prev.channel
    return ChannelSegment(id, prev, owner, pointers = 0)
}
// A placeholder segment with id `-1`, no predecessor, and no owning channel.
// `checkSegmentStructureInvariants` expects `bufferEndSegment` to reference it
// for rendezvous and unlimited channels.
private val NULL_SEGMENT = ChannelSegment<Any?>(id = -1, prev = null, channel = null, pointers = 0)

/**
 * Number of cells in each segment.
 * It is read from the `kotlinx.coroutines.bufferedChannel.segmentSize` system property,
 * with `32` passed as the second argument of `systemProp` (presumably the default — confirm).
 */
@JvmField
internal val SEGMENT_SIZE = systemProp("kotlinx.coroutines.bufferedChannel.segmentSize", 32)

/**
 * Number of iterations to wait in [BufferedChannel.waitExpandBufferCompletion] until the numbers of started and completed
 * [BufferedChannel.expandBuffer] calls coincide. When the limit is reached, [BufferedChannel.waitExpandBufferCompletion]
 * blocks further [BufferedChannel.expandBuffer]-s to avoid starvation.
 */
private val EXPAND_BUFFER_COMPLETION_WAIT_ITERATIONS = systemProp("kotlinx.coroutines.bufferedChannel.expandBufferCompletionWaitIterations", 10_000)
2916
/**
 * Attempts to resume this continuation with [value], passing
 * [onCancellation] along to `tryResume`.
 *
 * @return `true` when the resumption token was obtained and the
 * resumption was completed, `false` when no token was obtained.
 */
private fun <T> CancellableContinuation<T>.tryResume0(
    value: T,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Boolean {
    // No token means that this continuation cannot be resumed.
    val token = tryResume(value, null, onCancellation) ?: return false
    completeResume(token)
    return true
}
2931
/*
  For rendezvous and unlimited channels, the `bufferEnd` counter is
  initialized with the corresponding constant below and never changes
  afterwards, so the `expandBuffer(..)` operation does nothing for them.
 */
private const val BUFFER_END_RENDEZVOUS = 0L // no buffer
private const val BUFFER_END_UNLIMITED = Long.MAX_VALUE // infinite buffer

// Maps the channel capacity to the initial value of the `bufferEnd` counter.
private fun initialBufferEnd(capacity: Int): Long {
    if (capacity == Channel.RENDEZVOUS) return BUFFER_END_RENDEZVOUS
    if (capacity == Channel.UNLIMITED) return BUFFER_END_UNLIMITED
    // A regular buffered channel: the logical buffer initially ends at `capacity`.
    return capacity.toLong()
}
2944
/*
  Cell states. The initial "empty" state is represented with `null`,
  and suspended operations are represented with [Waiter] instances.
 */

// The cell stores a buffered element.
@JvmField
internal val BUFFERED = Symbol("BUFFERED")
// Concurrent `expandBuffer(..)` can inform the
// upcoming sender that it should buffer the element.
private val IN_BUFFER = Symbol("SHOULD_BUFFER")
// Indicates that a receiver (RCV suffix) is resuming
// the suspended sender; after that, it should update
// the state to either `DONE_RCV` (on success) or
// `INTERRUPTED_SEND` (on failure).
private val RESUMING_BY_RCV = Symbol("S_RESUMING_BY_RCV")
// Indicates that `expandBuffer(..)` (EB suffix) is resuming
// the suspended sender; after that, it should update
// the state to either `BUFFERED` (on success) or
// `INTERRUPTED_SEND` (on failure).
private val RESUMING_BY_EB = Symbol("RESUMING_BY_EB")
// When a receiver comes to the cell already covered by
// a sender (according to the counters), but the cell
// is still in `EMPTY` or `IN_BUFFER` state, it breaks
// the cell by changing its state to `POISONED`.
private val POISONED = Symbol("POISONED")
// When the element is successfully transferred
// to a receiver, the cell changes to `DONE_RCV`.
private val DONE_RCV = Symbol("DONE_RCV")
// Cancelled sender.
private val INTERRUPTED_SEND = Symbol("INTERRUPTED_SEND")
// Cancelled receiver.
private val INTERRUPTED_RCV = Symbol("INTERRUPTED_RCV")
// Indicates that the channel is closed.
internal val CHANNEL_CLOSED = Symbol("CHANNEL_CLOSED")
// A cell may already be covered by both a sender and a receiver
// (both the `senders` and `receivers` counters exceed the cell number).
// In that case, the `expandBuffer(..)` procedure cannot tell which kind
// of operation is stored in the cell, so it wraps the waiter with this
// descriptor. The wrapper tells a possibly upcoming receiver that it
// should complete the `expandBuffer(..)` procedure if the stored waiter
// is a sender. Senders, in turn, ignore this information.
private class WaiterEB(@JvmField val waiter: Waiter) {
    override fun toString(): String {
        return "WaiterEB($waiter)"
    }
}
2991
2992
2993
/**
 * To distinguish suspended [BufferedChannel.receive] and
 * [BufferedChannel.receiveCatching] operations, the latter
 * uses this wrapper for its continuation.
 *
 * The wrapper implements [Waiter] by delegating to the wrapped [cont].
 */
private class ReceiveCatching<E>(
    @JvmField val cont: CancellableContinuationImpl<ChannelResult<E>>
) : Waiter by cont
3002
/*
  Marker results of [BufferedChannel.updateCellReceive].
  Besides these markers, [BufferedChannel.updateCellReceive]
  returns the element itself when it makes a successful
  rendezvous with a waiting sender or retrieves a buffered
  element.
 */
private val SUSPEND: Symbol = Symbol("SUSPEND")
private val SUSPEND_NO_WAITER: Symbol = Symbol("SUSPEND_NO_WAITER")
private val FAILED: Symbol = Symbol("FAILED")
3012
/*
  Result codes of [BufferedChannel.updateCellSend];
  each outcome is encoded as a distinct `Int`.
 */
private const val RESULT_RENDEZVOUS: Int = 0
private const val RESULT_BUFFERED: Int = 1
private const val RESULT_SUSPEND: Int = 2
private const val RESULT_SUSPEND_NO_WAITER: Int = 3
private const val RESULT_CLOSED: Int = 4
private const val RESULT_FAILED: Int = 5
3022
/**
 * Sentinel stored in [BufferedChannel.BufferedChannelIterator.receiveResult]
 * to indicate that there is no pre-received result.
 */
private val NO_RECEIVE_RESULT: Symbol = Symbol("NO_RECEIVE_RESULT")
3028
/*
  [BufferedChannel.invokeOnClose] may be invoked concurrently
  with channel closing, so the two have to be synchronized.
  The two markers below help with that synchronization.
 */
private val CLOSE_HANDLER_CLOSED: Symbol = Symbol("CLOSE_HANDLER_CLOSED")
private val CLOSE_HANDLER_INVOKED: Symbol = Symbol("CLOSE_HANDLER_INVOKED")
3036
/**
 * Marker stored in [BufferedChannel._closeCause] while no closing cause
 * has been specified. When the channel is closed or cancelled without
 * an exception, this [NO_CLOSE_CAUSE] marker should be replaced with `null`.
 */
private val NO_CLOSE_CAUSE: Symbol = Symbol("NO_CLOSE_CAUSE")
3043
/*
  Channel close statuses. The possible transitions are:

    +--------+   +----------------------+   +-----------+
    | ACTIVE |-->| CANCELLATION_STARTED |-->| CANCELLED |
    +--------+   +----------------------+   +-----------+
        |                                         ^
        |             +--------+                  |
        +------------>| CLOSED |------------------+
                      +--------+

  The intermediate `CANCELLATION_STARTED` status is needed
  to synchronize concurrent closing and cancellation.
 */
private const val CLOSE_STATUS_ACTIVE: Int = 0
private const val CLOSE_STATUS_CANCELLATION_STARTED: Int = 1
private const val CLOSE_STATUS_CLOSED: Int = 2
private const val CLOSE_STATUS_CANCELLED: Int = 3
3060
/*
  To save space and to reduce the number of reads in sending
  operations, the `senders` counter and the channel close status
  share a single 64-bit register: the counter occupies the low
  `SENDERS_CLOSE_STATUS_SHIFT` bits, and the close status is
  stored in the bits above them. The declarations below
  encapsulate the required bit arithmetic.
 */
private const val SENDERS_CLOSE_STATUS_SHIFT: Int = 60
private const val SENDERS_COUNTER_MASK: Long = (1L shl SENDERS_CLOSE_STATUS_SHIFT) - 1L

// The `senders` counter part of the packed value.
private inline val Long.sendersCounter: Long
    get() = this and SENDERS_COUNTER_MASK

// The close status part of the packed value.
private inline val Long.sendersCloseStatus: Int
    get() = (this shr SENDERS_CLOSE_STATUS_SHIFT).toInt()

// Packs the counter and the close status into a single `Long`;
// the counter is added as-is, without masking.
private fun constructSendersAndCloseStatus(counter: Long, closeStatus: Int): Long {
    val statusBits = closeStatus.toLong() shl SENDERS_CLOSE_STATUS_SHIFT
    return statusBits + counter
}
3073
/*
  The 64-bit `completedExpandBuffersAndPauseFlag` counter stores
  the number of completed `expandBuffer()` attempts together
  with a special flag that pauses progress to avoid starvation
  in `waitExpandBufferCompletion(..)`. The flag is a single bit
  located right above the counter bits. The declarations below
  encapsulate the required bit arithmetic.
 */
private const val EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT: Long = 1L shl 62
private const val EB_COMPLETED_COUNTER_MASK: Long = EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT - 1L

// The counter part of the packed value.
private inline val Long.ebCompletedCounter: Long
    get() = this and EB_COMPLETED_COUNTER_MASK

// Whether the pause flag is set in the packed value.
private inline val Long.ebPauseExpandBuffers: Boolean
    get() = (this and EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT) != 0L

// Packs the counter and the pause flag into a single `Long`;
// the counter is added as-is, without masking.
private fun constructEBCompletedAndPauseFlag(counter: Long, pauseEB: Boolean): Long {
    val pauseBit = if (pauseEB) EB_COMPLETED_PAUSE_EXPAND_BUFFERS_BIT else 0L
    return pauseBit + counter
}
3086