# xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/asyncio/locks.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
"""Synchronization primitives."""

# Names exported by ``from <this module> import *``.
__all__ = ('Lock', 'Event', 'Condition', 'Semaphore',
           'BoundedSemaphore', 'Barrier')

import collections
import enum

from . import exceptions
from . import mixins
from . import tasks

class _ContextManagerMixin:
    """Mixin providing ``async with`` support via acquire()/release().

    The class it is mixed into must supply an awaitable ``acquire()``
    and a synchronous ``release()``.
    """

    async def __aenter__(self):
        """Await ``self.acquire()``; the ``as`` target is always None."""
        await self.acquire()
        # We have no use for the "as ..."  clause in the with
        # statement for locks.
        return None

    async def __aexit__(self, exc_type, exc, tb):
        """Call ``self.release()``.

        Returns None, so an exception raised inside the ``async with``
        body is not suppressed.
        """
        self.release()


24*cda5da8dSAndroid Build Coastguard Workerclass Lock(_ContextManagerMixin, mixins._LoopBoundMixin):
25*cda5da8dSAndroid Build Coastguard Worker    """Primitive lock objects.
26*cda5da8dSAndroid Build Coastguard Worker
27*cda5da8dSAndroid Build Coastguard Worker    A primitive lock is a synchronization primitive that is not owned
28*cda5da8dSAndroid Build Coastguard Worker    by a particular coroutine when locked.  A primitive lock is in one
29*cda5da8dSAndroid Build Coastguard Worker    of two states, 'locked' or 'unlocked'.
30*cda5da8dSAndroid Build Coastguard Worker
31*cda5da8dSAndroid Build Coastguard Worker    It is created in the unlocked state.  It has two basic methods,
32*cda5da8dSAndroid Build Coastguard Worker    acquire() and release().  When the state is unlocked, acquire()
33*cda5da8dSAndroid Build Coastguard Worker    changes the state to locked and returns immediately.  When the
34*cda5da8dSAndroid Build Coastguard Worker    state is locked, acquire() blocks until a call to release() in
35*cda5da8dSAndroid Build Coastguard Worker    another coroutine changes it to unlocked, then the acquire() call
36*cda5da8dSAndroid Build Coastguard Worker    resets it to locked and returns.  The release() method should only
37*cda5da8dSAndroid Build Coastguard Worker    be called in the locked state; it changes the state to unlocked
38*cda5da8dSAndroid Build Coastguard Worker    and returns immediately.  If an attempt is made to release an
39*cda5da8dSAndroid Build Coastguard Worker    unlocked lock, a RuntimeError will be raised.
40*cda5da8dSAndroid Build Coastguard Worker
41*cda5da8dSAndroid Build Coastguard Worker    When more than one coroutine is blocked in acquire() waiting for
42*cda5da8dSAndroid Build Coastguard Worker    the state to turn to unlocked, only one coroutine proceeds when a
43*cda5da8dSAndroid Build Coastguard Worker    release() call resets the state to unlocked; first coroutine which
44*cda5da8dSAndroid Build Coastguard Worker    is blocked in acquire() is being processed.
45*cda5da8dSAndroid Build Coastguard Worker
46*cda5da8dSAndroid Build Coastguard Worker    acquire() is a coroutine and should be called with 'await'.
47*cda5da8dSAndroid Build Coastguard Worker
48*cda5da8dSAndroid Build Coastguard Worker    Locks also support the asynchronous context management protocol.
49*cda5da8dSAndroid Build Coastguard Worker    'async with lock' statement should be used.
50*cda5da8dSAndroid Build Coastguard Worker
51*cda5da8dSAndroid Build Coastguard Worker    Usage:
52*cda5da8dSAndroid Build Coastguard Worker
53*cda5da8dSAndroid Build Coastguard Worker        lock = Lock()
54*cda5da8dSAndroid Build Coastguard Worker        ...
55*cda5da8dSAndroid Build Coastguard Worker        await lock.acquire()
56*cda5da8dSAndroid Build Coastguard Worker        try:
57*cda5da8dSAndroid Build Coastguard Worker            ...
58*cda5da8dSAndroid Build Coastguard Worker        finally:
59*cda5da8dSAndroid Build Coastguard Worker            lock.release()
60*cda5da8dSAndroid Build Coastguard Worker
61*cda5da8dSAndroid Build Coastguard Worker    Context manager usage:
62*cda5da8dSAndroid Build Coastguard Worker
63*cda5da8dSAndroid Build Coastguard Worker        lock = Lock()
64*cda5da8dSAndroid Build Coastguard Worker        ...
65*cda5da8dSAndroid Build Coastguard Worker        async with lock:
66*cda5da8dSAndroid Build Coastguard Worker             ...
67*cda5da8dSAndroid Build Coastguard Worker
68*cda5da8dSAndroid Build Coastguard Worker    Lock objects can be tested for locking state:
69*cda5da8dSAndroid Build Coastguard Worker
70*cda5da8dSAndroid Build Coastguard Worker        if not lock.locked():
71*cda5da8dSAndroid Build Coastguard Worker           await lock.acquire()
72*cda5da8dSAndroid Build Coastguard Worker        else:
73*cda5da8dSAndroid Build Coastguard Worker           # lock is acquired
74*cda5da8dSAndroid Build Coastguard Worker           ...
75*cda5da8dSAndroid Build Coastguard Worker
76*cda5da8dSAndroid Build Coastguard Worker    """
77*cda5da8dSAndroid Build Coastguard Worker
78*cda5da8dSAndroid Build Coastguard Worker    def __init__(self):
79*cda5da8dSAndroid Build Coastguard Worker        self._waiters = None
80*cda5da8dSAndroid Build Coastguard Worker        self._locked = False
81*cda5da8dSAndroid Build Coastguard Worker
82*cda5da8dSAndroid Build Coastguard Worker    def __repr__(self):
83*cda5da8dSAndroid Build Coastguard Worker        res = super().__repr__()
84*cda5da8dSAndroid Build Coastguard Worker        extra = 'locked' if self._locked else 'unlocked'
85*cda5da8dSAndroid Build Coastguard Worker        if self._waiters:
86*cda5da8dSAndroid Build Coastguard Worker            extra = f'{extra}, waiters:{len(self._waiters)}'
87*cda5da8dSAndroid Build Coastguard Worker        return f'<{res[1:-1]} [{extra}]>'
88*cda5da8dSAndroid Build Coastguard Worker
89*cda5da8dSAndroid Build Coastguard Worker    def locked(self):
90*cda5da8dSAndroid Build Coastguard Worker        """Return True if lock is acquired."""
91*cda5da8dSAndroid Build Coastguard Worker        return self._locked
92*cda5da8dSAndroid Build Coastguard Worker
93*cda5da8dSAndroid Build Coastguard Worker    async def acquire(self):
94*cda5da8dSAndroid Build Coastguard Worker        """Acquire a lock.
95*cda5da8dSAndroid Build Coastguard Worker
96*cda5da8dSAndroid Build Coastguard Worker        This method blocks until the lock is unlocked, then sets it to
97*cda5da8dSAndroid Build Coastguard Worker        locked and returns True.
98*cda5da8dSAndroid Build Coastguard Worker        """
99*cda5da8dSAndroid Build Coastguard Worker        if (not self._locked and (self._waiters is None or
100*cda5da8dSAndroid Build Coastguard Worker                all(w.cancelled() for w in self._waiters))):
101*cda5da8dSAndroid Build Coastguard Worker            self._locked = True
102*cda5da8dSAndroid Build Coastguard Worker            return True
103*cda5da8dSAndroid Build Coastguard Worker
104*cda5da8dSAndroid Build Coastguard Worker        if self._waiters is None:
105*cda5da8dSAndroid Build Coastguard Worker            self._waiters = collections.deque()
106*cda5da8dSAndroid Build Coastguard Worker        fut = self._get_loop().create_future()
107*cda5da8dSAndroid Build Coastguard Worker        self._waiters.append(fut)
108*cda5da8dSAndroid Build Coastguard Worker
109*cda5da8dSAndroid Build Coastguard Worker        # Finally block should be called before the CancelledError
110*cda5da8dSAndroid Build Coastguard Worker        # handling as we don't want CancelledError to call
111*cda5da8dSAndroid Build Coastguard Worker        # _wake_up_first() and attempt to wake up itself.
112*cda5da8dSAndroid Build Coastguard Worker        try:
113*cda5da8dSAndroid Build Coastguard Worker            try:
114*cda5da8dSAndroid Build Coastguard Worker                await fut
115*cda5da8dSAndroid Build Coastguard Worker            finally:
116*cda5da8dSAndroid Build Coastguard Worker                self._waiters.remove(fut)
117*cda5da8dSAndroid Build Coastguard Worker        except exceptions.CancelledError:
118*cda5da8dSAndroid Build Coastguard Worker            if not self._locked:
119*cda5da8dSAndroid Build Coastguard Worker                self._wake_up_first()
120*cda5da8dSAndroid Build Coastguard Worker            raise
121*cda5da8dSAndroid Build Coastguard Worker
122*cda5da8dSAndroid Build Coastguard Worker        self._locked = True
123*cda5da8dSAndroid Build Coastguard Worker        return True
124*cda5da8dSAndroid Build Coastguard Worker
125*cda5da8dSAndroid Build Coastguard Worker    def release(self):
126*cda5da8dSAndroid Build Coastguard Worker        """Release a lock.
127*cda5da8dSAndroid Build Coastguard Worker
128*cda5da8dSAndroid Build Coastguard Worker        When the lock is locked, reset it to unlocked, and return.
129*cda5da8dSAndroid Build Coastguard Worker        If any other coroutines are blocked waiting for the lock to become
130*cda5da8dSAndroid Build Coastguard Worker        unlocked, allow exactly one of them to proceed.
131*cda5da8dSAndroid Build Coastguard Worker
132*cda5da8dSAndroid Build Coastguard Worker        When invoked on an unlocked lock, a RuntimeError is raised.
133*cda5da8dSAndroid Build Coastguard Worker
134*cda5da8dSAndroid Build Coastguard Worker        There is no return value.
135*cda5da8dSAndroid Build Coastguard Worker        """
136*cda5da8dSAndroid Build Coastguard Worker        if self._locked:
137*cda5da8dSAndroid Build Coastguard Worker            self._locked = False
138*cda5da8dSAndroid Build Coastguard Worker            self._wake_up_first()
139*cda5da8dSAndroid Build Coastguard Worker        else:
140*cda5da8dSAndroid Build Coastguard Worker            raise RuntimeError('Lock is not acquired.')
141*cda5da8dSAndroid Build Coastguard Worker
142*cda5da8dSAndroid Build Coastguard Worker    def _wake_up_first(self):
143*cda5da8dSAndroid Build Coastguard Worker        """Wake up the first waiter if it isn't done."""
144*cda5da8dSAndroid Build Coastguard Worker        if not self._waiters:
145*cda5da8dSAndroid Build Coastguard Worker            return
146*cda5da8dSAndroid Build Coastguard Worker        try:
147*cda5da8dSAndroid Build Coastguard Worker            fut = next(iter(self._waiters))
148*cda5da8dSAndroid Build Coastguard Worker        except StopIteration:
149*cda5da8dSAndroid Build Coastguard Worker            return
150*cda5da8dSAndroid Build Coastguard Worker
151*cda5da8dSAndroid Build Coastguard Worker        # .done() necessarily means that a waiter will wake up later on and
152*cda5da8dSAndroid Build Coastguard Worker        # either take the lock, or, if it was cancelled and lock wasn't
153*cda5da8dSAndroid Build Coastguard Worker        # taken already, will hit this again and wake up a new waiter.
154*cda5da8dSAndroid Build Coastguard Worker        if not fut.done():
155*cda5da8dSAndroid Build Coastguard Worker            fut.set_result(True)
156*cda5da8dSAndroid Build Coastguard Worker
157*cda5da8dSAndroid Build Coastguard Worker
158*cda5da8dSAndroid Build Coastguard Workerclass Event(mixins._LoopBoundMixin):
159*cda5da8dSAndroid Build Coastguard Worker    """Asynchronous equivalent to threading.Event.
160*cda5da8dSAndroid Build Coastguard Worker
161*cda5da8dSAndroid Build Coastguard Worker    Class implementing event objects. An event manages a flag that can be set
162*cda5da8dSAndroid Build Coastguard Worker    to true with the set() method and reset to false with the clear() method.
163*cda5da8dSAndroid Build Coastguard Worker    The wait() method blocks until the flag is true. The flag is initially
164*cda5da8dSAndroid Build Coastguard Worker    false.
165*cda5da8dSAndroid Build Coastguard Worker    """
166*cda5da8dSAndroid Build Coastguard Worker
167*cda5da8dSAndroid Build Coastguard Worker    def __init__(self):
168*cda5da8dSAndroid Build Coastguard Worker        self._waiters = collections.deque()
169*cda5da8dSAndroid Build Coastguard Worker        self._value = False
170*cda5da8dSAndroid Build Coastguard Worker
171*cda5da8dSAndroid Build Coastguard Worker    def __repr__(self):
172*cda5da8dSAndroid Build Coastguard Worker        res = super().__repr__()
173*cda5da8dSAndroid Build Coastguard Worker        extra = 'set' if self._value else 'unset'
174*cda5da8dSAndroid Build Coastguard Worker        if self._waiters:
175*cda5da8dSAndroid Build Coastguard Worker            extra = f'{extra}, waiters:{len(self._waiters)}'
176*cda5da8dSAndroid Build Coastguard Worker        return f'<{res[1:-1]} [{extra}]>'
177*cda5da8dSAndroid Build Coastguard Worker
178*cda5da8dSAndroid Build Coastguard Worker    def is_set(self):
179*cda5da8dSAndroid Build Coastguard Worker        """Return True if and only if the internal flag is true."""
180*cda5da8dSAndroid Build Coastguard Worker        return self._value
181*cda5da8dSAndroid Build Coastguard Worker
182*cda5da8dSAndroid Build Coastguard Worker    def set(self):
183*cda5da8dSAndroid Build Coastguard Worker        """Set the internal flag to true. All coroutines waiting for it to
184*cda5da8dSAndroid Build Coastguard Worker        become true are awakened. Coroutine that call wait() once the flag is
185*cda5da8dSAndroid Build Coastguard Worker        true will not block at all.
186*cda5da8dSAndroid Build Coastguard Worker        """
187*cda5da8dSAndroid Build Coastguard Worker        if not self._value:
188*cda5da8dSAndroid Build Coastguard Worker            self._value = True
189*cda5da8dSAndroid Build Coastguard Worker
190*cda5da8dSAndroid Build Coastguard Worker            for fut in self._waiters:
191*cda5da8dSAndroid Build Coastguard Worker                if not fut.done():
192*cda5da8dSAndroid Build Coastguard Worker                    fut.set_result(True)
193*cda5da8dSAndroid Build Coastguard Worker
194*cda5da8dSAndroid Build Coastguard Worker    def clear(self):
195*cda5da8dSAndroid Build Coastguard Worker        """Reset the internal flag to false. Subsequently, coroutines calling
196*cda5da8dSAndroid Build Coastguard Worker        wait() will block until set() is called to set the internal flag
197*cda5da8dSAndroid Build Coastguard Worker        to true again."""
198*cda5da8dSAndroid Build Coastguard Worker        self._value = False
199*cda5da8dSAndroid Build Coastguard Worker
200*cda5da8dSAndroid Build Coastguard Worker    async def wait(self):
201*cda5da8dSAndroid Build Coastguard Worker        """Block until the internal flag is true.
202*cda5da8dSAndroid Build Coastguard Worker
203*cda5da8dSAndroid Build Coastguard Worker        If the internal flag is true on entry, return True
204*cda5da8dSAndroid Build Coastguard Worker        immediately.  Otherwise, block until another coroutine calls
205*cda5da8dSAndroid Build Coastguard Worker        set() to set the flag to true, then return True.
206*cda5da8dSAndroid Build Coastguard Worker        """
207*cda5da8dSAndroid Build Coastguard Worker        if self._value:
208*cda5da8dSAndroid Build Coastguard Worker            return True
209*cda5da8dSAndroid Build Coastguard Worker
210*cda5da8dSAndroid Build Coastguard Worker        fut = self._get_loop().create_future()
211*cda5da8dSAndroid Build Coastguard Worker        self._waiters.append(fut)
212*cda5da8dSAndroid Build Coastguard Worker        try:
213*cda5da8dSAndroid Build Coastguard Worker            await fut
214*cda5da8dSAndroid Build Coastguard Worker            return True
215*cda5da8dSAndroid Build Coastguard Worker        finally:
216*cda5da8dSAndroid Build Coastguard Worker            self._waiters.remove(fut)
217*cda5da8dSAndroid Build Coastguard Worker
218*cda5da8dSAndroid Build Coastguard Worker
219*cda5da8dSAndroid Build Coastguard Workerclass Condition(_ContextManagerMixin, mixins._LoopBoundMixin):
220*cda5da8dSAndroid Build Coastguard Worker    """Asynchronous equivalent to threading.Condition.
221*cda5da8dSAndroid Build Coastguard Worker
222*cda5da8dSAndroid Build Coastguard Worker    This class implements condition variable objects. A condition variable
223*cda5da8dSAndroid Build Coastguard Worker    allows one or more coroutines to wait until they are notified by another
224*cda5da8dSAndroid Build Coastguard Worker    coroutine.
225*cda5da8dSAndroid Build Coastguard Worker
226*cda5da8dSAndroid Build Coastguard Worker    A new Lock object is created and used as the underlying lock.
227*cda5da8dSAndroid Build Coastguard Worker    """
228*cda5da8dSAndroid Build Coastguard Worker
229*cda5da8dSAndroid Build Coastguard Worker    def __init__(self, lock=None):
230*cda5da8dSAndroid Build Coastguard Worker        if lock is None:
231*cda5da8dSAndroid Build Coastguard Worker            lock = Lock()
232*cda5da8dSAndroid Build Coastguard Worker
233*cda5da8dSAndroid Build Coastguard Worker        self._lock = lock
234*cda5da8dSAndroid Build Coastguard Worker        # Export the lock's locked(), acquire() and release() methods.
235*cda5da8dSAndroid Build Coastguard Worker        self.locked = lock.locked
236*cda5da8dSAndroid Build Coastguard Worker        self.acquire = lock.acquire
237*cda5da8dSAndroid Build Coastguard Worker        self.release = lock.release
238*cda5da8dSAndroid Build Coastguard Worker
239*cda5da8dSAndroid Build Coastguard Worker        self._waiters = collections.deque()
240*cda5da8dSAndroid Build Coastguard Worker
241*cda5da8dSAndroid Build Coastguard Worker    def __repr__(self):
242*cda5da8dSAndroid Build Coastguard Worker        res = super().__repr__()
243*cda5da8dSAndroid Build Coastguard Worker        extra = 'locked' if self.locked() else 'unlocked'
244*cda5da8dSAndroid Build Coastguard Worker        if self._waiters:
245*cda5da8dSAndroid Build Coastguard Worker            extra = f'{extra}, waiters:{len(self._waiters)}'
246*cda5da8dSAndroid Build Coastguard Worker        return f'<{res[1:-1]} [{extra}]>'
247*cda5da8dSAndroid Build Coastguard Worker
248*cda5da8dSAndroid Build Coastguard Worker    async def wait(self):
249*cda5da8dSAndroid Build Coastguard Worker        """Wait until notified.
250*cda5da8dSAndroid Build Coastguard Worker
251*cda5da8dSAndroid Build Coastguard Worker        If the calling coroutine has not acquired the lock when this
252*cda5da8dSAndroid Build Coastguard Worker        method is called, a RuntimeError is raised.
253*cda5da8dSAndroid Build Coastguard Worker
254*cda5da8dSAndroid Build Coastguard Worker        This method releases the underlying lock, and then blocks
255*cda5da8dSAndroid Build Coastguard Worker        until it is awakened by a notify() or notify_all() call for
256*cda5da8dSAndroid Build Coastguard Worker        the same condition variable in another coroutine.  Once
257*cda5da8dSAndroid Build Coastguard Worker        awakened, it re-acquires the lock and returns True.
258*cda5da8dSAndroid Build Coastguard Worker        """
259*cda5da8dSAndroid Build Coastguard Worker        if not self.locked():
260*cda5da8dSAndroid Build Coastguard Worker            raise RuntimeError('cannot wait on un-acquired lock')
261*cda5da8dSAndroid Build Coastguard Worker
262*cda5da8dSAndroid Build Coastguard Worker        self.release()
263*cda5da8dSAndroid Build Coastguard Worker        try:
264*cda5da8dSAndroid Build Coastguard Worker            fut = self._get_loop().create_future()
265*cda5da8dSAndroid Build Coastguard Worker            self._waiters.append(fut)
266*cda5da8dSAndroid Build Coastguard Worker            try:
267*cda5da8dSAndroid Build Coastguard Worker                await fut
268*cda5da8dSAndroid Build Coastguard Worker                return True
269*cda5da8dSAndroid Build Coastguard Worker            finally:
270*cda5da8dSAndroid Build Coastguard Worker                self._waiters.remove(fut)
271*cda5da8dSAndroid Build Coastguard Worker
272*cda5da8dSAndroid Build Coastguard Worker        finally:
273*cda5da8dSAndroid Build Coastguard Worker            # Must reacquire lock even if wait is cancelled
274*cda5da8dSAndroid Build Coastguard Worker            cancelled = False
275*cda5da8dSAndroid Build Coastguard Worker            while True:
276*cda5da8dSAndroid Build Coastguard Worker                try:
277*cda5da8dSAndroid Build Coastguard Worker                    await self.acquire()
278*cda5da8dSAndroid Build Coastguard Worker                    break
279*cda5da8dSAndroid Build Coastguard Worker                except exceptions.CancelledError:
280*cda5da8dSAndroid Build Coastguard Worker                    cancelled = True
281*cda5da8dSAndroid Build Coastguard Worker
282*cda5da8dSAndroid Build Coastguard Worker            if cancelled:
283*cda5da8dSAndroid Build Coastguard Worker                raise exceptions.CancelledError
284*cda5da8dSAndroid Build Coastguard Worker
285*cda5da8dSAndroid Build Coastguard Worker    async def wait_for(self, predicate):
286*cda5da8dSAndroid Build Coastguard Worker        """Wait until a predicate becomes true.
287*cda5da8dSAndroid Build Coastguard Worker
288*cda5da8dSAndroid Build Coastguard Worker        The predicate should be a callable which result will be
289*cda5da8dSAndroid Build Coastguard Worker        interpreted as a boolean value.  The final predicate value is
290*cda5da8dSAndroid Build Coastguard Worker        the return value.
291*cda5da8dSAndroid Build Coastguard Worker        """
292*cda5da8dSAndroid Build Coastguard Worker        result = predicate()
293*cda5da8dSAndroid Build Coastguard Worker        while not result:
294*cda5da8dSAndroid Build Coastguard Worker            await self.wait()
295*cda5da8dSAndroid Build Coastguard Worker            result = predicate()
296*cda5da8dSAndroid Build Coastguard Worker        return result
297*cda5da8dSAndroid Build Coastguard Worker
298*cda5da8dSAndroid Build Coastguard Worker    def notify(self, n=1):
299*cda5da8dSAndroid Build Coastguard Worker        """By default, wake up one coroutine waiting on this condition, if any.
300*cda5da8dSAndroid Build Coastguard Worker        If the calling coroutine has not acquired the lock when this method
301*cda5da8dSAndroid Build Coastguard Worker        is called, a RuntimeError is raised.
302*cda5da8dSAndroid Build Coastguard Worker
303*cda5da8dSAndroid Build Coastguard Worker        This method wakes up at most n of the coroutines waiting for the
304*cda5da8dSAndroid Build Coastguard Worker        condition variable; it is a no-op if no coroutines are waiting.
305*cda5da8dSAndroid Build Coastguard Worker
306*cda5da8dSAndroid Build Coastguard Worker        Note: an awakened coroutine does not actually return from its
307*cda5da8dSAndroid Build Coastguard Worker        wait() call until it can reacquire the lock. Since notify() does
308*cda5da8dSAndroid Build Coastguard Worker        not release the lock, its caller should.
309*cda5da8dSAndroid Build Coastguard Worker        """
310*cda5da8dSAndroid Build Coastguard Worker        if not self.locked():
311*cda5da8dSAndroid Build Coastguard Worker            raise RuntimeError('cannot notify on un-acquired lock')
312*cda5da8dSAndroid Build Coastguard Worker
313*cda5da8dSAndroid Build Coastguard Worker        idx = 0
314*cda5da8dSAndroid Build Coastguard Worker        for fut in self._waiters:
315*cda5da8dSAndroid Build Coastguard Worker            if idx >= n:
316*cda5da8dSAndroid Build Coastguard Worker                break
317*cda5da8dSAndroid Build Coastguard Worker
318*cda5da8dSAndroid Build Coastguard Worker            if not fut.done():
319*cda5da8dSAndroid Build Coastguard Worker                idx += 1
320*cda5da8dSAndroid Build Coastguard Worker                fut.set_result(False)
321*cda5da8dSAndroid Build Coastguard Worker
322*cda5da8dSAndroid Build Coastguard Worker    def notify_all(self):
323*cda5da8dSAndroid Build Coastguard Worker        """Wake up all threads waiting on this condition. This method acts
324*cda5da8dSAndroid Build Coastguard Worker        like notify(), but wakes up all waiting threads instead of one. If the
325*cda5da8dSAndroid Build Coastguard Worker        calling thread has not acquired the lock when this method is called,
326*cda5da8dSAndroid Build Coastguard Worker        a RuntimeError is raised.
327*cda5da8dSAndroid Build Coastguard Worker        """
328*cda5da8dSAndroid Build Coastguard Worker        self.notify(len(self._waiters))
329*cda5da8dSAndroid Build Coastguard Worker
330*cda5da8dSAndroid Build Coastguard Worker
331*cda5da8dSAndroid Build Coastguard Workerclass Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin):
332*cda5da8dSAndroid Build Coastguard Worker    """A Semaphore implementation.
333*cda5da8dSAndroid Build Coastguard Worker
334*cda5da8dSAndroid Build Coastguard Worker    A semaphore manages an internal counter which is decremented by each
335*cda5da8dSAndroid Build Coastguard Worker    acquire() call and incremented by each release() call. The counter
336*cda5da8dSAndroid Build Coastguard Worker    can never go below zero; when acquire() finds that it is zero, it blocks,
337*cda5da8dSAndroid Build Coastguard Worker    waiting until some other thread calls release().
338*cda5da8dSAndroid Build Coastguard Worker
339*cda5da8dSAndroid Build Coastguard Worker    Semaphores also support the context management protocol.
340*cda5da8dSAndroid Build Coastguard Worker
341*cda5da8dSAndroid Build Coastguard Worker    The optional argument gives the initial value for the internal
342*cda5da8dSAndroid Build Coastguard Worker    counter; it defaults to 1. If the value given is less than 0,
343*cda5da8dSAndroid Build Coastguard Worker    ValueError is raised.
344*cda5da8dSAndroid Build Coastguard Worker    """
345*cda5da8dSAndroid Build Coastguard Worker
346*cda5da8dSAndroid Build Coastguard Worker    def __init__(self, value=1):
347*cda5da8dSAndroid Build Coastguard Worker        if value < 0:
348*cda5da8dSAndroid Build Coastguard Worker            raise ValueError("Semaphore initial value must be >= 0")
349*cda5da8dSAndroid Build Coastguard Worker        self._waiters = None
350*cda5da8dSAndroid Build Coastguard Worker        self._value = value
351*cda5da8dSAndroid Build Coastguard Worker
352*cda5da8dSAndroid Build Coastguard Worker    def __repr__(self):
353*cda5da8dSAndroid Build Coastguard Worker        res = super().__repr__()
354*cda5da8dSAndroid Build Coastguard Worker        extra = 'locked' if self.locked() else f'unlocked, value:{self._value}'
355*cda5da8dSAndroid Build Coastguard Worker        if self._waiters:
356*cda5da8dSAndroid Build Coastguard Worker            extra = f'{extra}, waiters:{len(self._waiters)}'
357*cda5da8dSAndroid Build Coastguard Worker        return f'<{res[1:-1]} [{extra}]>'
358*cda5da8dSAndroid Build Coastguard Worker
359*cda5da8dSAndroid Build Coastguard Worker    def locked(self):
360*cda5da8dSAndroid Build Coastguard Worker        """Returns True if semaphore cannot be acquired immediately."""
361*cda5da8dSAndroid Build Coastguard Worker        return self._value == 0 or (
362*cda5da8dSAndroid Build Coastguard Worker            any(not w.cancelled() for w in (self._waiters or ())))
363*cda5da8dSAndroid Build Coastguard Worker
364*cda5da8dSAndroid Build Coastguard Worker    async def acquire(self):
365*cda5da8dSAndroid Build Coastguard Worker        """Acquire a semaphore.
366*cda5da8dSAndroid Build Coastguard Worker
367*cda5da8dSAndroid Build Coastguard Worker        If the internal counter is larger than zero on entry,
368*cda5da8dSAndroid Build Coastguard Worker        decrement it by one and return True immediately.  If it is
369*cda5da8dSAndroid Build Coastguard Worker        zero on entry, block, waiting until some other coroutine has
370*cda5da8dSAndroid Build Coastguard Worker        called release() to make it larger than 0, and then return
371*cda5da8dSAndroid Build Coastguard Worker        True.
372*cda5da8dSAndroid Build Coastguard Worker        """
373*cda5da8dSAndroid Build Coastguard Worker        if not self.locked():
374*cda5da8dSAndroid Build Coastguard Worker            self._value -= 1
375*cda5da8dSAndroid Build Coastguard Worker            return True
376*cda5da8dSAndroid Build Coastguard Worker
377*cda5da8dSAndroid Build Coastguard Worker        if self._waiters is None:
378*cda5da8dSAndroid Build Coastguard Worker            self._waiters = collections.deque()
379*cda5da8dSAndroid Build Coastguard Worker        fut = self._get_loop().create_future()
380*cda5da8dSAndroid Build Coastguard Worker        self._waiters.append(fut)
381*cda5da8dSAndroid Build Coastguard Worker
382*cda5da8dSAndroid Build Coastguard Worker        # Finally block should be called before the CancelledError
383*cda5da8dSAndroid Build Coastguard Worker        # handling as we don't want CancelledError to call
384*cda5da8dSAndroid Build Coastguard Worker        # _wake_up_first() and attempt to wake up itself.
385*cda5da8dSAndroid Build Coastguard Worker        try:
386*cda5da8dSAndroid Build Coastguard Worker            try:
387*cda5da8dSAndroid Build Coastguard Worker                await fut
388*cda5da8dSAndroid Build Coastguard Worker            finally:
389*cda5da8dSAndroid Build Coastguard Worker                self._waiters.remove(fut)
390*cda5da8dSAndroid Build Coastguard Worker        except exceptions.CancelledError:
391*cda5da8dSAndroid Build Coastguard Worker            if not fut.cancelled():
392*cda5da8dSAndroid Build Coastguard Worker                self._value += 1
393*cda5da8dSAndroid Build Coastguard Worker                self._wake_up_next()
394*cda5da8dSAndroid Build Coastguard Worker            raise
395*cda5da8dSAndroid Build Coastguard Worker
396*cda5da8dSAndroid Build Coastguard Worker        if self._value > 0:
397*cda5da8dSAndroid Build Coastguard Worker            self._wake_up_next()
398*cda5da8dSAndroid Build Coastguard Worker        return True
399*cda5da8dSAndroid Build Coastguard Worker
400*cda5da8dSAndroid Build Coastguard Worker    def release(self):
401*cda5da8dSAndroid Build Coastguard Worker        """Release a semaphore, incrementing the internal counter by one.
402*cda5da8dSAndroid Build Coastguard Worker
403*cda5da8dSAndroid Build Coastguard Worker        When it was zero on entry and another coroutine is waiting for it to
404*cda5da8dSAndroid Build Coastguard Worker        become larger than zero again, wake up that coroutine.
405*cda5da8dSAndroid Build Coastguard Worker        """
406*cda5da8dSAndroid Build Coastguard Worker        self._value += 1
407*cda5da8dSAndroid Build Coastguard Worker        self._wake_up_next()
408*cda5da8dSAndroid Build Coastguard Worker
409*cda5da8dSAndroid Build Coastguard Worker    def _wake_up_next(self):
410*cda5da8dSAndroid Build Coastguard Worker        """Wake up the first waiter that isn't done."""
411*cda5da8dSAndroid Build Coastguard Worker        if not self._waiters:
412*cda5da8dSAndroid Build Coastguard Worker            return
413*cda5da8dSAndroid Build Coastguard Worker
414*cda5da8dSAndroid Build Coastguard Worker        for fut in self._waiters:
415*cda5da8dSAndroid Build Coastguard Worker            if not fut.done():
416*cda5da8dSAndroid Build Coastguard Worker                self._value -= 1
417*cda5da8dSAndroid Build Coastguard Worker                fut.set_result(True)
418*cda5da8dSAndroid Build Coastguard Worker                return
419*cda5da8dSAndroid Build Coastguard Worker
420*cda5da8dSAndroid Build Coastguard Worker
class BoundedSemaphore(Semaphore):
    """A bounded semaphore implementation.

    This raises ValueError in release() if it would increase the value
    above the initial value.
    """

    def __init__(self, value=1):
        # Remember the initial value; release() uses it as the upper bound.
        self._bound_value = value
        super().__init__(value)

    def release(self):
        """Release the semaphore, refusing to exceed the initial value.

        Raises ValueError if the internal counter is already at (or
        above) the value the semaphore was created with.
        """
        if self._value >= self._bound_value:
            raise ValueError('BoundedSemaphore released too many times')
        super().release()


