xref: /aosp_15_r20/external/kotlinx.coroutines/kotlinx-coroutines-core/jvm/src/channels/Actor.kt (revision 7a7160fed73afa6648ef8aa100d4a336fe921d9a)
1 package kotlinx.coroutines.channels
2 
3 import kotlinx.coroutines.*
4 import kotlinx.coroutines.intrinsics.*
5 import kotlinx.coroutines.selects.*
6 import kotlin.coroutines.*
7 import kotlin.coroutines.intrinsics.*
8 
9 /**
10  * Scope for [actor][GlobalScope.actor] coroutine builder.
11  *
12  * **Note: This API will become obsolete in future updates with introduction of complex actors.**
13  *           See [issue #87](https://github.com/Kotlin/kotlinx.coroutines/issues/87).
14  */
15 @ObsoleteCoroutinesApi
16 public interface ActorScope<E> : CoroutineScope, ReceiveChannel<E> {
17     /**
18      * A reference to the mailbox channel that this coroutine [receives][receive] messages from.
19      * It is provided for convenience, so that the code in the coroutine can refer
20      * to the channel as `channel` as apposed to `this`.
21      * All the [ReceiveChannel] functions on this interface delegate to
22      * the channel instance returned by this function.
23      */
24     public val channel: Channel<E>
25 }
26 
27 /**
28  * Launches new coroutine that is receiving messages from its mailbox channel
29  * and returns a reference to its mailbox channel as a [SendChannel]. The resulting
30  * object can be used to [send][SendChannel.send] messages to this coroutine.
31  *
32  * The scope of the coroutine contains [ActorScope] interface, which implements
33  * both [CoroutineScope] and [ReceiveChannel], so that coroutine can invoke
34  * [receive][ReceiveChannel.receive] directly. The channel is [closed][SendChannel.close]
35  * when the coroutine completes.
36  *
37  * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
38  * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
39  * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
40  * with corresponding [context] element.
41  *
42  * By default, the coroutine is immediately scheduled for execution.
43  * Other options can be specified via `start` parameter. See [CoroutineStart] for details.
44  * An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case,
45  * it will be started implicitly on the first message
46  * [sent][SendChannel.send] to this actors's mailbox channel.
47  *
48  * Uncaught exceptions in this coroutine close the channel with this exception as a cause and
49  * the resulting channel becomes _failed_, so that any attempt to send to such a channel throws exception.
50  *
51  * The kind of the resulting channel depends on the specified [capacity] parameter.
52  * See [Channel] interface documentation for details.
53  *
54  * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
55  *
56  * ### Using actors
57  *
58  * A typical usage of the actor builder looks like this:
59  *
60  * ```
61  * val c = actor {
62  *     // initialize actor's state
63  *     for (msg in channel) {
64  *         // process message here
65  *     }
66  * }
67  * // send messages to the actor
68  * c.send(...)
69  * ...
70  * // stop the actor when it is no longer needed
71  * c.close()
72  * ```
73  *
74  * ### Stopping and cancelling actors
75  *
76  * When the inbox channel of the actor is [closed][SendChannel.close] it sends a special "close token" to the actor.
77  * The actor still processes all the messages that were already sent and then "`for (msg in channel)`" loop terminates
78  * and the actor completes.
79  *
80  * If the actor needs to be aborted without processing all the messages that were already sent to it, then
81  * it shall be created with a parent job:
82  *
83  * ```
84  * val job = Job()
85  * val c = actor(context = job) {  ... }
86  * ...
87  * // abort the actor
88  * job.cancel()
89  * ```
90  *
91  * When actor's parent job is [cancelled][Job.cancel], then actor's job becomes cancelled. It means that
92  * "`for (msg in channel)`" and other cancellable suspending functions throw [CancellationException] and actor
93  * completes without processing remaining messages.
94  *
95  * **Note: This API will become obsolete in future updates with introduction of complex actors.**
96  *           See [issue #87](https://github.com/Kotlin/kotlinx.coroutines/issues/87).
97  *
98  * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
99  * @param capacity capacity of the channel's buffer (no buffer by default).
100  * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
101  * @param onCompletion optional completion handler for the actor coroutine (see [Job.invokeOnCompletion])
102  * @param block the coroutine code.
103  */
104 @ObsoleteCoroutinesApi
actornull105 public fun <E> CoroutineScope.actor(
106     context: CoroutineContext = EmptyCoroutineContext,
107     capacity: Int = 0, // todo: Maybe Channel.DEFAULT here?
108     start: CoroutineStart = CoroutineStart.DEFAULT,
109     onCompletion: CompletionHandler? = null,
110     block: suspend ActorScope<E>.() -> Unit
111 ): SendChannel<E> {
112     val newContext = newCoroutineContext(context)
113     val channel = Channel<E>(capacity)
114     val coroutine = if (start.isLazy)
115         LazyActorCoroutine(newContext, channel, block) else
116         ActorCoroutine(newContext, channel, active = true)
117     if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
118     coroutine.start(start, coroutine, block)
119     return coroutine
120 }
121 
122 private open class ActorCoroutine<E>(
123     parentContext: CoroutineContext,
124     channel: Channel<E>,
125     active: Boolean
126 ) : ChannelCoroutine<E>(parentContext, channel, initParentJob = false, active = active), ActorScope<E> {
127 
128     init {
129         initParentJob(parentContext[Job])
130     }
131 
onCancellingnull132     override fun onCancelling(cause: Throwable?) {
133         _channel.cancel(cause?.let {
134             it as? CancellationException ?: CancellationException("$classSimpleName was cancelled", it)
135         })
136     }
137 
handleJobExceptionnull138     override fun handleJobException(exception: Throwable): Boolean {
139         handleCoroutineException(context, exception)
140         return true
141     }
142 }
143 
144 private class LazyActorCoroutine<E>(
145     parentContext: CoroutineContext,
146     channel: Channel<E>,
147     block: suspend ActorScope<E>.() -> Unit
148 ) : ActorCoroutine<E>(parentContext, channel, active = false) {
149 
150     private var continuation = block.createCoroutineUnintercepted(this, this)
151 
onStartnull152     override fun onStart() {
153         continuation.startCoroutineCancellable(this)
154     }
155 
sendnull156     override suspend fun send(element: E) {
157         start()
158         return super.send(element)
159     }
160 
161     @Suppress("DEPRECATION_ERROR")
162     @Deprecated(
163         level = DeprecationLevel.ERROR,
164         message = "Deprecated in the favour of 'trySend' method",
165         replaceWith = ReplaceWith("trySend(element).isSuccess")
166     ) // See super()
offernull167     override fun offer(element: E): Boolean {
168         start()
169         return super.offer(element)
170     }
171 
trySendnull172     override fun trySend(element: E): ChannelResult<Unit> {
173         start()
174         return super.trySend(element)
175     }
176 
177     @Suppress("MULTIPLE_DEFAULTS_INHERITED_FROM_SUPERTYPES_DEPRECATION_WARNING") // do not remove the MULTIPLE_DEFAULTS suppression: required in K2
closenull178     override fun close(cause: Throwable?): Boolean {
179         // close the channel _first_
180         val closed = super.close(cause)
181         // then start the coroutine (it will promptly fail if it was not started yet)
182         start()
183         return closed
184     }
185 
186     @Suppress("UNCHECKED_CAST")
187     override val onSend: SelectClause2<E, SendChannel<E>> get() = SelectClause2Impl(
188         clauseObject = this,
189         regFunc = LazyActorCoroutine<*>::onSendRegFunction as RegistrationFunction,
190         processResFunc = super.onSend.processResFunc
191     )
192 
onSendRegFunctionnull193     private fun onSendRegFunction(select: SelectInstance<*>, element: Any?) {
194         onStart()
195         super.onSend.regFunc(this, select, element)
196     }
197 }
198