1 package kotlinx.coroutines.channels 2 3 import kotlinx.coroutines.* 4 import kotlin.coroutines.* 5 import kotlin.coroutines.intrinsics.* 6 7 public abstract class SimpleChannel { 8 companion object { 9 const val NULL_SURROGATE: Int = -1 10 } 11 12 @JvmField 13 protected var producer: Continuation<Unit>? = null 14 @JvmField 15 protected var enqueuedValue: Int = NULL_SURROGATE 16 @JvmField 17 protected var consumer: Continuation<Int>? = null 18 sendnull19 suspend fun send(element: Int) { 20 require(element != NULL_SURROGATE) 21 if (offer(element)) { 22 return 23 } 24 25 return suspendSend(element) 26 } 27 offernull28 private fun offer(element: Int): Boolean { 29 if (consumer == null) { 30 return false 31 } 32 33 consumer!!.resume(element) 34 consumer = null 35 return true 36 } 37 receivenull38 suspend fun receive(): Int { 39 // Cached value 40 if (enqueuedValue != NULL_SURROGATE) { 41 val result = enqueuedValue 42 enqueuedValue = NULL_SURROGATE 43 producer!!.resume(Unit) 44 return result 45 } 46 47 return suspendReceive() 48 } 49 suspendReceivenull50 abstract suspend fun suspendReceive(): Int 51 abstract suspend fun suspendSend(element: Int) 52 } 53 54 class NonCancellableChannel : SimpleChannel() { 55 override suspend fun suspendReceive(): Int = suspendCoroutineUninterceptedOrReturn { 56 consumer = it.intercepted() 57 COROUTINE_SUSPENDED 58 } 59 60 override suspend fun suspendSend(element: Int) = suspendCoroutineUninterceptedOrReturn<Unit> { 61 enqueuedValue = element 62 producer = it.intercepted() 63 COROUTINE_SUSPENDED 64 } 65 } 66 67 class CancellableChannel : SimpleChannel() { <lambda>null68 override suspend fun suspendReceive(): Int = suspendCancellableCoroutine { 69 consumer = it.intercepted() 70 COROUTINE_SUSPENDED 71 } 72 <lambda>null73 override suspend fun suspendSend(element: Int) = suspendCancellableCoroutine<Unit> { 74 enqueuedValue = element 75 producer = it.intercepted() 76 COROUTINE_SUSPENDED 77 } 78 } 79 80 class CancellableReusableChannel : SimpleChannel() { <lambda>null81 override suspend fun suspendReceive(): Int = suspendCancellableCoroutineReusable { 82 consumer = it.intercepted() 83 COROUTINE_SUSPENDED 84 } 85 <lambda>null86 override suspend fun suspendSend(element: Int) = suspendCancellableCoroutineReusable<Unit> { 87 enqueuedValue = element 88 producer = it.intercepted() 89 COROUTINE_SUSPENDED 90 } 91 } 92