<lambda>null1 package kotlinx.coroutines.sync
2
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.internal.*
6 import kotlinx.coroutines.selects.*
7 import kotlin.contracts.*
8 import kotlin.jvm.*
9
/**
 * Mutual exclusion for coroutines.
 *
 * A mutex has two states: _locked_ and _unlocked_.
 * It is **non-reentrant**: invoking [lock] from the same thread/coroutine that currently holds
 * the lock still suspends the invoker.
 *
 * JVM API note:
 * The memory semantics of [Mutex] are similar to those of a `synchronized` block on the JVM:
 * an unlock operation on a [Mutex] happens-before every subsequent successful lock on that [Mutex].
 * An unsuccessful call to [tryLock] does not have any memory effects.
 */
public interface Mutex {
    /**
     * Returns `true` if this mutex is locked.
     */
    public val isLocked: Boolean

    /**
     * Tries to lock this mutex without waiting, returning `false` if this mutex is already locked.
     *
     * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
     * released at the end of the critical section, and [unlock] is never invoked before a successful
     * lock acquisition.
     *
     * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
     *        is already locked with the same token (same identity), this function throws [IllegalStateException].
     * @return `true` if the lock was acquired by this call, `false` if the mutex is already locked.
     */
    public fun tryLock(owner: Any? = null): Boolean

    /**
     * Locks this mutex, suspending the caller until the lock is acquired (in other words, while the lock is
     * held elsewhere).
     *
     * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this
     * suspending function is waiting, this function immediately resumes with [CancellationException].
     * There is a **prompt cancellation guarantee**: even if this function is ready to return the result, but was
     * cancelled while suspended, [CancellationException] will be thrown. See [suspendCancellableCoroutine] for
     * low-level details.
     * If the lock was already acquired by this function before the [CancellationException] was thrown,
     * this function releases it.
     *
     * Note that this function does not check for cancellation when it is not suspended.
     * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
     *
     * Use [tryLock] to try acquiring the lock without waiting.
     *
     * This function is fair; suspended callers are resumed in first-in-first-out order.
     *
     * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
     * released at the end of the critical section, and [unlock] is never invoked before a successful
     * lock acquisition.
     *
     * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
     *        is already locked with the same token (same identity), this function throws [IllegalStateException].
     */
    public suspend fun lock(owner: Any? = null)

    /**
     * Clause for the [select] expression of the [lock] suspending function that selects when the mutex is locked.
     * The additional parameter of the clause is the `owner` (see [lock]); when the clause is selected,
     * the reference to this mutex is passed into the corresponding block.
     */
    @Deprecated(level = DeprecationLevel.WARNING, message = "Mutex.onLock deprecated without replacement. " +
        "For additional details please refer to #2794") // WARNING since 1.6.0
    public val onLock: SelectClause2<Any?, Mutex>

    /**
     * Checks whether this mutex is locked by the specified owner.
     *
     * @param owner the owner token to compare against the token this mutex was locked with.
     * @return `true` when this mutex is locked by the specified owner;
     *         `false` if the mutex is not locked or is locked by another owner.
     */
    public fun holdsLock(owner: Any): Boolean

    /**
     * Unlocks this mutex. Throws [IllegalStateException] if invoked on a mutex that is not locked or
     * was locked with a different owner token (by identity).
     *
     * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
     * released at the end of the critical section, and [unlock] is never invoked before a successful
     * lock acquisition.
     *
     * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
     *        was locked with a different token (by identity), this function throws [IllegalStateException].
     */
    public fun unlock(owner: Any? = null)
}
96
/**
 * Factory function that builds a new [Mutex].
 *
 * The returned mutex is fair: the lock is handed out in first come, first served order.
 *
 * @param locked whether the mutex starts out in the locked state; defaults to `false` (unlocked).
 * @return a freshly created [Mutex] backed by [MutexImpl].
 */
@Suppress("FunctionName")
public fun Mutex(locked: Boolean = false): Mutex {
    return MutexImpl(locked)
}
106
/**
 * Runs [action] while holding this mutex and releases the mutex afterwards,
 * whether [action] completes normally or throws.
 *
 * @param owner Optional owner token for debugging. It is forwarded to both [Mutex.lock] and [Mutex.unlock];
 *        when it is specified (non-null value) and this mutex is already locked with the same token
 *        (same identity), this function throws [IllegalStateException].
 * @param action the critical section; it is invoked in place, exactly once.
 *
 * @return the value produced by [action].
 */
