1 package kotlinx.coroutines.channels
2
3 import kotlinx.coroutines.*
4 import kotlinx.coroutines.intrinsics.*
5 import kotlinx.coroutines.selects.*
6 import kotlin.coroutines.*
7 import kotlin.coroutines.intrinsics.*
8
9 /**
10 * Scope for [actor][GlobalScope.actor] coroutine builder.
11 *
12 * **Note: This API will become obsolete in future updates with introduction of complex actors.**
13 * See [issue #87](https://github.com/Kotlin/kotlinx.coroutines/issues/87).
14 */
@ObsoleteCoroutinesApi
public interface ActorScope<E> : CoroutineScope, ReceiveChannel<E> {
    /**
     * The mailbox channel from which this coroutine [receives][receive] its messages.
     *
     * This property exists purely for readability: code running inside the actor can write
     * `channel` as opposed to `this` when it wants to refer to the mailbox.
     * Every [ReceiveChannel] function exposed by this interface delegates to
     * the channel instance returned here.
     */
    public val channel: Channel<E>
}
26
27 /**
28 * Launches new coroutine that is receiving messages from its mailbox channel
29 * and returns a reference to its mailbox channel as a [SendChannel]. The resulting
30 * object can be used to [send][SendChannel.send] messages to this coroutine.
31 *
32 * The scope of the coroutine contains [ActorScope] interface, which implements
33 * both [CoroutineScope] and [ReceiveChannel], so that coroutine can invoke
34 * [receive][ReceiveChannel.receive] directly. The channel is [closed][SendChannel.close]
35 * when the coroutine completes.
36 *
37 * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
38 * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
39 * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
40 * with corresponding [context] element.
41 *
42 * By default, the coroutine is immediately scheduled for execution.
43 * Other options can be specified via `start` parameter. See [CoroutineStart] for details.
44 * An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case,
45 * it will be started implicitly on the first message
 * [sent][SendChannel.send] to this actor's mailbox channel.
47 *
48 * Uncaught exceptions in this coroutine close the channel with this exception as a cause and
49 * the resulting channel becomes _failed_, so that any attempt to send to such a channel throws exception.
50 *
51 * The kind of the resulting channel depends on the specified [capacity] parameter.
52 * See [Channel] interface documentation for details.
53 *
54 * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
55 *
56 * ### Using actors
57 *
58 * A typical usage of the actor builder looks like this:
59 *
60 * ```
61 * val c = actor {
62 * // initialize actor's state
63 * for (msg in channel) {
64 * // process message here
65 * }
66 * }
67 * // send messages to the actor
68 * c.send(...)
69 * ...
70 * // stop the actor when it is no longer needed
71 * c.close()
72 * ```
73 *
74 * ### Stopping and cancelling actors
75 *
76 * When the inbox channel of the actor is [closed][SendChannel.close] it sends a special "close token" to the actor.
77 * The actor still processes all the messages that were already sent and then "`for (msg in channel)`" loop terminates
78 * and the actor completes.
79 *
80 * If the actor needs to be aborted without processing all the messages that were already sent to it, then
81 * it shall be created with a parent job:
82 *
83 * ```
84 * val job = Job()
85 * val c = actor(context = job) { ... }
86 * ...
87 * // abort the actor
88 * job.cancel()
89 * ```
90 *
91 * When actor's parent job is [cancelled][Job.cancel], then actor's job becomes cancelled. It means that
92 * "`for (msg in channel)`" and other cancellable suspending functions throw [CancellationException] and actor
93 * completes without processing remaining messages.
94 *
95 * **Note: This API will become obsolete in future updates with introduction of complex actors.**
96 * See [issue #87](https://github.com/Kotlin/kotlinx.coroutines/issues/87).
97 *
98 * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
99 * @param capacity capacity of the channel's buffer (no buffer by default).
100 * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
101 * @param onCompletion optional completion handler for the actor coroutine (see [Job.invokeOnCompletion])
102 * @param block the coroutine code.
103 */
@ObsoleteCoroutinesApi
public fun <E> CoroutineScope.actor(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0, // todo: Maybe Channel.DEFAULT here?
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    block: suspend ActorScope<E>.() -> Unit
): SendChannel<E> {
    // Combine the scope's context with the caller-supplied additions.
    val actorContext = newCoroutineContext(context)
    // The mailbox the actor body reads from; its kind is determined by `capacity`.
    val mailbox = Channel<E>(capacity)
    // Lazy start options get the lazily-started implementation; everything else is created active.
    val actorCoroutine: ActorCoroutine<E> =
        if (start.isLazy) {
            LazyActorCoroutine(actorContext, mailbox, block)
        } else {
            ActorCoroutine(actorContext, mailbox, active = true)
        }
    // Register the optional completion handler before the coroutine is started.
    onCompletion?.let { handler -> actorCoroutine.invokeOnCompletion(handler = handler) }
    // The coroutine is its own receiver: it implements ActorScope<E>.
    actorCoroutine.start(start, actorCoroutine, block)
    return actorCoroutine
}
121
/**
 * Coroutine that backs an actor: a [ChannelCoroutine] over the mailbox [channel] that also
 * serves as the [ActorScope] receiver of the actor body.
 *
 * The superclass is constructed with `initParentJob = false`; the parent job taken from
 * [parentContext] is attached explicitly in the `init` block instead.
 */
