"""Synchronization primitives."""

__all__ = ('Lock', 'Event', 'Condition', 'Semaphore',
           'BoundedSemaphore', 'Barrier')

import collections
import enum

from . import exceptions
from . import mixins
from . import tasks


class _ContextManagerMixin:
    """Add 'async with' support to classes providing acquire()/release()."""

    async def __aenter__(self):
        """Acquire the primitive on entry to the 'async with' block."""
        await self.acquire()
        # We have no use for the "as ..."  clause in the with
        # statement for locks.
        return None

    async def __aexit__(self, exc_type, exc, tb):
        """Release the primitive on exit; exceptions are not suppressed."""
        self.release()


class Lock(_ContextManagerMixin, mixins._LoopBoundMixin):
    """Primitive lock objects.

    A primitive lock is a synchronization primitive that is not owned
    by a particular coroutine when locked.  A primitive lock is in one
    of two states, 'locked' or 'unlocked'.

    It is created in the unlocked state.  It has two basic methods,
    acquire() and release().  When the state is unlocked, acquire()
    changes the state to locked and returns immediately.  When the
    state is locked, acquire() blocks until a call to release() in
    another coroutine changes it to unlocked, then the acquire() call
    resets it to locked and returns.  The release() method should only
    be called in the locked state; it changes the state to unlocked
    and returns immediately.  If an attempt is made to release an
    unlocked lock, a RuntimeError will be raised.

    When more than one coroutine is blocked in acquire() waiting for
    the state to turn to unlocked, only one coroutine proceeds when a
    release() call resets the state to unlocked; the coroutine that
    blocked in acquire() first is the one that proceeds.

    acquire() is a coroutine and should be called with 'await'.

    Locks also support the asynchronous context management protocol.
    'async with lock' statement should be used.

    Usage:

        lock = Lock()
        ...
        await lock.acquire()
        try:
            ...
        finally:
            lock.release()

    Context manager usage:

        lock = Lock()
        ...
        async with lock:
            ...

    Lock objects can be tested for locking state:

        if not lock.locked():
            await lock.acquire()
        else:
            # lock is acquired
            ...

    """

    def __init__(self):
        # The waiter deque is created lazily by the first acquire() that
        # actually has to block.
        self._waiters = None
        self._locked = False

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self._locked else 'unlocked'
        if self._waiters:
            extra = f'{extra}, waiters:{len(self._waiters)}'
        return f'<{res[1:-1]} [{extra}]>'

    def locked(self):
        """Return True if lock is acquired."""
        return self._locked

    async def acquire(self):
        """Acquire a lock.

        This method blocks until the lock is unlocked, then sets it to
        locked and returns True.
        """
        # Fast path: the lock is free and nobody (still interested) is
        # queued ahead of us.
        if (not self._locked and (self._waiters is None or
                all(w.cancelled() for w in self._waiters))):
            self._locked = True
            return True

        if self._waiters is None:
            self._waiters = collections.deque()
        fut = self._get_loop().create_future()
        self._waiters.append(fut)

        # Finally block should be called before the CancelledError
        # handling as we don't want CancelledError to call
        # _wake_up_first() and attempt to wake up itself.
        try:
            try:
                await fut
            finally:
                self._waiters.remove(fut)
        except exceptions.CancelledError:
            # We will not take the lock; if it is free, hand the wake-up
            # over to the waiter now at the head of the queue.
            if not self._locked:
                self._wake_up_first()
            raise

        self._locked = True
        return True

    def release(self):
        """Release a lock.

        When the lock is locked, reset it to unlocked, and return.
        If any other coroutines are blocked waiting for the lock to become
        unlocked, allow exactly one of them to proceed.

        When invoked on an unlocked lock, a RuntimeError is raised.

        There is no return value.
        """
        if self._locked:
            self._locked = False
            self._wake_up_first()
        else:
            raise RuntimeError('Lock is not acquired.')

    def _wake_up_first(self):
        """Wake up the first waiter if it isn't done."""
        # Covers both "deque never created" (None) and "deque empty".
        if not self._waiters:
            return
        fut = self._waiters[0]

        # .done() necessarily means that a waiter will wake up later on and
        # either take the lock, or, if it was cancelled and lock wasn't
        # taken already, will hit this again and wake up a new waiter.
        if not fut.done():
            fut.set_result(True)