@OptIn(ExperimentalContracts::class)
public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
    contract {
        callsInPlace(action, InvocationKind.EXACTLY_ONCE)
    }
    // Acquire outside of the `try`, so that a failed or cancelled `lock` never triggers `unlock`.
    lock(owner)
    try {
        return action()
    } finally {
        unlock(owner)
    }
}
127
128
/**
 * [Mutex] implementation layered on top of [SemaphoreImpl].
 *
 * The superclass is constructed with the arguments `1` and `if (locked) 1 else 0`
 * (presumably one permit in total and the number of permits already taken; confirm against [SemaphoreImpl]).
 * On top of the permit accounting, this class records the owner token of the current holder in [owner].
 *
 * Taking the permit and publishing the owner are two separate steps, so readers of [owner] may observe
 * the mutex as locked while the owner is still [NO_OWNER]; [holdsLockImpl] and [unlock] spin in that case.
 *
 * @param locked whether the mutex is created in the locked state.
 */
internal open class MutexImpl(locked: Boolean) : SemaphoreImpl(1, if (locked) 1 else 0), Mutex {
    /**
     * After the lock is acquired, the corresponding owner is stored in this field.
     * The [unlock] operation checks the owner and either re-sets it to [NO_OWNER],
     * if there is no waiting request, or to the owner of the suspended [lock] operation
     * to be resumed, otherwise.
     *
     * A mutex created in the locked state starts with a `null` owner (locked without an owner token);
     * otherwise the field starts as [NO_OWNER].
     */
    private val owner = atomic<Any?>(if (locked) null else NO_OWNER)

    /**
     * Passed to the [onLock] clause as its `onCancellationConstructor`.
     * Given the clause parameter (the owner token), it produces a handler that unlocks this mutex
     * with that owner; the select instance and the third argument are ignored.
     */
    private val onSelectCancellationUnlockConstructor: OnCancellationConstructor =
        { _: SelectInstance<*>, owner: Any?, _: Any? ->
            { unlock(owner) }
        }

    /**
     * `true` when the underlying semaphore reports zero available permits.
     */
    override val isLocked: Boolean get() =
        availablePermits == 0

    /**
     * Returns `true` only if this mutex is currently locked and the stored owner is identical to [owner].
     */
    override fun holdsLock(owner: Any): Boolean = holdsLockImpl(owner) == HOLDS_LOCK_YES

    /**
     * Classifies the current lock state with respect to [owner]. Returns:
     *
     * [HOLDS_LOCK_UNLOCKED] if the mutex is unlocked
     * [HOLDS_LOCK_YES] if the mutex is held with the specified [owner]
     * [HOLDS_LOCK_ANOTHER_OWNER] if the mutex is held with a different owner
     *
     * Owners are compared by identity. The loop re-reads the state while the mutex looks locked
     * but the owner has not been published yet.
     */
    private fun holdsLockImpl(owner: Any?): Int {
        while (true) {
            // Is this mutex locked?
            if (!isLocked) return HOLDS_LOCK_UNLOCKED
            val curOwner = this.owner.value
            // Wait in a spin-loop until the owner is set
            if (curOwner === NO_OWNER) continue // <-- ATTENTION, BLOCKING PART HERE
            // Check the owner
            return if (curOwner === owner) HOLDS_LOCK_YES else HOLDS_LOCK_ANOTHER_OWNER
        }
    }

    /**
     * Acquires the lock: first attempts the non-suspending [tryLock], and only if that
     * returns `false` falls back to the suspending path in [lockSuspend].
     */
    override suspend fun lock(owner: Any?) {
        if (tryLock(owner)) return
        lockSuspend(owner)
    }

    /**
     * Slow path of [lock]: wraps the continuation together with [owner] into a
     * [CancellableContinuationWithOwner] and hands it to `acquire`, so that the owner
     * is recorded when the continuation is resumed.
     */
    private suspend fun lockSuspend(owner: Any?) = suspendCancellableCoroutineReusable<Unit> { cont ->
        val contWithOwner = CancellableContinuationWithOwner(cont, owner)
        acquire(contWithOwner)
    }

    /**
     * Translates the result code of [tryLockImpl] into the public contract:
     * `true` on success, `false` on failure, and an [IllegalStateException] (via `error`)
     * when the mutex is already locked by the same [owner].
     */
    override fun tryLock(owner: Any?): Boolean = when (tryLockImpl(owner)) {
        TRY_LOCK_SUCCESS -> true
        TRY_LOCK_FAILED -> false
        TRY_LOCK_ALREADY_LOCKED_BY_OWNER -> error("This mutex is already locked by the specified owner: $owner")
        else -> error("unexpected")
    }

