xref: /aosp_15_r20/external/kotlinx.coroutines/kotlinx-coroutines-core/common/src/sync/Semaphore.kt (revision 7a7160fed73afa6648ef8aa100d4a336fe921d9a)

<lambda>null1 package kotlinx.coroutines.sync
2 
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.internal.*
6 import kotlinx.coroutines.selects.*
7 import kotlin.contracts.*
8 import kotlin.coroutines.*
9 import kotlin.js.*
10 import kotlin.math.*
11 
12 /**
13  * A counting semaphore for coroutines that logically maintains a number of available permits.
14  * Each [acquire] takes a single permit or suspends until it is available.
15  * Each [release] adds a permit, potentially releasing a suspended acquirer.
16  * Semaphore is fair and maintains a FIFO order of acquirers.
17  *
18  * Semaphores are mostly used to limit the number of coroutines that have access to particular resource.
19  * Semaphore with `permits = 1` is essentially a [Mutex].
20  **/
21 public interface Semaphore {
22     /**
23      * Returns the current number of permits available in this semaphore.
24      */
25     public val availablePermits: Int
26 
27     /**
28      * Acquires a permit from this semaphore, suspending until one is available.
29      * All suspending acquirers are processed in first-in-first-out (FIFO) order.
30      *
31      * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this
32      * suspending function is waiting, this function immediately resumes with [CancellationException].
33      * There is a **prompt cancellation guarantee**: even if this function is ready to return the result, but was cancelled
34      * while suspended, [CancellationException] will be thrown. See [suspendCancellableCoroutine] for low-level details.
35      * This function releases the semaphore if it was already acquired by this function before the [CancellationException]
36      * was thrown.
37      *
38      * Note that this function does not check for cancellation when it does not suspend.
39      * Use [CoroutineScope.isActive] or [CoroutineScope.ensureActive] to periodically
40      * check for cancellation in tight loops if needed.
41      *
42      * Use [tryAcquire] to try to acquire a permit of this semaphore without suspension.
43      */
44     public suspend fun acquire()
45 
46     /**
47      * Tries to acquire a permit from this semaphore without suspension.
48      *
49      * @return `true` if a permit was acquired, `false` otherwise.
50      */
51     public fun tryAcquire(): Boolean
52 
53     /**
54      * Releases a permit, returning it into this semaphore. Resumes the first
55      * suspending acquirer if there is one at the point of invocation.
56      * Throws [IllegalStateException] if the number of [release] invocations is greater than the number of preceding [acquire].
57      */
58     public fun release()
59 }
60 
61 /**
62  * Creates new [Semaphore] instance.
63  * @param permits the number of permits available in this semaphore.
64  * @param acquiredPermits the number of already acquired permits,
65  *        should be between `0` and `permits` (inclusively).
66  */
67 @Suppress("FunctionName")
Semaphorenull68 public fun Semaphore(permits: Int, acquiredPermits: Int = 0): Semaphore = SemaphoreImpl(permits, acquiredPermits)
69 
70 /**
71  * Executes the given [action], acquiring a permit from this semaphore at the beginning
72  * and releasing it after the [action] is completed.
73  *
74  * @return the return value of the [action].
75  */
76 @OptIn(ExperimentalContracts::class)
77 public suspend inline fun <T> Semaphore.withPermit(action: () -> T): T {
78     contract {
79         callsInPlace(action, InvocationKind.EXACTLY_ONCE)
80     }
81     acquire()
82     return try {
83         action()
84     } finally {
85         release()
86     }
87 }
88 
89 @Suppress("UNCHECKED_CAST")
90 internal open class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Semaphore {
91     /*
92        The queue of waiting acquirers is essentially an infinite array based on the list of segments
93        (see `SemaphoreSegment`); each segment contains a fixed number of slots. To determine a slot for each enqueue
94        and dequeue operation, we increment the corresponding counter at the beginning of the operation
95        and use the value before the increment as a slot number. This way, each enqueue-dequeue pair
96        works with an individual cell. We use the corresponding segment pointers to find the required ones.
97 
98        Here is a state machine for cells. Note that only one `acquire` and at most one `release` operation
99        can deal with each cell, and that `release` uses `getAndSet(PERMIT)` to perform transitions for performance reasons
100        so that the state `PERMIT` represents different logical states.
101 
102          +------+ `acquire` suspends   +------+   `release` tries    +--------+                    // if `cont.tryResume(..)` succeeds, then
103          | NULL | -------------------> | cont | -------------------> | PERMIT | (cont RETRIEVED)   // the corresponding `acquire` operation gets
104          +------+                      +------+   to resume `cont`   +--------+                    // a permit and the `release` one completes.
105             |                             |
106             |                             | `acquire` request is cancelled and the continuation is
107             | `release` comes             | replaced with a special `CANCEL` token to avoid memory leaks
108             | to the slot before          V
109             | `acquire` and puts    +-----------+   `release` has    +--------+
110             | a permit into the     | CANCELLED | -----------------> | PERMIT | (RElEASE FAILED)
111             | slot, waiting for     +-----------+        failed      +--------+
112             | `acquire` after
113             | that.
114             |
115             |           `acquire` gets   +-------+
116             |        +-----------------> | TAKEN | (ELIMINATION HAPPENED)
117             V        |    the permit     +-------+
118         +--------+   |
119         | PERMIT | -<
120         +--------+  |
121                     |  `release` has waited a bounded time,   +--------+
122                     +---------------------------------------> | BROKEN | (BOTH RELEASE AND ACQUIRE FAILED)
123                            but `acquire` has not come         +--------+
124     */
125 
126     private val head: AtomicRef<SemaphoreSegment>
127     private val deqIdx = atomic(0L)
128     private val tail: AtomicRef<SemaphoreSegment>
129     private val enqIdx = atomic(0L)
130 
131     init {
<lambda>null132         require(permits > 0) { "Semaphore should have at least 1 permit, but had $permits" }
<lambda>null133         require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..$permits" }
134         val s = SemaphoreSegment(0, null, 2)
135         head = atomic(s)
136         tail = atomic(s)
137     }
138 
139     /**
140      * This counter indicates the number of available permits if it is positive,
141      * or the negated number of waiters on this semaphore otherwise.
142      * Note, that 32-bit counter is enough here since the maximal number of available
143      * permits is [permits] which is [Int], and the maximum number of waiting acquirers
144      * cannot be greater than 2^31 in any real application.
145      */
146     private val _availablePermits = atomic(permits - acquiredPermits)
147     override val availablePermits: Int get() = max(_availablePermits.value, 0)
148 
_null149     private val onCancellationRelease = { _: Throwable -> release() }
150 
tryAcquirenull151     override fun tryAcquire(): Boolean {
152         while (true) {
153             // Get the current number of available permits.
154             val p = _availablePermits.value
155             // Is the number of available permits greater
156             // than the maximal one because of an incorrect
157             // `release()` call without a preceding `acquire()`?
158             // Change it to `permits` and start from the beginning.
159             if (p > permits) {
160                 coerceAvailablePermitsAtMaximum()
161                 continue
162             }
163             // Try to decrement the number of available
164             // permits if it is greater than zero.
165             if (p <= 0) return false
166             if (_availablePermits.compareAndSet(p, p - 1)) return true
167         }
168     }
169 
acquirenull170     override suspend fun acquire() {
171         // Decrement the number of available permits.
172         val p = decPermits()
173         // Is the permit acquired?
174         if (p > 0) return // permit acquired
175         // Try to suspend otherwise.
176         // While it looks better when the following function is inlined,
177         // it is important to make `suspend` function invocations in a way
178         // so that the tail-call optimization can be applied here.
179         acquireSlowPath()
180     }
181 
acquireSlowPathnull182     private suspend fun acquireSlowPath() = suspendCancellableCoroutineReusable<Unit> sc@ { cont ->
183         // Try to suspend.
184         if (addAcquireToQueue(cont)) return@sc
185         // The suspension has been failed
186         // due to the synchronous resumption mode.
187         // Restart the whole `acquire`.
188         acquire(cont)
189     }
190 
191     @JsName("acquireCont")
acquirenull192     protected fun acquire(waiter: CancellableContinuation<Unit>) = acquire(
193         waiter = waiter,
194         suspend = { cont -> addAcquireToQueue(cont as Waiter) },
contnull195         onAcquired = { cont -> cont.resume(Unit, onCancellationRelease) }
196     )
197 
198     @JsName("acquireInternal")
acquirenull199     private inline fun <W> acquire(waiter: W, suspend: (waiter: W) -> Boolean, onAcquired: (waiter: W) -> Unit) {
200         while (true) {
201             // Decrement the number of available permits at first.
202             val p = decPermits()
203             // Is the permit acquired?
204             if (p > 0) {
205                 onAcquired(waiter)
206                 return
207             }
208             // Permit has not been acquired, try to suspend.
209             if (suspend(waiter)) return
210         }
211     }
212 
213     // We do not fully support `onAcquire` as it is needed only for `Mutex.onLock`.
214     @Suppress("UNUSED_PARAMETER")
onAcquireRegFunctionnull215     protected fun onAcquireRegFunction(select: SelectInstance<*>, ignoredParam: Any?) =
216         acquire(
217             waiter = select,
218             suspend = { s -> addAcquireToQueue(s as Waiter) },
snull219             onAcquired = { s -> s.selectInRegistrationPhase(Unit) }
220         )
221 
222     /**
223      * Decrements the number of available permits
224      * and ensures that it is not greater than [permits]
225      * at the point of decrement. The last may happen
226      * due to an incorrect `release()` call without
227      * a preceding `acquire()`.
228      */
decPermitsnull229     private fun decPermits(): Int {
230         while (true) {
231             // Decrement the number of available permits.
232             val p = _availablePermits.getAndDecrement()
233             // Is the number of available permits greater
234             // than the maximal one due to an incorrect
235             // `release()` call without a preceding `acquire()`?
236             if (p > permits) continue
237             // The number of permits is correct, return it.
238             return p
239         }
240     }
241 
releasenull242     override fun release() {
243         while (true) {
244             // Increment the number of available permits.
245             val p = _availablePermits.getAndIncrement()
246             // Is this `release` call correct and does not
247             // exceed the maximal number of permits?
248             if (p >= permits) {
249                 // Revert the number of available permits
250                 // back to the correct one and fail with error.
251                 coerceAvailablePermitsAtMaximum()
252                 error("The number of released permits cannot be greater than $permits")
253             }
254             // Is there a waiter that should be resumed?
255             if (p >= 0) return
256             // Try to resume the first waiter, and
257             // restart the operation if either this
258             // first waiter is cancelled or
259             // due to `SYNC` resumption mode.
260             if (tryResumeNextFromQueue()) return
261         }
262     }
263 
264     /**
265      * Changes the number of available permits to
266      * [permits] if it became greater due to an
267      * incorrect [release] call.
268      */
coerceAvailablePermitsAtMaximumnull269     private fun coerceAvailablePermitsAtMaximum() {
270         while (true) {
271             val cur = _availablePermits.value
272             if (cur <= permits) break
273             if (_availablePermits.compareAndSet(cur, permits)) break
274         }
275     }
276 
277     /**
278      * Returns `false` if the received permit cannot be used and the calling operation should restart.
279      */
addAcquireToQueuenull280     private fun addAcquireToQueue(waiter: Waiter): Boolean {
281         val curTail = this.tail.value
282         val enqIdx = enqIdx.getAndIncrement()
283         val createNewSegment = ::createSegment
284         val segment = this.tail.findSegmentAndMoveForward(id = enqIdx / SEGMENT_SIZE, startFrom = curTail,
285             createNewSegment = createNewSegment).segment // cannot be closed
286         val i = (enqIdx % SEGMENT_SIZE).toInt()
287         // the regular (fast) path -- if the cell is empty, try to install continuation
288         if (segment.cas(i, null, waiter)) { // installed continuation successfully
289             waiter.invokeOnCancellation(segment, i)
290             return true
291         }
292         // On CAS failure -- the cell must be either PERMIT or BROKEN
293         // If the cell already has PERMIT from tryResumeNextFromQueue, try to grab it
294         if (segment.cas(i, PERMIT, TAKEN)) { // took permit thus eliminating acquire/release pair
295             /// This continuation is not yet published, but still can be cancelled via outer job
296             when (waiter) {
297                 is CancellableContinuation<*> -> {
298                     waiter as CancellableContinuation<Unit>
299                     waiter.resume(Unit, onCancellationRelease)
300                 }
301                 is SelectInstance<*> -> {
302                     waiter.selectInRegistrationPhase(Unit)
303                 }
304                 else -> error("unexpected: $waiter")
305             }
306             return true
307         }
308         assert { segment.get(i) === BROKEN } // it must be broken in this case, no other way around it
309         return false // broken cell, need to retry on a different cell
310     }
311 
312     @Suppress("UNCHECKED_CAST")
tryResumeNextFromQueuenull313     private fun tryResumeNextFromQueue(): Boolean {
314         val curHead = this.head.value
315         val deqIdx = deqIdx.getAndIncrement()
316         val id = deqIdx / SEGMENT_SIZE
317         val createNewSegment = ::createSegment
318         val segment = this.head.findSegmentAndMoveForward(id, startFrom = curHead,
319             createNewSegment = createNewSegment).segment // cannot be closed
320         segment.cleanPrev()
321         if (segment.id > id) return false
322         val i = (deqIdx % SEGMENT_SIZE).toInt()
323         val cellState = segment.getAndSet(i, PERMIT) // set PERMIT and retrieve the prev cell state
324         when {
325             cellState === null -> {
326                 // Acquire has not touched this cell yet, wait until it comes for a bounded time
327                 // The cell state can only transition from PERMIT to TAKEN by addAcquireToQueue
328                 repeat(MAX_SPIN_CYCLES) {
329                     if (segment.get(i) === TAKEN) return true
330                 }
331                 // Try to break the slot in order not to wait
332                 return !segment.cas(i, PERMIT, BROKEN)
333             }
334             cellState === CANCELLED -> return false // the acquirer has already been cancelled
335             else -> return cellState.tryResumeAcquire()
336         }
337     }
338 
tryResumeAcquirenull339     private fun Any.tryResumeAcquire(): Boolean = when(this) {
340         is CancellableContinuation<*> -> {
341             this as CancellableContinuation<Unit>
342             val token = tryResume(Unit, null, onCancellationRelease)
343             if (token != null) {
344                 completeResume(token)
345                 true
346             } else false
347         }
348         is SelectInstance<*> -> {
349             trySelect(this@SemaphoreImpl, Unit)
350         }
351         else -> error("unexpected: $this")
352     }
353 }
354 
createSegmentnull355 private fun createSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev, 0)
356 
357 private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?, pointers: Int) : Segment<SemaphoreSegment>(id, prev, pointers) {
358     val acquirers = atomicArrayOfNulls<Any?>(SEGMENT_SIZE)
359     override val numberOfSlots: Int get() = SEGMENT_SIZE
360 
361     @Suppress("NOTHING_TO_INLINE")
362     inline fun get(index: Int): Any? = acquirers[index].value
363 
364     @Suppress("NOTHING_TO_INLINE")
365     inline fun set(index: Int, value: Any?) {
366         acquirers[index].value = value
367     }
368 
369     @Suppress("NOTHING_TO_INLINE")
370     inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value)
371 
372     @Suppress("NOTHING_TO_INLINE")
373     inline fun getAndSet(index: Int, value: Any?) = acquirers[index].getAndSet(value)
374 
375     // Cleans the acquirer slot located by the specified index
376     // and removes this segment physically if all slots are cleaned.
377     override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) {
378         // Clean the slot
379         set(index, CANCELLED)
380         // Remove this segment if needed
381         onSlotCleaned()
382     }
383 
384     override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]"
385 }
386 private val MAX_SPIN_CYCLES = systemProp("kotlinx.coroutines.semaphore.maxSpinCycles", 100)
387 private val PERMIT = Symbol("PERMIT")
388 private val TAKEN = Symbol("TAKEN")
389 private val BROKEN = Symbol("BROKEN")
390 private val CANCELLED = Symbol("CANCELLED")
391 private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.semaphore.segmentSize", 16)
392