private open class ActorCoroutine<E>(
    parentContext: CoroutineContext,
    channel: Channel<E>,
    active: Boolean
) : ChannelCoroutine<E>(parentContext, channel, initParentJob = false, active = active), ActorScope<E> {

    init {
        initParentJob(parentContext[Job])
    }

    /**
     * Cancels the underlying mailbox channel when this coroutine starts cancelling.
     *
     * A `null` [cause] is forwarded as `null`, a [CancellationException] is forwarded unchanged,
     * and any other throwable is wrapped into a new [CancellationException].
     */
    override fun onCancelling(cause: Throwable?) {
        val cancellation: CancellationException? = when (cause) {
            null -> null
            is CancellationException -> cause
            else -> CancellationException("$classSimpleName was cancelled", cause)
        }
        _channel.cancel(cancellation)
    }

    /**
     * Passes [exception] to [handleCoroutineException] for this coroutine's context and
     * always returns `true`.
     */
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}
143
/**
 * [ActorCoroutine] variant used when the actor is created with a lazy start option.
 *
 * The coroutine is constructed inactive (`active = false`) and the actor body is captured up front
 * as an unintercepted continuation. Every operation that feeds the mailbox ([send], [offer],
 * [trySend], [onSend]) and also [close] calls [start] first, so the body begins running on first use.
 */
private class LazyActorCoroutine<E>(
    parentContext: CoroutineContext,
    channel: Channel<E>,
    block: suspend ActorScope<E>.() -> Unit
) : ActorCoroutine<E>(parentContext, channel, active = false) {

    // The not-yet-started actor body; this coroutine is both its receiver and its completion.
    private val continuation = block.createCoroutineUnintercepted(this, this)

    /** Start hook: launches the captured actor body in cancellable mode. */
    override fun onStart() {
        continuation.startCoroutineCancellable(this)
    }

    /** Starts the actor (if needed) and then sends [element] to the mailbox. */
    override suspend fun send(element: E) {
        start()
        return super.send(element)
    }

    @Suppress("DEPRECATION_ERROR")
    @Deprecated(
        level = DeprecationLevel.ERROR,
        message = "Deprecated in the favour of 'trySend' method",
        replaceWith = ReplaceWith("trySend(element).isSuccess")
    ) // See super()
    override fun offer(element: E): Boolean {
        start()
        return super.offer(element)
    }

    /** Starts the actor (if needed) and then attempts a non-suspending send of [element]. */
    override fun trySend(element: E): ChannelResult<Unit> {
        start()
        return super.trySend(element)
    }

    @Suppress("MULTIPLE_DEFAULTS_INHERITED_FROM_SUPERTYPES_DEPRECATION_WARNING") // do not remove the MULTIPLE_DEFAULTS suppression: required in K2
    override fun close(cause: Throwable?): Boolean {
        // close the channel _first_
        val closed = super.close(cause)
        // then start the coroutine (it will promptly fail if it was not started yet)
        start()
        return closed
    }

    /**
     * Select clause for sending to the mailbox. It reuses the result-processing function of the
     * superclass clause but registers through [onSendRegFunction] so that the actor is started first.
     */
    @Suppress("UNCHECKED_CAST")
    override val onSend: SelectClause2<E, SendChannel<E>> get() = SelectClause2Impl(
        clauseObject = this,
        regFunc = LazyActorCoroutine<*>::onSendRegFunction as RegistrationFunction,
        processResFunc = super.onSend.processResFunc
    )

    /**
     * Registration function of the [onSend] clause: starts the actor and then delegates to the
     * registration function of the superclass clause.
     */
    private fun onSendRegFunction(select: SelectInstance<*>, element: Any?) {
        // Go through start() like every other send path instead of invoking the onStart() hook
        // directly: a direct call would skip the job's start transition and could launch the single
        // captured body continuation again on each select registration.
        start()
        super.onSend.regFunc(this, select, element)
    }
}
198