__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')

import collections
import heapq
from types import GenericAlias

from . import locks
from . import mixins


class QueueEmpty(Exception):
    """Raised when Queue.get_nowait() is called on an empty Queue."""


class QueueFull(Exception):
    """Raised when the Queue.put_nowait() method is called on a full Queue."""


class Queue(mixins._LoopBoundMixin):
    """A queue, useful for coordinating producer and consumer coroutines.

    If maxsize is less than or equal to zero, the queue size is infinite. If it
    is an integer greater than 0, then "await put()" will block when the
    queue reaches maxsize, until an item is removed by get().

    Unlike the standard library Queue, you can reliably know this Queue's size
    with qsize(), since your single-threaded asyncio application won't be
    interrupted between calling qsize() and doing an operation on the Queue.
    """

    def __init__(self, maxsize=0):
        """Create an empty queue.

        maxsize is the capacity limit; a value <= 0 means "unbounded"
        (see full()).
        """
        self._maxsize = maxsize

        # Futures of coroutines blocked in get(), oldest first.
        self._getters = collections.deque()
        # Futures of coroutines blocked in put(), oldest first.
        self._putters = collections.deque()
        # Items that were put but not yet acknowledged via task_done().
        self._unfinished_tasks = 0
        # Set while there are no unfinished tasks; join() waits on it.
        self._finished = locks.Event()
        self._finished.set()
        self._init(maxsize)

    # These three are overridable in subclasses.

    def _init(self, maxsize):
        """Create the underlying container (FIFO deque by default)."""
        self._queue = collections.deque()

    def _get(self):
        """Remove and return the next item from the container."""
        return self._queue.popleft()

    def _put(self, item):
        """Store item in the container."""
        self._queue.append(item)

    # End of the overridable methods.

    def _wakeup_next(self, waiters):
        """Wake up the next waiter (if any) that isn't already done.

        Waiters that are already done (e.g. cancelled) are discarded from
        the deque along the way.
        """
        while waiters:
            waiter = waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                break

    def __repr__(self):
        """Return a debug representation including the object's id."""
        return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'

    def __str__(self):
        """Return a short representation without the object's id."""
        return f'<{type(self).__name__} {self._format()}>'

    __class_getitem__ = classmethod(GenericAlias)

    def _format(self):
        """Build the state summary shared by __repr__ and __str__.

        Only non-empty / non-zero parts are included after maxsize.
        """
        result = f'maxsize={self._maxsize!r}'
        # getattr() guards against _queue not having been created yet.
        if getattr(self, '_queue', None):
            result += f' _queue={list(self._queue)!r}'
        if self._getters:
            result += f' _getters[{len(self._getters)}]'
        if self._putters:
            result += f' _putters[{len(self._putters)}]'
        if self._unfinished_tasks:
            result += f' tasks={self._unfinished_tasks}'
        return result

    def qsize(self):
        """Number of items in the queue."""
        return len(self._queue)

    @property
    def maxsize(self):
        """Number of items allowed in the queue."""
        return self._maxsize

    def empty(self):
        """Return True if the queue is empty, False otherwise."""
        return not self._queue

    def full(self):
        """Return True if there are maxsize items in the queue.

        Note: if the Queue was initialized with maxsize=0 (the default),
        then full() is never True.
        """
        if self._maxsize <= 0:
            # Non-positive maxsize means the queue is unbounded.
            return False
        return self.qsize() >= self._maxsize

    async def put(self, item):
        """Put an item into the queue.

        Put an item into the queue. If the queue is full, wait until a free
        slot is available before adding item.
        """
        # Loop rather than a single wait: after being woken up the slot may
        # already have been taken again, so full() is re-checked.
        while self.full():
            putter = self._get_loop().create_future()
            self._putters.append(putter)
            try:
                await putter
            except BaseException:
                # Deliberately broad (equivalent to a bare except): task
                # cancellation must be caught here too.  The exception is
                # always re-raised below after clean-up.
                putter.cancel()  # Just in case putter is not done yet.
                try:
                    # Clean self._putters from canceled putters.
                    self._putters.remove(putter)
                except ValueError:
                    # The putter could be removed from self._putters by a
                    # previous get_nowait call.
                    pass
                if not self.full() and not putter.cancelled():
                    # We were woken up by get_nowait(), but can't take
                    # the call.  Wake up the next in line.
                    self._wakeup_next(self._putters)
                raise
        return self.put_nowait(item)

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        If no free slot is immediately available, raise QueueFull.
        """
        if self.full():
            raise QueueFull
        self._put(item)
        self._unfinished_tasks += 1
        # There is outstanding work again, so join() must block.
        self._finished.clear()
        self._wakeup_next(self._getters)

    async def get(self):
        """Remove and return an item from the queue.

        If queue is empty, wait until an item is available.
        """
        # Loop rather than a single wait: after being woken up the item may
        # already have been taken, so empty() is re-checked.
        while self.empty():
            getter = self._get_loop().create_future()
            self._getters.append(getter)
            try:
                await getter
            except BaseException:
                # Deliberately broad (equivalent to a bare except): task
                # cancellation must be caught here too.  The exception is
                # always re-raised below after clean-up.
                getter.cancel()  # Just in case getter is not done yet.
                try:
                    # Clean self._getters from canceled getters.
                    self._getters.remove(getter)
                except ValueError:
                    # The getter could be removed from self._getters by a
                    # previous put_nowait call.
                    pass
                if not self.empty() and not getter.cancelled():
                    # We were woken up by put_nowait(), but can't take
                    # the call.  Wake up the next in line.
                    self._wakeup_next(self._getters)
                raise
        return self.get_nowait()

    def get_nowait(self):
        """Remove and return an item from the queue.

        Return an item if one is immediately available, else raise QueueEmpty.
        """
        if self.empty():
            raise QueueEmpty
        item = self._get()
        # A slot was freed; let one blocked put() proceed.
        self._wakeup_next(self._putters)
        return item

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items have
        been processed (meaning that a task_done() call was received for every
        item that had been put() into the queue).

        Raises ValueError if called more times than there were items placed in
        the queue.
        """
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._finished.set()

    async def join(self):
        """Block until all items in the queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer calls task_done() to
        indicate that the item was retrieved and all work on it is complete.
        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        if self._unfinished_tasks > 0:
            await self._finished.wait()


class PriorityQueue(Queue):
    """A subclass of Queue; retrieves entries in priority order (lowest first).

    Entries are typically tuples of the form: (priority number, data).
    """

    def _init(self, maxsize):
        """Create the list that backs the heap."""
        self._queue = []

    def _put(self, item, heappush=heapq.heappush):
        """Push item onto the heap."""
        heappush(self._queue, item)

    def _get(self, heappop=heapq.heappop):
        """Pop and return the smallest item from the heap."""
        return heappop(self._queue)


class LifoQueue(Queue):
    """A subclass of Queue that retrieves most recently added entries first."""

    def _init(self, maxsize):
        """Create the list used as a stack."""
        self._queue = []

    def _put(self, item):
        """Push item onto the end of the list."""
        self._queue.append(item)

    def _get(self):
        """Pop and return the most recently added item."""
        return self._queue.pop()