"""Queues for coordinating coroutines: FIFO, priority-ordered and LIFO."""

__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')

import collections
import heapq
from types import GenericAlias

from . import locks
from . import mixins


class QueueEmpty(Exception):
    """Raised when Queue.get_nowait() is called on an empty Queue."""
    pass


class QueueFull(Exception):
    """Raised when the Queue.put_nowait() method is called on a full Queue."""
    pass


class Queue(mixins._LoopBoundMixin):
    """A queue, useful for coordinating producer and consumer coroutines.

    If maxsize is less than or equal to zero, the queue size is infinite. If it
    is an integer greater than 0, then "await put()" will block when the
    queue reaches maxsize, until an item is removed by get().

    Unlike the standard library Queue, you can reliably know this Queue's size
    with qsize(), since your single-threaded asyncio application won't be
    interrupted between calling qsize() and doing an operation on the Queue.

    Subclasses customise the storage and ordering by overriding _init(),
    _get() and _put(); everything else is implemented in terms of those.
    """

    def __init__(self, maxsize=0):
        """Create a queue holding at most maxsize items (<= 0: unbounded)."""
        self._maxsize = maxsize

        # Futures of get() callers waiting for an item to arrive.
        self._getters = collections.deque()
        # Futures of put() callers waiting for a free slot.
        self._putters = collections.deque()
        # Number of items put so far that have not been marked task_done().
        self._unfinished_tasks = 0
        # Set while there are no unfinished tasks; join() waits on it.
        self._finished = locks.Event()
        # Nothing has been queued yet, so the event starts out set.
        self._finished.set()
        self._init(maxsize)

    # These three are overridable in subclasses.

    def _init(self, maxsize):
        """Create the underlying container in self._queue."""
        self._queue = collections.deque()

    def _get(self):
        """Remove and return the next item (oldest first for this class)."""
        return self._queue.popleft()

    def _put(self, item):
        """Store item in the underlying container."""
        self._queue.append(item)

    # End of the overridable methods.

    def _wakeup_next(self, waiters):
        """Resolve the first pending future in waiters, if there is one.

        Futures that are already done (for instance cancelled ones) are
        popped and discarded on the way.
        """
        # Wake up the next waiter (if any) that isn't cancelled.
        while waiters:
            waiter = waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                break

    def __repr__(self):
        return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'

    def __str__(self):
        return f'<{type(self).__name__} {self._format()}>'

    # Allows subscripting the class, e.g. Queue[int].
    __class_getitem__ = classmethod(GenericAlias)

    def _format(self):
        """Return the state summary shared by __repr__() and __str__().

        Only maxsize is always present; the other fields are appended when
        they are non-empty / non-zero.
        """
        result = f'maxsize={self._maxsize!r}'
        # getattr() with a default: a missing _queue attribute is treated
        # the same as an empty queue instead of raising AttributeError.
        if getattr(self, '_queue', None):
            result += f' _queue={list(self._queue)!r}'
        if self._getters:
            result += f' _getters[{len(self._getters)}]'
        if self._putters:
            result += f' _putters[{len(self._putters)}]'
        if self._unfinished_tasks:
            result += f' tasks={self._unfinished_tasks}'
        return result

    def qsize(self):
        """Number of items in the queue."""
        return len(self._queue)

    @property
    def maxsize(self):
        """Number of items allowed in the queue."""
        return self._maxsize

    def empty(self):
        """Return True if the queue is empty, False otherwise."""
        return not self._queue

    def full(self):
        """Return True if there are maxsize items in the queue.

        Note: if the Queue was initialized with maxsize=0 (the default),
        then full() is never True.
        """
        if self._maxsize <= 0:
            return False
        else:
            return self.qsize() >= self._maxsize

    async def put(self, item):
        """Put an item into the queue.

        Put an item into the queue. If the queue is full, wait until a free
        slot is available before adding item.
        """
        # Re-check full() after every wakeup: the freed slot may have been
        # taken again before this coroutine got to run.
        while self.full():
            # _get_loop() is inherited from mixins._LoopBoundMixin.
            putter = self._get_loop().create_future()
            self._putters.append(putter)
            try:
                await putter
            # The bare except is deliberate: cancellation is delivered as
            # asyncio.CancelledError, which derives from BaseException, and
            # whatever was caught is always re-raised below.
            except:
                putter.cancel()  # Just in case putter is not done yet.
                try:
                    # Clean self._putters from canceled putters.
                    self._putters.remove(putter)
                except ValueError:
                    # The putter could be removed from self._putters by a
                    # previous get_nowait call.
                    pass
                if not self.full() and not putter.cancelled():
                    # We were woken up by get_nowait(), but can't take
                    # the call.  Wake up the next in line.
                    self._wakeup_next(self._putters)
                raise
        return self.put_nowait(item)

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        If no free slot is immediately available, raise QueueFull.
        """
        if self.full():
            raise QueueFull
        self._put(item)
        # Every stored item counts as unfinished until task_done() is called.
        self._unfinished_tasks += 1
        self._finished.clear()
        # An item is available now, so let one waiting get() proceed.
        self._wakeup_next(self._getters)

    async def get(self):
        """Remove and return an item from the queue.

        If queue is empty, wait until an item is available.
        """
        # Re-check empty() after every wakeup: the item may have been taken
        # by someone else before this coroutine got to run.
        while self.empty():
            # _get_loop() is inherited from mixins._LoopBoundMixin.
            getter = self._get_loop().create_future()
            self._getters.append(getter)
            try:
                await getter
            # The bare except is deliberate: cancellation is delivered as
            # asyncio.CancelledError, which derives from BaseException, and
            # whatever was caught is always re-raised below.
            except:
                getter.cancel()  # Just in case getter is not done yet.
                try:
                    # Clean self._getters from canceled getters.
                    self._getters.remove(getter)
                except ValueError:
                    # The getter could be removed from self._getters by a
                    # previous put_nowait call.
                    pass
                if not self.empty() and not getter.cancelled():
                    # We were woken up by put_nowait(), but can't take
                    # the call.  Wake up the next in line.
                    self._wakeup_next(self._getters)
                raise
        return self.get_nowait()

    def get_nowait(self):
        """Remove and return an item from the queue.

        Return an item if one is immediately available, else raise QueueEmpty.
        """
        if self.empty():
            raise QueueEmpty
        item = self._get()
        # A slot was freed, so let one waiting put() proceed.
        self._wakeup_next(self._putters)
        return item

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items have
        been processed (meaning that a task_done() call was received for every
        item that had been put() into the queue).

        Raises ValueError if called more times than there were items placed in
        the queue.
        """
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            # Last outstanding item: release everything blocked in join().
            self._finished.set()

    async def join(self):
        """Block until all items in the queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer calls task_done() to
        indicate that the item was retrieved and all work on it is complete.
        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        # Returns immediately when nothing is outstanding.
        if self._unfinished_tasks > 0:
            await self._finished.wait()


class PriorityQueue(Queue):
    """A subclass of Queue; retrieves entries in priority order (lowest first).

    Entries are typically tuples of the form: (priority number, data).
    """

    def _init(self, maxsize):
        # A plain list, maintained as a heap by heapq in _put()/_get().
        self._queue = []

    # heappush/heappop are bound as default arguments, i.e. looked up once
    # when the method is defined (presumably to skip the module attribute
    # lookup on every call); callers are not expected to pass them.
    def _put(self, item, heappush=heapq.heappush):
        heappush(self._queue, item)

    def _get(self, heappop=heapq.heappop):
        return heappop(self._queue)


class LifoQueue(Queue):
    """A subclass of Queue that retrieves most recently added entries first."""

    def _init(self, maxsize):
        # A plain list used as a stack: append in _put(), pop in _get().
        self._queue = []

    def _put(self, item):
        self._queue.append(item)

    def _get(self):
        return self._queue.pop()