<lambda>null1 package kotlinx.coroutines.sync
2
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.internal.*
6 import kotlinx.coroutines.selects.*
7 import kotlin.contracts.*
8 import kotlin.coroutines.*
9 import kotlin.js.*
10 import kotlin.math.*
11
12 /**
13 * A counting semaphore for coroutines that logically maintains a number of available permits.
14 * Each [acquire] takes a single permit or suspends until it is available.
15 * Each [release] adds a permit, potentially releasing a suspended acquirer.
16 * Semaphore is fair and maintains a FIFO order of acquirers.
17 *
18 * Semaphores are mostly used to limit the number of coroutines that have access to particular resource.
19 * Semaphore with `permits = 1` is essentially a [Mutex].
20 **/
21 public interface Semaphore {
22 /**
23 * Returns the current number of permits available in this semaphore.
24 */
25 public val availablePermits: Int
26
27 /**
28 * Acquires a permit from this semaphore, suspending until one is available.
29 * All suspending acquirers are processed in first-in-first-out (FIFO) order.
30 *
31 * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this
32 * suspending function is waiting, this function immediately resumes with [CancellationException].
33 * There is a **prompt cancellation guarantee**: even if this function is ready to return the result, but was cancelled
34 * while suspended, [CancellationException] will be thrown. See [suspendCancellableCoroutine] for low-level details.
35 * This function releases the semaphore if it was already acquired by this function before the [CancellationException]
36 * was thrown.
37 *
38 * Note that this function does not check for cancellation when it does not suspend.
39 * Use [CoroutineScope.isActive] or [CoroutineScope.ensureActive] to periodically
40 * check for cancellation in tight loops if needed.
41 *
42 * Use [tryAcquire] to try to acquire a permit of this semaphore without suspension.
43 */
44 public suspend fun acquire()
45
46 /**
47 * Tries to acquire a permit from this semaphore without suspension.
48 *
49 * @return `true` if a permit was acquired, `false` otherwise.
50 */
51 public fun tryAcquire(): Boolean
52
53 /**
54 * Releases a permit, returning it into this semaphore. Resumes the first
55 * suspending acquirer if there is one at the point of invocation.
56 * Throws [IllegalStateException] if the number of [release] invocations is greater than the number of preceding [acquire].
57 */
58 public fun release()
59 }
60
61 /**
62 * Creates new [Semaphore] instance.
63 * @param permits the number of permits available in this semaphore.
64 * @param acquiredPermits the number of already acquired permits,
65 * should be between `0` and `permits` (inclusively).
66 */
67 @Suppress("FunctionName")
Semaphorenull68 public fun Semaphore(permits: Int, acquiredPermits: Int = 0): Semaphore = SemaphoreImpl(permits, acquiredPermits)
69
70 /**
71 * Executes the given [action], acquiring a permit from this semaphore at the beginning
72 * and releasing it after the [action] is completed.
73 *
74 * @return the return value of the [action].
75 */
76 @OptIn(ExperimentalContracts::class)
77 public suspend inline fun <T> Semaphore.withPermit(action: () -> T): T {
78 contract {
79 callsInPlace(action, InvocationKind.EXACTLY_ONCE)
80 }
81 acquire()
82 return try {
83 action()
84 } finally {
85 release()
86 }
87 }
88
89 @Suppress("UNCHECKED_CAST")
90 internal open class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Semaphore {
91 /*
92 The queue of waiting acquirers is essentially an infinite array based on the list of segments
93 (see `SemaphoreSegment`); each segment contains a fixed number of slots. To determine a slot for each enqueue
94 and dequeue operation, we increment the corresponding counter at the beginning of the operation
95 and use the value before the increment as a slot number. This way, each enqueue-dequeue pair
96 works with an individual cell. We use the corresponding segment pointers to find the required ones.
97
98 Here is a state machine for cells. Note that only one `acquire` and at most one `release` operation
99 can deal with each cell, and that `release` uses `getAndSet(PERMIT)` to perform transitions for performance reasons
100 so that the state `PERMIT` represents different logical states.
101
102 +------+ `acquire` suspends +------+ `release` tries +--------+ // if `cont.tryResume(..)` succeeds, then
103 | NULL | -------------------> | cont | -------------------> | PERMIT | (cont RETRIEVED) // the corresponding `acquire` operation gets
104 +------+ +------+ to resume `cont` +--------+ // a permit and the `release` one completes.
105 | |
106 | | `acquire` request is cancelled and the continuation is
107 | `release` comes | replaced with a special `CANCEL` token to avoid memory leaks
108 | to the slot before V
109 | `acquire` and puts +-----------+ `release` has +--------+
110 | a permit into the | CANCELLED | -----------------> | PERMIT | (RElEASE FAILED)
111 | slot, waiting for +-----------+ failed +--------+
112 | `acquire` after
113 | that.
114 |
115 | `acquire` gets +-------+
116 | +-----------------> | TAKEN | (ELIMINATION HAPPENED)
117 V | the permit +-------+
118 +--------+ |
119 | PERMIT | -<
120 +--------+ |
121 | `release` has waited a bounded time, +--------+
122 +---------------------------------------> | BROKEN | (BOTH RELEASE AND ACQUIRE FAILED)
123 but `acquire` has not come +--------+
124 */
125
126 private val head: AtomicRef<SemaphoreSegment>
127 private val deqIdx = atomic(0L)
128 private val tail: AtomicRef<SemaphoreSegment>
129 private val enqIdx = atomic(0L)
130
131 init {
<lambda>null132 require(permits > 0) { "Semaphore should have at least 1 permit, but had $permits" }
<lambda>null133 require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..$permits" }
134 val s = SemaphoreSegment(0, null, 2)
135 head = atomic(s)
136 tail = atomic(s)
137 }
138
139 /**
140 * This counter indicates the number of available permits if it is positive,
141 * or the negated number of waiters on this semaphore otherwise.
142 * Note, that 32-bit counter is enough here since the maximal number of available
143 * permits is [permits] which is [Int], and the maximum number of waiting acquirers
144 * cannot be greater than 2^31 in any real application.
145 */
146 private val _availablePermits = atomic(permits - acquiredPermits)
147 override val availablePermits: Int get() = max(_availablePermits.value, 0)
148
_null149 private val onCancellationRelease = { _: Throwable -> release() }
150
tryAcquirenull151 override fun tryAcquire(): Boolean {
152 while (true) {
153 // Get the current number of available permits.
154 val p = _availablePermits.value
155 // Is the number of available permits greater
156 // than the maximal one because of an incorrect
157 // `release()` call without a preceding `acquire()`?
158 // Change it to `permits` and start from the beginning.
159 if (p > permits) {
160 coerceAvailablePermitsAtMaximum()
161 continue
162 }
163 // Try to decrement the number of available
164 // permits if it is greater than zero.
165 if (p <= 0) return false
166 if (_availablePermits.compareAndSet(p, p - 1)) return true
167 }
168 }
169
acquirenull170 override suspend fun acquire() {
171 // Decrement the number of available permits.
172 val p = decPermits()
173 // Is the permit acquired?
174 if (p > 0) return // permit acquired
175 // Try to suspend otherwise.
176 // While it looks better when the following function is inlined,
177 // it is important to make `suspend` function invocations in a way
178 // so that the tail-call optimization can be applied here.
179 acquireSlowPath()
180 }
181
acquireSlowPathnull182 private suspend fun acquireSlowPath() = suspendCancellableCoroutineReusable<Unit> sc@ { cont ->
183 // Try to suspend.
184 if (addAcquireToQueue(cont)) return@sc
185 // The suspension has been failed
186 // due to the synchronous resumption mode.
187 // Restart the whole `acquire`.
188 acquire(cont)
189 }
190
191 @JsName("acquireCont")
acquirenull192 protected fun acquire(waiter: CancellableContinuation<Unit>) = acquire(
193 waiter = waiter,
194 suspend = { cont -> addAcquireToQueue(cont as Waiter) },
contnull195 onAcquired = { cont -> cont.resume(Unit, onCancellationRelease) }
196 )
197
198 @JsName("acquireInternal")
acquirenull199 private inline fun <W> acquire(waiter: W, suspend: (waiter: W) -> Boolean, onAcquired: (waiter: W) -> Unit) {
200 while (true) {
201 // Decrement the number of available permits at first.
202 val p = decPermits()
203 // Is the permit acquired?
204 if (p > 0) {
205 onAcquired(waiter)
206 return
207 }
208 // Permit has not been acquired, try to suspend.
209 if (suspend(waiter)) return
210 }
211 }
212
213 // We do not fully support `onAcquire` as it is needed only for `Mutex.onLock`.
214 @Suppress("UNUSED_PARAMETER")
onAcquireRegFunctionnull215 protected fun onAcquireRegFunction(select: SelectInstance<*>, ignoredParam: Any?) =
216 acquire(
217 waiter = select,
218 suspend = { s -> addAcquireToQueue(s as Waiter) },
snull219 onAcquired = { s -> s.selectInRegistrationPhase(Unit) }
220 )
221
222 /**
223 * Decrements the number of available permits
224 * and ensures that it is not greater than [permits]
225 * at the point of decrement. The last may happen
226 * due to an incorrect `release()` call without
227 * a preceding `acquire()`.
228 */
decPermitsnull229 private fun decPermits(): Int {
230 while (true) {
231 // Decrement the number of available permits.
232 val p = _availablePermits.getAndDecrement()
233 // Is the number of available permits greater
234 // than the maximal one due to an incorrect
235 // `release()` call without a preceding `acquire()`?
236 if (p > permits) continue
237 // The number of permits is correct, return it.
238 return p
239 }
240 }
241
releasenull242 override fun release() {
243 while (true) {
244 // Increment the number of available permits.
245 val p = _availablePermits.getAndIncrement()
246 // Is this `release` call correct and does not
247 // exceed the maximal number of permits?
248 if (p >= permits) {
249 // Revert the number of available permits
250 // back to the correct one and fail with error.
251 coerceAvailablePermitsAtMaximum()
252 error("The number of released permits cannot be greater than $permits")
253 }
254 // Is there a waiter that should be resumed?
255 if (p >= 0) return
256 // Try to resume the first waiter, and
257 // restart the operation if either this
258 // first waiter is cancelled or
259 // due to `SYNC` resumption mode.
260 if (tryResumeNextFromQueue()) return
261 }
262 }
263
264 /**
265 * Changes the number of available permits to
266 * [permits] if it became greater due to an
267 * incorrect [release] call.
268 */
coerceAvailablePermitsAtMaximumnull269 private fun coerceAvailablePermitsAtMaximum() {
270 while (true) {
271 val cur = _availablePermits.value
272 if (cur <= permits) break
273 if (_availablePermits.compareAndSet(cur, permits)) break
274 }
275 }
276
277 /**
278 * Returns `false` if the received permit cannot be used and the calling operation should restart.
279 */
addAcquireToQueuenull280 private fun addAcquireToQueue(waiter: Waiter): Boolean {
281 val curTail = this.tail.value
282 val enqIdx = enqIdx.getAndIncrement()
283 val createNewSegment = ::createSegment
284 val segment = this.tail.findSegmentAndMoveForward(id = enqIdx / SEGMENT_SIZE, startFrom = curTail,
285 createNewSegment = createNewSegment).segment // cannot be closed
286 val i = (enqIdx % SEGMENT_SIZE).toInt()
287 // the regular (fast) path -- if the cell is empty, try to install continuation
288 if (segment.cas(i, null, waiter)) { // installed continuation successfully
289 waiter.invokeOnCancellation(segment, i)
290 return true
291 }
292 // On CAS failure -- the cell must be either PERMIT or BROKEN
293 // If the cell already has PERMIT from tryResumeNextFromQueue, try to grab it
294 if (segment.cas(i, PERMIT, TAKEN)) { // took permit thus eliminating acquire/release pair
295 /// This continuation is not yet published, but still can be cancelled via outer job
296 when (waiter) {
297 is CancellableContinuation<*> -> {
298 waiter as CancellableContinuation<Unit>
299 waiter.resume(Unit, onCancellationRelease)
300 }
301 is SelectInstance<*> -> {
302 waiter.selectInRegistrationPhase(Unit)
303 }
304 else -> error("unexpected: $waiter")
305 }
306 return true
307 }
308 assert { segment.get(i) === BROKEN } // it must be broken in this case, no other way around it
309 return false // broken cell, need to retry on a different cell
310 }
311
312 @Suppress("UNCHECKED_CAST")
tryResumeNextFromQueuenull313 private fun tryResumeNextFromQueue(): Boolean {
314 val curHead = this.head.value
315 val deqIdx = deqIdx.getAndIncrement()
316 val id = deqIdx / SEGMENT_SIZE
317 val createNewSegment = ::createSegment
318 val segment = this.head.findSegmentAndMoveForward(id, startFrom = curHead,
319 createNewSegment = createNewSegment).segment // cannot be closed
320 segment.cleanPrev()
321 if (segment.id > id) return false
322 val i = (deqIdx % SEGMENT_SIZE).toInt()
323 val cellState = segment.getAndSet(i, PERMIT) // set PERMIT and retrieve the prev cell state
324 when {
325 cellState === null -> {
326 // Acquire has not touched this cell yet, wait until it comes for a bounded time
327 // The cell state can only transition from PERMIT to TAKEN by addAcquireToQueue
328 repeat(MAX_SPIN_CYCLES) {
329 if (segment.get(i) === TAKEN) return true
330 }
331 // Try to break the slot in order not to wait
332 return !segment.cas(i, PERMIT, BROKEN)
333 }
334 cellState === CANCELLED -> return false // the acquirer has already been cancelled
335 else -> return cellState.tryResumeAcquire()
336 }
337 }
338
tryResumeAcquirenull339 private fun Any.tryResumeAcquire(): Boolean = when(this) {
340 is CancellableContinuation<*> -> {
341 this as CancellableContinuation<Unit>
342 val token = tryResume(Unit, null, onCancellationRelease)
343 if (token != null) {
344 completeResume(token)
345 true
346 } else false
347 }
348 is SelectInstance<*> -> {
349 trySelect(this@SemaphoreImpl, Unit)
350 }
351 else -> error("unexpected: $this")
352 }
353 }
354
createSegmentnull355 private fun createSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev, 0)
356
357 private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?, pointers: Int) : Segment<SemaphoreSegment>(id, prev, pointers) {
358 val acquirers = atomicArrayOfNulls<Any?>(SEGMENT_SIZE)
359 override val numberOfSlots: Int get() = SEGMENT_SIZE
360
361 @Suppress("NOTHING_TO_INLINE")
362 inline fun get(index: Int): Any? = acquirers[index].value
363
364 @Suppress("NOTHING_TO_INLINE")
365 inline fun set(index: Int, value: Any?) {
366 acquirers[index].value = value
367 }
368
369 @Suppress("NOTHING_TO_INLINE")
370 inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value)
371
372 @Suppress("NOTHING_TO_INLINE")
373 inline fun getAndSet(index: Int, value: Any?) = acquirers[index].getAndSet(value)
374
375 // Cleans the acquirer slot located by the specified index
376 // and removes this segment physically if all slots are cleaned.
377 override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) {
378 // Clean the slot
379 set(index, CANCELLED)
380 // Remove this segment if needed
381 onSlotCleaned()
382 }
383
384 override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]"
385 }
386 private val MAX_SPIN_CYCLES = systemProp("kotlinx.coroutines.semaphore.maxSpinCycles", 100)
387 private val PERMIT = Symbol("PERMIT")
388 private val TAKEN = Symbol("TAKEN")
389 private val BROKEN = Symbol("BROKEN")
390 private val CANCELLED = Symbol("CANCELLED")
391 private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.semaphore.segmentSize", 16)
392