class Event(mixins._LoopBoundMixin):
    """Asynchronous equivalent to threading.Event.

    Class implementing event objects. An event manages a flag that can be set
    to true with the set() method and reset to false with the clear() method.
    The wait() method blocks until the flag is true. The flag is initially
    false.
    """

    def __init__(self):
        self._waiters = collections.deque()
        self._value = False

    def __repr__(self):
        res = super().__repr__()
        extra = 'set' if self._value else 'unset'
        if self._waiters:
            extra = f'{extra}, waiters:{len(self._waiters)}'
        return f'<{res[1:-1]} [{extra}]>'

    def is_set(self):
        """Return True if and only if the internal flag is true."""
        return self._value

    def set(self):
        """Set the internal flag to true. All coroutines waiting for it to
        become true are awakened. Coroutines that call wait() once the flag
        is true will not block at all.
        """
        if not self._value:
            self._value = True

            for fut in self._waiters:
                if not fut.done():
                    fut.set_result(True)

    def clear(self):
        """Reset the internal flag to false. Subsequently, coroutines calling
        wait() will block until set() is called to set the internal flag
        to true again."""
        self._value = False

    async def wait(self):
        """Block until the internal flag is true.

        If the internal flag is true on entry, return True
        immediately.  Otherwise, block until another coroutine calls
        set() to set the flag to true, then return True.
        """
        if self._value:
            return True

        fut = self._get_loop().create_future()
        self._waiters.append(fut)
        try:
            await fut
            return True
        finally:
            # Runs on normal wake-up and on cancellation alike.
            self._waiters.remove(fut)


class Condition(_ContextManagerMixin, mixins._LoopBoundMixin):
    """Asynchronous equivalent to threading.Condition.

    This class implements condition variable objects. A condition variable
    allows one or more coroutines to wait until they are notified by another
    coroutine.

    If no lock is passed, a new Lock object is created and used as the
    underlying lock.
    """

    def __init__(self, lock=None):
        if lock is None:
            lock = Lock()

        self._lock = lock
        # Export the lock's locked(), acquire() and release() methods.
        self.locked = lock.locked
        self.acquire = lock.acquire
        self.release = lock.release

        self._waiters = collections.deque()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self.locked() else 'unlocked'
        if self._waiters:
            extra = f'{extra}, waiters:{len(self._waiters)}'
        return f'<{res[1:-1]} [{extra}]>'

    async def wait(self):
        """Wait until notified.

        If the calling coroutine has not acquired the lock when this
        method is called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks
        until it is awakened by a notify() or notify_all() call for
        the same condition variable in another coroutine.  Once
        awakened, it re-acquires the lock and returns True.

        If the wait is cancelled, the lock is still re-acquired before
        CancelledError propagates to the caller.
        """
        if not self.locked():
            raise RuntimeError('cannot wait on un-acquired lock')

        self.release()
        try:
            fut = self._get_loop().create_future()
            self._waiters.append(fut)
            try:
                await fut
                return True
            finally:
                self._waiters.remove(fut)

        finally:
            # Must reacquire lock even if wait is cancelled.  Cancellations
            # arriving while we retry are remembered and delivered once the
            # lock is held again.
            err = None
            while True:
                try:
                    await self.acquire()
                    break
                except exceptions.CancelledError as e:
                    err = e

            if err is not None:
                try:
                    # Re-raise the most recent instance instead of a fresh
                    # CancelledError so its message and traceback survive.
                    raise err
                finally:
                    err = None  # Break the reference cycle with this frame.

    async def wait_for(self, predicate):
        """Wait until a predicate becomes true.

        The predicate should be a callable whose result will be
        interpreted as a boolean value.  The final predicate value is
        the return value.
        """
        result = predicate()
        while not result:
            await self.wait()
            result = predicate()
        return result

    def notify(self, n=1):
        """By default, wake up one coroutine waiting on this condition, if any.
        If the calling coroutine has not acquired the lock when this method
        is called, a RuntimeError is raised.

        This method wakes up at most n of the coroutines waiting for the
        condition variable; it is a no-op if no coroutines are waiting.

        Note: an awakened coroutine does not actually return from its
        wait() call until it can reacquire the lock. Since notify() does
        not release the lock, its caller should.
        """
        if not self.locked():
            raise RuntimeError('cannot notify on un-acquired lock')

        idx = 0
        for fut in self._waiters:
            if idx >= n:
                break

            # Waiters that are already done do not count towards n.
            if not fut.done():
                idx += 1
                fut.set_result(False)

    def notify_all(self):
        """Wake up all coroutines waiting on this condition. This method acts
        like notify(), but wakes up all waiting coroutines instead of one.
        If the calling coroutine has not acquired the lock when this method
        is called, a RuntimeError is raised.
        """
        self.notify(len(self._waiters))