    /**
     * Attempts to take the permit without suspending and, on success, publishes [owner].
     *
     * @return [TRY_LOCK_SUCCESS], [TRY_LOCK_FAILED] or [TRY_LOCK_ALREADY_LOCKED_BY_OWNER].
     */
    private fun tryLockImpl(owner: Any?): Int {
        while (true) {
            if (tryAcquire()) {
                // The permit is ours; the previous holder must have reset the owner before releasing.
                assert { this.owner.value === NO_OWNER }
                this.owner.value = owner
                return TRY_LOCK_SUCCESS
            } else {
                // The semaphore permit acquisition has failed.
                // However, we need to check that this mutex is not
                // locked by our owner.
                // Without an owner token there is nothing to compare, so this is a plain failure.
                if (owner == null) return TRY_LOCK_FAILED
                when (holdsLockImpl(owner)) {
                    // This mutex is already locked by our owner.
                    HOLDS_LOCK_YES -> return TRY_LOCK_ALREADY_LOCKED_BY_OWNER
                    // This mutex is locked by another owner, `trylock(..)` must return `false`.
                    HOLDS_LOCK_ANOTHER_OWNER -> return TRY_LOCK_FAILED
                    // This mutex is no longer locked, restart the operation.
                    HOLDS_LOCK_UNLOCKED -> continue
                }
            }
        }
    }

    /**
     * Releases the lock.
     *
     * Throws [IllegalStateException] (via `check`) if the mutex is not locked, or if a non-null
     * [owner] is given and it is not identical to the stored owner. Passing `null` skips the owner check.
     * The owner is reset to [NO_OWNER] before the permit is released.
     */
    override fun unlock(owner: Any?) {
        while (true) {
            // Is this mutex locked?
            check(isLocked) { "This mutex is not locked" }
            // Read the owner, waiting until it is set in a spin-loop if required.
            val curOwner = this.owner.value
            if (curOwner === NO_OWNER) continue // <-- ATTENTION, BLOCKING PART HERE
            // Check the owner.
            check(curOwner === owner || owner == null) { "This mutex is locked by $curOwner, but $owner is expected" }
            // Try to clean the owner first. We need to use CAS here to synchronize with concurrent `unlock(..)`-s.
            if (!this.owner.compareAndSet(curOwner, NO_OWNER)) continue
            // Release the semaphore permit at the end.
            release()
            return
        }
    }

    /**
     * Select clause for locking. A new [SelectClause2Impl] is built on every access, wired to
     * [onLockRegFunction], [onLockProcessResult] and [onSelectCancellationUnlockConstructor].
     */
    @Suppress("UNCHECKED_CAST", "OverridingDeprecatedMember", "OVERRIDE_DEPRECATION")
    override val onLock: SelectClause2<Any?, Mutex> get() = SelectClause2Impl(
        clauseObject = this,
        regFunc = MutexImpl::onLockRegFunction as RegistrationFunction,
        processResFunc = MutexImpl::onLockProcessResult as ProcessResultFunction,
        onCancellationConstructor = onSelectCancellationUnlockConstructor
    )

    /**
     * Registration function of the [onLock] clause.
     *
     * If a non-null [owner] already holds this mutex, the clause is selected right away with the
     * [ON_LOCK_ALREADY_LOCKED_BY_OWNER] marker, which [onLockProcessResult] turns into an exception.
     * Otherwise registration is delegated to `onAcquireRegFunction` with the select instance wrapped
     * into [SelectInstanceWithOwner], so that the owner is stored once the clause is selected.
     */
    protected open fun onLockRegFunction(select: SelectInstance<*>, owner: Any?) {
        if (owner != null && holdsLock(owner)) {
            select.selectInRegistrationPhase(ON_LOCK_ALREADY_LOCKED_BY_OWNER)
        } else {
            onAcquireRegFunction(SelectInstanceWithOwner(select as SelectInstanceInternal<*>, owner), owner)
        }
    }

    /**
     * Result-processing function of the [onLock] clause.
     *
     * Throws [IllegalStateException] (via `error`) when [result] is the
     * [ON_LOCK_ALREADY_LOCKED_BY_OWNER] marker; otherwise returns this mutex.
     */
    protected open fun onLockProcessResult(owner: Any?, result: Any?): Any? {
        if (result == ON_LOCK_ALREADY_LOCKED_BY_OWNER) {
            error("This mutex is already locked by the specified owner: $owner")
        }
        return this
    }