class _BarrierState(enum.Enum):
    """Internal states of a Barrier."""

    # Accepting arrivals; the initial state of a Barrier.
    FILLING = 'filling'
    # All parties have arrived and are being released.
    DRAINING = 'draining'
    # reset() was called while tasks were still inside the barrier.
    RESETTING = 'resetting'
    # abort() was called.
    BROKEN = 'broken'
444*cda5da8dSAndroid Build Coastguard Worker
445*cda5da8dSAndroid Build Coastguard Worker
class Barrier(mixins._LoopBoundMixin):
    """Asyncio equivalent to threading.Barrier

    Implements a Barrier primitive.
    Useful for synchronizing a fixed number of tasks at known synchronization
    points. Tasks block on 'wait()' and are simultaneously awoken once they
    have all made their call.
    """

    def __init__(self, parties):
        """Create a barrier, initialised to 'parties' tasks.

        Raises ValueError if 'parties' is less than 1.
        """
        if parties < 1:
            raise ValueError('parties must be > 0')

        self._cond = Condition()  # notify all tasks when state changes

        self._parties = parties
        self._state = _BarrierState.FILLING
        self._count = 0       # count tasks in Barrier

    def __repr__(self):
        """Return the default repr extended with state and waiter count."""
        res = super().__repr__()
        extra = f'{self._state.value}'
        if not self.broken:
            extra += f', waiters:{self.n_waiting}/{self.parties}'
        # res[1:-1] drops the first and last character of the inherited
        # repr so the extra info can be placed inside a single <...> pair.
        return f'<{res[1:-1]} [{extra}]>'

    async def __aenter__(self):
        # Wait until the barrier has been reached by 'parties' tasks, then
        # return this task's arrival index, exactly as wait() does.
        return await self.wait()

    async def __aexit__(self, *args):
        # Nothing to release: passing the barrier is the whole operation.
        pass

    async def wait(self):
        """Wait for the barrier.

        When the specified number of tasks have started waiting, they are all
        simultaneously awoken.
        Returns a unique and individual index number from 0 to 'parties-1'.

        Raises BrokenBarrierError if the barrier is broken, or if it is
        aborted or reset while this task is waiting in it.
        """
        async with self._cond:
            await self._block()  # Block while the barrier drains or resets.
            try:
                index = self._count
                self._count += 1
                if index + 1 == self._parties:
                    # We are the last party to arrive: release the barrier.
                    await self._release()
                else:
                    await self._wait()
                return index
            finally:
                # Runs on normal return and on any exception alike, so the
                # count of tasks inside the barrier always stays accurate.
                self._count -= 1
                # Wake up any tasks waiting for barrier to drain.
                self._exit()

    async def _block(self):
        # Block until the barrier is ready for us,
        # or raise an exception if it is broken.
        #
        # It is draining or resetting, wait until done
        # unless a CancelledError occurs
        await self._cond.wait_for(
            lambda: self._state not in (
                _BarrierState.DRAINING, _BarrierState.RESETTING
            )
        )

        # see if the barrier is in a broken state
        if self._state is _BarrierState.BROKEN:
            raise exceptions.BrokenBarrierError("Barrier aborted")

    async def _release(self):
        # Release the tasks waiting in the barrier.

        # Enter draining state.
        # Next waiting tasks will be blocked until the end of draining.
        self._state = _BarrierState.DRAINING
        self._cond.notify_all()

    async def _wait(self):
        # Wait in the barrier until we are released. Raise an exception
        # if the barrier is reset or broken.

        # wait for end of filling
        # unless a CancelledError occurs
        await self._cond.wait_for(lambda: self._state is not _BarrierState.FILLING)

        if self._state in (_BarrierState.BROKEN, _BarrierState.RESETTING):
            raise exceptions.BrokenBarrierError("Abort or reset of barrier")

    def _exit(self):
        # If we are the last task to exit the barrier, signal any tasks
        # waiting for the barrier to drain.
        if self._count == 0:
            if self._state in (_BarrierState.RESETTING, _BarrierState.DRAINING):
                self._state = _BarrierState.FILLING
            # A BROKEN state is deliberately left as it is.
            self._cond.notify_all()

    async def reset(self):
        """Reset the barrier to the initial state.

        Any tasks currently waiting will get BrokenBarrierError raised.
        """
        async with self._cond:
            if self._count > 0:
                if self._state is not _BarrierState.RESETTING:
                    # Reset the barrier, waking up tasks; the last one to
                    # leave switches the state back to FILLING in _exit().
                    self._state = _BarrierState.RESETTING
            else:
                # Nobody is inside the barrier: it can be reused right away.
                self._state = _BarrierState.FILLING
            self._cond.notify_all()

    async def abort(self):
        """Place the barrier into a 'broken' state.

        Useful in case of error.  Any currently waiting tasks and tasks
        attempting to 'wait()' will have BrokenBarrierError raised.
        """
        async with self._cond:
            self._state = _BarrierState.BROKEN
            self._cond.notify_all()

    @property
    def parties(self):
        """Return the number of tasks required to trip the barrier."""
        return self._parties

    @property
    def n_waiting(self):
        """Return the number of tasks currently waiting at the barrier."""
        # Tasks still inside a draining, resetting or broken barrier are
        # on their way out and are not reported as waiting.
        if self._state is _BarrierState.FILLING:
            return self._count
        return 0

    @property
    def broken(self):
        """Return True if the barrier is in a broken state."""
        return self._state is _BarrierState.BROKEN
588