class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin):
    """A Semaphore implementation.

    A semaphore manages an internal counter which is decremented by each
    acquire() call and incremented by each release() call. The counter
    can never go below zero; when acquire() finds that it is zero, it blocks,
    waiting until some other coroutine calls release().

    Semaphores also support the context management protocol.

    The optional argument gives the initial value for the internal
    counter; it defaults to 1. If the value given is less than 0,
    ValueError is raised.
    """

    def __init__(self, value=1):
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        # The waiter deque is created lazily by the first acquire() that
        # actually has to block.
        self._waiters = None
        self._value = value

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self.locked() else f'unlocked, value:{self._value}'
        if self._waiters:
            extra = f'{extra}, waiters:{len(self._waiters)}'
        return f'<{res[1:-1]} [{extra}]>'

    def locked(self):
        """Returns True if semaphore cannot be acquired immediately."""
        # Also "locked" while any non-cancelled waiter is queued, so that
        # newcomers queue up behind it instead of overtaking.
        return self._value == 0 or (
            any(not w.cancelled() for w in (self._waiters or ())))

    async def acquire(self):
        """Acquire a semaphore.

        If the internal counter is larger than zero on entry,
        decrement it by one and return True immediately.  If it is
        zero on entry, block, waiting until some other coroutine has
        called release() to make it larger than 0, and then return
        True.
        """
        if not self.locked():
            self._value -= 1
            return True

        if self._waiters is None:
            self._waiters = collections.deque()
        fut = self._get_loop().create_future()
        self._waiters.append(fut)

        # Finally block should be called before the CancelledError
        # handling as we don't want CancelledError to call
        # _wake_up_next() and attempt to wake up itself.
        try:
            try:
                await fut
            finally:
                self._waiters.remove(fut)
        except exceptions.CancelledError:
            # If our future was not cancelled, _wake_up_next() already
            # decremented the counter on our behalf: give the unit back
            # and pass the wake-up on.
            if not fut.cancelled():
                self._value += 1
                self._wake_up_next()
            raise

        if self._value > 0:
            self._wake_up_next()
        return True

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.

        When it was zero on entry and another coroutine is waiting for it to
        become larger than zero again, wake up that coroutine.
        """
        self._value += 1
        self._wake_up_next()

    def _wake_up_next(self):
        """Wake up the first waiter that isn't done."""
        if not self._waiters:
            return

        for fut in self._waiters:
            if not fut.done():
                # The counter is decremented here, on behalf of the waiter
                # being woken.
                self._value -= 1
                fut.set_result(True)
                return


class BoundedSemaphore(Semaphore):
    """A bounded semaphore implementation.

    This raises ValueError in release() if it would increase the value
    above the initial value.
    """

    def __init__(self, value=1):
        self._bound_value = value
        super().__init__(value)

    def release(self):
        """Release the semaphore; raise ValueError if already at the bound."""
        if self._value >= self._bound_value:
            raise ValueError('BoundedSemaphore released too many times')
        super().release()


class _BarrierState(enum.Enum):
    # State of a Barrier; a new Barrier starts in FILLING.
    FILLING = 'filling'
    DRAINING = 'draining'
    RESETTING = 'resetting'
    BROKEN = 'broken'


class Barrier(mixins._LoopBoundMixin):
    """Asyncio equivalent to threading.Barrier

    Implements a Barrier primitive.
    Useful for synchronizing a fixed number of tasks at known synchronization
    points. Tasks block on 'wait()' and are simultaneously awoken once they
    have all made their call.
    """

    def __init__(self, parties):
        """Create a barrier, initialised to 'parties' tasks."""
        if parties < 1:
            raise ValueError('parties must be > 0')

        self._cond = Condition() # notify all tasks when state changes

        self._parties = parties
        self._state = _BarrierState.FILLING
        self._count = 0       # count tasks in Barrier

    def __repr__(self):
        res = super().__repr__()
        extra = f'{self._state.value}'
        if not self.broken:
            extra += f', waiters:{self.n_waiting}/{self.parties}'
        return f'<{res[1:-1]} [{extra}]>'

    async def __aenter__(self):
        # Entering the 'async with' block is the same as calling wait():
        # the value returned by wait() is bound by the "as ..." clause.
        return await self.wait()

    async def __aexit__(self, *args):
        pass