    /**
     * Continuation wrapper used by [lockSuspend]. It delegates everything to [cont] except the
     * resumption functions below, which additionally store [owner] into the mutex's owner field.
     */
    private inner class CancellableContinuationWithOwner(
        @JvmField
        val cont: CancellableContinuationImpl<Unit>,
        @JvmField
        val owner: Any?
    ) : CancellableContinuation<Unit> by cont, Waiter by cont {
        /**
         * Tries to resume [cont]; when a non-null token is returned, records [owner] as the lock holder.
         * The `onCancellation` argument is not used; a handler that records the owner and then
         * unlocks the mutex is passed to [cont] instead.
         */
        override fun tryResume(value: Unit, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any? {
            assert { this@MutexImpl.owner.value === NO_OWNER }
            val token = cont.tryResume(value, idempotent) {
                // The owner may or may not have been published by the code below when this handler runs.
                assert { this@MutexImpl.owner.value.let { it === NO_OWNER ||it === owner } }
                // Publish the owner first so that `unlock(owner)` passes its owner check.
                this@MutexImpl.owner.value = owner
                unlock(owner)
            }
            if (token != null) {
                assert { this@MutexImpl.owner.value === NO_OWNER }
                this@MutexImpl.owner.value = owner
            }
            return token
        }

        /**
         * Records [owner] as the lock holder and then resumes [cont], passing a handler that unlocks
         * the mutex. The `onCancellation` argument is not used.
         */
        override fun resume(value: Unit, onCancellation: ((cause: Throwable) -> Unit)?) {
            assert { this@MutexImpl.owner.value === NO_OWNER }
            this@MutexImpl.owner.value = owner
            cont.resume(value) { unlock(owner) }
        }
    }

    /**
     * Select-instance wrapper used by [onLockRegFunction]. It delegates everything to [select] except
     * the selection functions below, which additionally store [owner] into the mutex's owner field.
     */
    private inner class SelectInstanceWithOwner<Q>(
        @JvmField
        val select: SelectInstanceInternal<Q>,
        @JvmField
        val owner: Any?
    ) : SelectInstanceInternal<Q> by select {
        /**
         * Delegates to [select] and records [owner] as the lock holder only if the selection succeeded.
         */
        override fun trySelect(clauseObject: Any, result: Any?): Boolean {
            assert { this@MutexImpl.owner.value === NO_OWNER }
            return select.trySelect(clauseObject, result).also { success ->
                if (success) this@MutexImpl.owner.value = owner
            }
        }

        /**
         * Records [owner] as the lock holder and then selects the clause in the registration phase.
         */
        override fun selectInRegistrationPhase(internalResult: Any?) {
            assert { this@MutexImpl.owner.value === NO_OWNER }
            this@MutexImpl.owner.value = owner
            select.selectInRegistrationPhase(internalResult)
        }
    }

    /**
     * Debug representation that includes the hex address, the locked flag and the current owner value.
     */
    override fun toString() = "Mutex@${hexAddress}[isLocked=$isLocked,owner=${owner.value}]"
}
294
// Marker kept in `MutexImpl.owner` while no owner is recorded.
// A dedicated symbol is needed because `null` is itself a valid owner value (a lock taken without a token).
private val NO_OWNER = Symbol("NO_OWNER")
// Internal select result passed by `MutexImpl.onLockRegFunction` when the given owner already holds the mutex;
// `MutexImpl.onLockProcessResult` turns it into an exception.
private val ON_LOCK_ALREADY_LOCKED_BY_OWNER = Symbol("ALREADY_LOCKED_BY_OWNER")

// Result codes of `MutexImpl.tryLockImpl`.
// The lock was acquired by this call.
private const val TRY_LOCK_SUCCESS = 0
// The lock was not acquired because the mutex is locked.
private const val TRY_LOCK_FAILED = 1
// The mutex is already locked with the same (non-null) owner token.
private const val TRY_LOCK_ALREADY_LOCKED_BY_OWNER = 2

// Result codes of `MutexImpl.holdsLockImpl`.
// The mutex is not locked.
private const val HOLDS_LOCK_UNLOCKED = 0
// The mutex is locked and the stored owner is identical to the one asked about.
private const val HOLDS_LOCK_YES = 1
// The mutex is locked with a different owner.
private const val HOLDS_LOCK_ANOTHER_OWNER = 2
305