xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/asyncio/queues.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
2
3import collections
4import heapq
5from types import GenericAlias
6
7from . import locks
8from . import mixins
9
10
11class QueueEmpty(Exception):
12    """Raised when Queue.get_nowait() is called on an empty Queue."""
13    pass
14
15
16class QueueFull(Exception):
17    """Raised when the Queue.put_nowait() method is called on a full Queue."""
18    pass
19
20
21class Queue(mixins._LoopBoundMixin):
22    """A queue, useful for coordinating producer and consumer coroutines.
23
24    If maxsize is less than or equal to zero, the queue size is infinite. If it
25    is an integer greater than 0, then "await put()" will block when the
26    queue reaches maxsize, until an item is removed by get().
27
28    Unlike the standard library Queue, you can reliably know this Queue's size
29    with qsize(), since your single-threaded asyncio application won't be
30    interrupted between calling qsize() and doing an operation on the Queue.
31    """
32
33    def __init__(self, maxsize=0):
34        self._maxsize = maxsize
35
36        # Futures.
37        self._getters = collections.deque()
38        # Futures.
39        self._putters = collections.deque()
40        self._unfinished_tasks = 0
41        self._finished = locks.Event()
42        self._finished.set()
43        self._init(maxsize)
44
45    # These three are overridable in subclasses.
46
47    def _init(self, maxsize):
48        self._queue = collections.deque()
49
50    def _get(self):
51        return self._queue.popleft()
52
53    def _put(self, item):
54        self._queue.append(item)
55
56    # End of the overridable methods.
57
58    def _wakeup_next(self, waiters):
59        # Wake up the next waiter (if any) that isn't cancelled.
60        while waiters:
61            waiter = waiters.popleft()
62            if not waiter.done():
63                waiter.set_result(None)
64                break
65
66    def __repr__(self):
67        return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'
68
69    def __str__(self):
70        return f'<{type(self).__name__} {self._format()}>'
71
72    __class_getitem__ = classmethod(GenericAlias)
73
74    def _format(self):
75        result = f'maxsize={self._maxsize!r}'
76        if getattr(self, '_queue', None):
77            result += f' _queue={list(self._queue)!r}'
78        if self._getters:
79            result += f' _getters[{len(self._getters)}]'
80        if self._putters:
81            result += f' _putters[{len(self._putters)}]'
82        if self._unfinished_tasks:
83            result += f' tasks={self._unfinished_tasks}'
84        return result
85
86    def qsize(self):
87        """Number of items in the queue."""
88        return len(self._queue)
89
90    @property
91    def maxsize(self):
92        """Number of items allowed in the queue."""
93        return self._maxsize
94
95    def empty(self):
96        """Return True if the queue is empty, False otherwise."""
97        return not self._queue
98
99    def full(self):
100        """Return True if there are maxsize items in the queue.
101
102        Note: if the Queue was initialized with maxsize=0 (the default),
103        then full() is never True.
104        """
105        if self._maxsize <= 0:
106            return False
107        else:
108            return self.qsize() >= self._maxsize
109
110    async def put(self, item):
111        """Put an item into the queue.
112
113        Put an item into the queue. If the queue is full, wait until a free
114        slot is available before adding item.
115        """
116        while self.full():
117            putter = self._get_loop().create_future()
118            self._putters.append(putter)
119            try:
120                await putter
121            except:
122                putter.cancel()  # Just in case putter is not done yet.
123                try:
124                    # Clean self._putters from canceled putters.
125                    self._putters.remove(putter)
126                except ValueError:
127                    # The putter could be removed from self._putters by a
128                    # previous get_nowait call.
129                    pass
130                if not self.full() and not putter.cancelled():
131                    # We were woken up by get_nowait(), but can't take
132                    # the call.  Wake up the next in line.
133                    self._wakeup_next(self._putters)
134                raise
135        return self.put_nowait(item)
136
137    def put_nowait(self, item):
138        """Put an item into the queue without blocking.
139
140        If no free slot is immediately available, raise QueueFull.
141        """
142        if self.full():
143            raise QueueFull
144        self._put(item)
145        self._unfinished_tasks += 1
146        self._finished.clear()
147        self._wakeup_next(self._getters)
148
149    async def get(self):
150        """Remove and return an item from the queue.
151
152        If queue is empty, wait until an item is available.
153        """
154        while self.empty():
155            getter = self._get_loop().create_future()
156            self._getters.append(getter)
157            try:
158                await getter
159            except:
160                getter.cancel()  # Just in case getter is not done yet.
161                try:
162                    # Clean self._getters from canceled getters.
163                    self._getters.remove(getter)
164                except ValueError:
165                    # The getter could be removed from self._getters by a
166                    # previous put_nowait call.
167                    pass
168                if not self.empty() and not getter.cancelled():
169                    # We were woken up by put_nowait(), but can't take
170                    # the call.  Wake up the next in line.
171                    self._wakeup_next(self._getters)
172                raise
173        return self.get_nowait()
174
175    def get_nowait(self):
176        """Remove and return an item from the queue.
177
178        Return an item if one is immediately available, else raise QueueEmpty.
179        """
180        if self.empty():
181            raise QueueEmpty
182        item = self._get()
183        self._wakeup_next(self._putters)
184        return item
185
186    def task_done(self):
187        """Indicate that a formerly enqueued task is complete.
188
189        Used by queue consumers. For each get() used to fetch a task,
190        a subsequent call to task_done() tells the queue that the processing
191        on the task is complete.
192
193        If a join() is currently blocking, it will resume when all items have
194        been processed (meaning that a task_done() call was received for every
195        item that had been put() into the queue).
196
197        Raises ValueError if called more times than there were items placed in
198        the queue.
199        """
200        if self._unfinished_tasks <= 0:
201            raise ValueError('task_done() called too many times')
202        self._unfinished_tasks -= 1
203        if self._unfinished_tasks == 0:
204            self._finished.set()
205
206    async def join(self):
207        """Block until all items in the queue have been gotten and processed.
208
209        The count of unfinished tasks goes up whenever an item is added to the
210        queue. The count goes down whenever a consumer calls task_done() to
211        indicate that the item was retrieved and all work on it is complete.
212        When the count of unfinished tasks drops to zero, join() unblocks.
213        """
214        if self._unfinished_tasks > 0:
215            await self._finished.wait()
216
217
218class PriorityQueue(Queue):
219    """A subclass of Queue; retrieves entries in priority order (lowest first).
220
221    Entries are typically tuples of the form: (priority number, data).
222    """
223
224    def _init(self, maxsize):
225        self._queue = []
226
227    def _put(self, item, heappush=heapq.heappush):
228        heappush(self._queue, item)
229
230    def _get(self, heappop=heapq.heappop):
231        return heappop(self._queue)
232
233
234class LifoQueue(Queue):
235    """A subclass of Queue that retrieves most recently added entries first."""
236
237    def _init(self, maxsize):
238        self._queue = []
239
240    def _put(self, item):
241        self._queue.append(item)
242
243    def _get(self):
244        return self._queue.pop()
245