Worker async def wait(self): 482*cda5da8dSAndroid Build Coastguard Worker """Wait for the barrier. 483*cda5da8dSAndroid Build Coastguard Worker 484*cda5da8dSAndroid Build Coastguard Worker When the specified number of tasks have started waiting, they are all 485*cda5da8dSAndroid Build Coastguard Worker simultaneously awoken. 486*cda5da8dSAndroid Build Coastguard Worker Returns an unique and individual index number from 0 to 'parties-1'. 487*cda5da8dSAndroid Build Coastguard Worker """ 488*cda5da8dSAndroid Build Coastguard Worker async with self._cond: 489*cda5da8dSAndroid Build Coastguard Worker await self._block() # Block while the barrier drains or resets. 490*cda5da8dSAndroid Build Coastguard Worker try: 491*cda5da8dSAndroid Build Coastguard Worker index = self._count 492*cda5da8dSAndroid Build Coastguard Worker self._count += 1 493*cda5da8dSAndroid Build Coastguard Worker if index + 1 == self._parties: 494*cda5da8dSAndroid Build Coastguard Worker # We release the barrier 495*cda5da8dSAndroid Build Coastguard Worker await self._release() 496*cda5da8dSAndroid Build Coastguard Worker else: 497*cda5da8dSAndroid Build Coastguard Worker await self._wait() 498*cda5da8dSAndroid Build Coastguard Worker return index 499*cda5da8dSAndroid Build Coastguard Worker finally: 500*cda5da8dSAndroid Build Coastguard Worker self._count -= 1 501*cda5da8dSAndroid Build Coastguard Worker # Wake up any tasks waiting for barrier to drain. 502*cda5da8dSAndroid Build Coastguard Worker self._exit() 503*cda5da8dSAndroid Build Coastguard Worker 504*cda5da8dSAndroid Build Coastguard Worker async def _block(self): 505*cda5da8dSAndroid Build Coastguard Worker # Block until the barrier is ready for us, 506*cda5da8dSAndroid Build Coastguard Worker # or raise an exception if it is broken. 
507*cda5da8dSAndroid Build Coastguard Worker # 508*cda5da8dSAndroid Build Coastguard Worker # It is draining or resetting, wait until done 509*cda5da8dSAndroid Build Coastguard Worker # unless a CancelledError occurs 510*cda5da8dSAndroid Build Coastguard Worker await self._cond.wait_for( 511*cda5da8dSAndroid Build Coastguard Worker lambda: self._state not in ( 512*cda5da8dSAndroid Build Coastguard Worker _BarrierState.DRAINING, _BarrierState.RESETTING 513*cda5da8dSAndroid Build Coastguard Worker ) 514*cda5da8dSAndroid Build Coastguard Worker ) 515*cda5da8dSAndroid Build Coastguard Worker 516*cda5da8dSAndroid Build Coastguard Worker # see if the barrier is in a broken state 517*cda5da8dSAndroid Build Coastguard Worker if self._state is _BarrierState.BROKEN: 518*cda5da8dSAndroid Build Coastguard Worker raise exceptions.BrokenBarrierError("Barrier aborted") 519*cda5da8dSAndroid Build Coastguard Worker 520*cda5da8dSAndroid Build Coastguard Worker async def _release(self): 521*cda5da8dSAndroid Build Coastguard Worker # Release the tasks waiting in the barrier. 522*cda5da8dSAndroid Build Coastguard Worker 523*cda5da8dSAndroid Build Coastguard Worker # Enter draining state. 524*cda5da8dSAndroid Build Coastguard Worker # Next waiting tasks will be blocked until the end of draining. 525*cda5da8dSAndroid Build Coastguard Worker self._state = _BarrierState.DRAINING 526*cda5da8dSAndroid Build Coastguard Worker self._cond.notify_all() 527*cda5da8dSAndroid Build Coastguard Worker 528*cda5da8dSAndroid Build Coastguard Worker async def _wait(self): 529*cda5da8dSAndroid Build Coastguard Worker # Wait in the barrier until we are released. Raise an exception 530*cda5da8dSAndroid Build Coastguard Worker # if the barrier is reset or broken. 
531*cda5da8dSAndroid Build Coastguard Worker 532*cda5da8dSAndroid Build Coastguard Worker # wait for end of filling 533*cda5da8dSAndroid Build Coastguard Worker # unless a CancelledError occurs 534*cda5da8dSAndroid Build Coastguard Worker await self._cond.wait_for(lambda: self._state is not _BarrierState.FILLING) 535*cda5da8dSAndroid Build Coastguard Worker 536*cda5da8dSAndroid Build Coastguard Worker if self._state in (_BarrierState.BROKEN, _BarrierState.RESETTING): 537*cda5da8dSAndroid Build Coastguard Worker raise exceptions.BrokenBarrierError("Abort or reset of barrier") 538*cda5da8dSAndroid Build Coastguard Worker 539*cda5da8dSAndroid Build Coastguard Worker def _exit(self): 540*cda5da8dSAndroid Build Coastguard Worker # If we are the last tasks to exit the barrier, signal any tasks 541*cda5da8dSAndroid Build Coastguard Worker # waiting for the barrier to drain. 542*cda5da8dSAndroid Build Coastguard Worker if self._count == 0: 543*cda5da8dSAndroid Build Coastguard Worker if self._state in (_BarrierState.RESETTING, _BarrierState.DRAINING): 544*cda5da8dSAndroid Build Coastguard Worker self._state = _BarrierState.FILLING 545*cda5da8dSAndroid Build Coastguard Worker self._cond.notify_all() 546*cda5da8dSAndroid Build Coastguard Worker 547*cda5da8dSAndroid Build Coastguard Worker async def reset(self): 548*cda5da8dSAndroid Build Coastguard Worker """Reset the barrier to the initial state. 549*cda5da8dSAndroid Build Coastguard Worker 550*cda5da8dSAndroid Build Coastguard Worker Any tasks currently waiting will get the BrokenBarrier exception 551*cda5da8dSAndroid Build Coastguard Worker raised. 
552*cda5da8dSAndroid Build Coastguard Worker """ 553*cda5da8dSAndroid Build Coastguard Worker async with self._cond: 554*cda5da8dSAndroid Build Coastguard Worker if self._count > 0: 555*cda5da8dSAndroid Build Coastguard Worker if self._state is not _BarrierState.RESETTING: 556*cda5da8dSAndroid Build Coastguard Worker #reset the barrier, waking up tasks 557*cda5da8dSAndroid Build Coastguard Worker self._state = _BarrierState.RESETTING 558*cda5da8dSAndroid Build Coastguard Worker else: 559*cda5da8dSAndroid Build Coastguard Worker self._state = _BarrierState.FILLING 560*cda5da8dSAndroid Build Coastguard Worker self._cond.notify_all() 561*cda5da8dSAndroid Build Coastguard Worker 562*cda5da8dSAndroid Build Coastguard Worker async def abort(self): 563*cda5da8dSAndroid Build Coastguard Worker """Place the barrier into a 'broken' state. 564*cda5da8dSAndroid Build Coastguard Worker 565*cda5da8dSAndroid Build Coastguard Worker Useful in case of error. Any currently waiting tasks and tasks 566*cda5da8dSAndroid Build Coastguard Worker attempting to 'wait()' will have BrokenBarrierError raised. 
567*cda5da8dSAndroid Build Coastguard Worker """ 568*cda5da8dSAndroid Build Coastguard Worker async with self._cond: 569*cda5da8dSAndroid Build Coastguard Worker self._state = _BarrierState.BROKEN 570*cda5da8dSAndroid Build Coastguard Worker self._cond.notify_all() 571*cda5da8dSAndroid Build Coastguard Worker 572*cda5da8dSAndroid Build Coastguard Worker @property 573*cda5da8dSAndroid Build Coastguard Worker def parties(self): 574*cda5da8dSAndroid Build Coastguard Worker """Return the number of tasks required to trip the barrier.""" 575*cda5da8dSAndroid Build Coastguard Worker return self._parties 576*cda5da8dSAndroid Build Coastguard Worker 577*cda5da8dSAndroid Build Coastguard Worker @property 578*cda5da8dSAndroid Build Coastguard Worker def n_waiting(self): 579*cda5da8dSAndroid Build Coastguard Worker """Return the number of tasks currently waiting at the barrier.""" 580*cda5da8dSAndroid Build Coastguard Worker if self._state is _BarrierState.FILLING: 581*cda5da8dSAndroid Build Coastguard Worker return self._count 582*cda5da8dSAndroid Build Coastguard Worker return 0 583*cda5da8dSAndroid Build Coastguard Worker 584*cda5da8dSAndroid Build Coastguard Worker @property 585*cda5da8dSAndroid Build Coastguard Worker def broken(self): 586*cda5da8dSAndroid Build Coastguard Worker """Return True if the barrier is in a broken state.""" 587*cda5da8dSAndroid Build Coastguard Worker return self._state is _BarrierState.BROKEN 588