# xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/asyncio/queues.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
# Public names exported by a star-import of this module.
__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
2*cda5da8dSAndroid Build Coastguard Worker
3*cda5da8dSAndroid Build Coastguard Workerimport collections
4*cda5da8dSAndroid Build Coastguard Workerimport heapq
5*cda5da8dSAndroid Build Coastguard Workerfrom types import GenericAlias
6*cda5da8dSAndroid Build Coastguard Worker
7*cda5da8dSAndroid Build Coastguard Workerfrom . import locks
8*cda5da8dSAndroid Build Coastguard Workerfrom . import mixins
9*cda5da8dSAndroid Build Coastguard Worker
10*cda5da8dSAndroid Build Coastguard Worker
class QueueEmpty(Exception):
    """Raised when Queue.get_nowait() is called on an empty Queue."""
14*cda5da8dSAndroid Build Coastguard Worker
15*cda5da8dSAndroid Build Coastguard Worker
class QueueFull(Exception):
    """Raised when the Queue.put_nowait() method is called on a full Queue."""
19*cda5da8dSAndroid Build Coastguard Worker
20*cda5da8dSAndroid Build Coastguard Worker
21*cda5da8dSAndroid Build Coastguard Workerclass Queue(mixins._LoopBoundMixin):
22*cda5da8dSAndroid Build Coastguard Worker    """A queue, useful for coordinating producer and consumer coroutines.
23*cda5da8dSAndroid Build Coastguard Worker
24*cda5da8dSAndroid Build Coastguard Worker    If maxsize is less than or equal to zero, the queue size is infinite. If it
25*cda5da8dSAndroid Build Coastguard Worker    is an integer greater than 0, then "await put()" will block when the
26*cda5da8dSAndroid Build Coastguard Worker    queue reaches maxsize, until an item is removed by get().
27*cda5da8dSAndroid Build Coastguard Worker
28*cda5da8dSAndroid Build Coastguard Worker    Unlike the standard library Queue, you can reliably know this Queue's size
29*cda5da8dSAndroid Build Coastguard Worker    with qsize(), since your single-threaded asyncio application won't be
30*cda5da8dSAndroid Build Coastguard Worker    interrupted between calling qsize() and doing an operation on the Queue.
31*cda5da8dSAndroid Build Coastguard Worker    """
32*cda5da8dSAndroid Build Coastguard Worker
33*cda5da8dSAndroid Build Coastguard Worker    def __init__(self, maxsize=0):
34*cda5da8dSAndroid Build Coastguard Worker        self._maxsize = maxsize
35*cda5da8dSAndroid Build Coastguard Worker
36*cda5da8dSAndroid Build Coastguard Worker        # Futures.
37*cda5da8dSAndroid Build Coastguard Worker        self._getters = collections.deque()
38*cda5da8dSAndroid Build Coastguard Worker        # Futures.
39*cda5da8dSAndroid Build Coastguard Worker        self._putters = collections.deque()
40*cda5da8dSAndroid Build Coastguard Worker        self._unfinished_tasks = 0
41*cda5da8dSAndroid Build Coastguard Worker        self._finished = locks.Event()
42*cda5da8dSAndroid Build Coastguard Worker        self._finished.set()
43*cda5da8dSAndroid Build Coastguard Worker        self._init(maxsize)
44*cda5da8dSAndroid Build Coastguard Worker
45*cda5da8dSAndroid Build Coastguard Worker    # These three are overridable in subclasses.
46*cda5da8dSAndroid Build Coastguard Worker
47*cda5da8dSAndroid Build Coastguard Worker    def _init(self, maxsize):
48*cda5da8dSAndroid Build Coastguard Worker        self._queue = collections.deque()
49*cda5da8dSAndroid Build Coastguard Worker
50*cda5da8dSAndroid Build Coastguard Worker    def _get(self):
51*cda5da8dSAndroid Build Coastguard Worker        return self._queue.popleft()
52*cda5da8dSAndroid Build Coastguard Worker
53*cda5da8dSAndroid Build Coastguard Worker    def _put(self, item):
54*cda5da8dSAndroid Build Coastguard Worker        self._queue.append(item)
55*cda5da8dSAndroid Build Coastguard Worker
56*cda5da8dSAndroid Build Coastguard Worker    # End of the overridable methods.
57*cda5da8dSAndroid Build Coastguard Worker
58*cda5da8dSAndroid Build Coastguard Worker    def _wakeup_next(self, waiters):
59*cda5da8dSAndroid Build Coastguard Worker        # Wake up the next waiter (if any) that isn't cancelled.
60*cda5da8dSAndroid Build Coastguard Worker        while waiters:
61*cda5da8dSAndroid Build Coastguard Worker            waiter = waiters.popleft()
62*cda5da8dSAndroid Build Coastguard Worker            if not waiter.done():
63*cda5da8dSAndroid Build Coastguard Worker                waiter.set_result(None)
64*cda5da8dSAndroid Build Coastguard Worker                break
65*cda5da8dSAndroid Build Coastguard Worker
66*cda5da8dSAndroid Build Coastguard Worker    def __repr__(self):
67*cda5da8dSAndroid Build Coastguard Worker        return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'
68*cda5da8dSAndroid Build Coastguard Worker
69*cda5da8dSAndroid Build Coastguard Worker    def __str__(self):
70*cda5da8dSAndroid Build Coastguard Worker        return f'<{type(self).__name__} {self._format()}>'
71*cda5da8dSAndroid Build Coastguard Worker
72*cda5da8dSAndroid Build Coastguard Worker    __class_getitem__ = classmethod(GenericAlias)
73*cda5da8dSAndroid Build Coastguard Worker
74*cda5da8dSAndroid Build Coastguard Worker    def _format(self):
75*cda5da8dSAndroid Build Coastguard Worker        result = f'maxsize={self._maxsize!r}'
76*cda5da8dSAndroid Build Coastguard Worker        if getattr(self, '_queue', None):
77*cda5da8dSAndroid Build Coastguard Worker            result += f' _queue={list(self._queue)!r}'
78*cda5da8dSAndroid Build Coastguard Worker        if self._getters:
79*cda5da8dSAndroid Build Coastguard Worker            result += f' _getters[{len(self._getters)}]'
80*cda5da8dSAndroid Build Coastguard Worker        if self._putters:
81*cda5da8dSAndroid Build Coastguard Worker            result += f' _putters[{len(self._putters)}]'
82*cda5da8dSAndroid Build Coastguard Worker        if self._unfinished_tasks:
83*cda5da8dSAndroid Build Coastguard Worker            result += f' tasks={self._unfinished_tasks}'
84*cda5da8dSAndroid Build Coastguard Worker        return result
85*cda5da8dSAndroid Build Coastguard Worker
86*cda5da8dSAndroid Build Coastguard Worker    def qsize(self):
87*cda5da8dSAndroid Build Coastguard Worker        """Number of items in the queue."""
88*cda5da8dSAndroid Build Coastguard Worker        return len(self._queue)
89*cda5da8dSAndroid Build Coastguard Worker
90*cda5da8dSAndroid Build Coastguard Worker    @property
91*cda5da8dSAndroid Build Coastguard Worker    def maxsize(self):
92*cda5da8dSAndroid Build Coastguard Worker        """Number of items allowed in the queue."""
93*cda5da8dSAndroid Build Coastguard Worker        return self._maxsize
94*cda5da8dSAndroid Build Coastguard Worker
95*cda5da8dSAndroid Build Coastguard Worker    def empty(self):
96*cda5da8dSAndroid Build Coastguard Worker        """Return True if the queue is empty, False otherwise."""
97*cda5da8dSAndroid Build Coastguard Worker        return not self._queue
98*cda5da8dSAndroid Build Coastguard Worker
99*cda5da8dSAndroid Build Coastguard Worker    def full(self):
100*cda5da8dSAndroid Build Coastguard Worker        """Return True if there are maxsize items in the queue.
101*cda5da8dSAndroid Build Coastguard Worker
102*cda5da8dSAndroid Build Coastguard Worker        Note: if the Queue was initialized with maxsize=0 (the default),
103*cda5da8dSAndroid Build Coastguard Worker        then full() is never True.
104*cda5da8dSAndroid Build Coastguard Worker        """
105*cda5da8dSAndroid Build Coastguard Worker        if self._maxsize <= 0:
106*cda5da8dSAndroid Build Coastguard Worker            return False
107*cda5da8dSAndroid Build Coastguard Worker        else:
108*cda5da8dSAndroid Build Coastguard Worker            return self.qsize() >= self._maxsize
109*cda5da8dSAndroid Build Coastguard Worker
110*cda5da8dSAndroid Build Coastguard Worker    async def put(self, item):
111*cda5da8dSAndroid Build Coastguard Worker        """Put an item into the queue.
112*cda5da8dSAndroid Build Coastguard Worker
113*cda5da8dSAndroid Build Coastguard Worker        Put an item into the queue. If the queue is full, wait until a free
114*cda5da8dSAndroid Build Coastguard Worker        slot is available before adding item.
115*cda5da8dSAndroid Build Coastguard Worker        """
116*cda5da8dSAndroid Build Coastguard Worker        while self.full():
117*cda5da8dSAndroid Build Coastguard Worker            putter = self._get_loop().create_future()
118*cda5da8dSAndroid Build Coastguard Worker            self._putters.append(putter)
119*cda5da8dSAndroid Build Coastguard Worker            try:
120*cda5da8dSAndroid Build Coastguard Worker                await putter
121*cda5da8dSAndroid Build Coastguard Worker            except:
122*cda5da8dSAndroid Build Coastguard Worker                putter.cancel()  # Just in case putter is not done yet.
123*cda5da8dSAndroid Build Coastguard Worker                try:
124*cda5da8dSAndroid Build Coastguard Worker                    # Clean self._putters from canceled putters.
125*cda5da8dSAndroid Build Coastguard Worker                    self._putters.remove(putter)
126*cda5da8dSAndroid Build Coastguard Worker                except ValueError:
127*cda5da8dSAndroid Build Coastguard Worker                    # The putter could be removed from self._putters by a
128*cda5da8dSAndroid Build Coastguard Worker                    # previous get_nowait call.
129*cda5da8dSAndroid Build Coastguard Worker                    pass
130*cda5da8dSAndroid Build Coastguard Worker                if not self.full() and not putter.cancelled():
131*cda5da8dSAndroid Build Coastguard Worker                    # We were woken up by get_nowait(), but can't take
132*cda5da8dSAndroid Build Coastguard Worker                    # the call.  Wake up the next in line.
133*cda5da8dSAndroid Build Coastguard Worker                    self._wakeup_next(self._putters)
134*cda5da8dSAndroid Build Coastguard Worker                raise
135*cda5da8dSAndroid Build Coastguard Worker        return self.put_nowait(item)
136*cda5da8dSAndroid Build Coastguard Worker
137*cda5da8dSAndroid Build Coastguard Worker    def put_nowait(self, item):
138*cda5da8dSAndroid Build Coastguard Worker        """Put an item into the queue without blocking.
139*cda5da8dSAndroid Build Coastguard Worker
140*cda5da8dSAndroid Build Coastguard Worker        If no free slot is immediately available, raise QueueFull.
141*cda5da8dSAndroid Build Coastguard Worker        """
142*cda5da8dSAndroid Build Coastguard Worker        if self.full():
143*cda5da8dSAndroid Build Coastguard Worker            raise QueueFull
144*cda5da8dSAndroid Build Coastguard Worker        self._put(item)
145*cda5da8dSAndroid Build Coastguard Worker        self._unfinished_tasks += 1
146*cda5da8dSAndroid Build Coastguard Worker        self._finished.clear()
147*cda5da8dSAndroid Build Coastguard Worker        self._wakeup_next(self._getters)
148*cda5da8dSAndroid Build Coastguard Worker
149*cda5da8dSAndroid Build Coastguard Worker    async def get(self):
150*cda5da8dSAndroid Build Coastguard Worker        """Remove and return an item from the queue.
151*cda5da8dSAndroid Build Coastguard Worker
152*cda5da8dSAndroid Build Coastguard Worker        If queue is empty, wait until an item is available.
153*cda5da8dSAndroid Build Coastguard Worker        """
154*cda5da8dSAndroid Build Coastguard Worker        while self.empty():
155*cda5da8dSAndroid Build Coastguard Worker            getter = self._get_loop().create_future()
156*cda5da8dSAndroid Build Coastguard Worker            self._getters.append(getter)
157*cda5da8dSAndroid Build Coastguard Worker            try:
158*cda5da8dSAndroid Build Coastguard Worker                await getter
159*cda5da8dSAndroid Build Coastguard Worker            except:
160*cda5da8dSAndroid Build Coastguard Worker                getter.cancel()  # Just in case getter is not done yet.
161*cda5da8dSAndroid Build Coastguard Worker                try:
162*cda5da8dSAndroid Build Coastguard Worker                    # Clean self._getters from canceled getters.
163*cda5da8dSAndroid Build Coastguard Worker                    self._getters.remove(getter)
164*cda5da8dSAndroid Build Coastguard Worker                except ValueError:
165*cda5da8dSAndroid Build Coastguard Worker                    # The getter could be removed from self._getters by a
166*cda5da8dSAndroid Build Coastguard Worker                    # previous put_nowait call.
167*cda5da8dSAndroid Build Coastguard Worker                    pass
168*cda5da8dSAndroid Build Coastguard Worker                if not self.empty() and not getter.cancelled():
169*cda5da8dSAndroid Build Coastguard Worker                    # We were woken up by put_nowait(), but can't take
170*cda5da8dSAndroid Build Coastguard Worker                    # the call.  Wake up the next in line.
171*cda5da8dSAndroid Build Coastguard Worker                    self._wakeup_next(self._getters)
172*cda5da8dSAndroid Build Coastguard Worker                raise
173*cda5da8dSAndroid Build Coastguard Worker        return self.get_nowait()
174*cda5da8dSAndroid Build Coastguard Worker
175*cda5da8dSAndroid Build Coastguard Worker    def get_nowait(self):
176*cda5da8dSAndroid Build Coastguard Worker        """Remove and return an item from the queue.
177*cda5da8dSAndroid Build Coastguard Worker
178*cda5da8dSAndroid Build Coastguard Worker        Return an item if one is immediately available, else raise QueueEmpty.
179*cda5da8dSAndroid Build Coastguard Worker        """
180*cda5da8dSAndroid Build Coastguard Worker        if self.empty():
181*cda5da8dSAndroid Build Coastguard Worker            raise QueueEmpty
182*cda5da8dSAndroid Build Coastguard Worker        item = self._get()
183*cda5da8dSAndroid Build Coastguard Worker        self._wakeup_next(self._putters)
184*cda5da8dSAndroid Build Coastguard Worker        return item
185*cda5da8dSAndroid Build Coastguard Worker
186*cda5da8dSAndroid Build Coastguard Worker    def task_done(self):
187*cda5da8dSAndroid Build Coastguard Worker        """Indicate that a formerly enqueued task is complete.
188*cda5da8dSAndroid Build Coastguard Worker
189*cda5da8dSAndroid Build Coastguard Worker        Used by queue consumers. For each get() used to fetch a task,
190*cda5da8dSAndroid Build Coastguard Worker        a subsequent call to task_done() tells the queue that the processing
191*cda5da8dSAndroid Build Coastguard Worker        on the task is complete.
192*cda5da8dSAndroid Build Coastguard Worker
193*cda5da8dSAndroid Build Coastguard Worker        If a join() is currently blocking, it will resume when all items have
194*cda5da8dSAndroid Build Coastguard Worker        been processed (meaning that a task_done() call was received for every
195*cda5da8dSAndroid Build Coastguard Worker        item that had been put() into the queue).
196*cda5da8dSAndroid Build Coastguard Worker
197*cda5da8dSAndroid Build Coastguard Worker        Raises ValueError if called more times than there were items placed in
198*cda5da8dSAndroid Build Coastguard Worker        the queue.
199*cda5da8dSAndroid Build Coastguard Worker        """
200*cda5da8dSAndroid Build Coastguard Worker        if self._unfinished_tasks <= 0:
201*cda5da8dSAndroid Build Coastguard Worker            raise ValueError('task_done() called too many times')
202*cda5da8dSAndroid Build Coastguard Worker        self._unfinished_tasks -= 1
203*cda5da8dSAndroid Build Coastguard Worker        if self._unfinished_tasks == 0:
204*cda5da8dSAndroid Build Coastguard Worker            self._finished.set()
205*cda5da8dSAndroid Build Coastguard Worker
206*cda5da8dSAndroid Build Coastguard Worker    async def join(self):
207*cda5da8dSAndroid Build Coastguard Worker        """Block until all items in the queue have been gotten and processed.
208*cda5da8dSAndroid Build Coastguard Worker
209*cda5da8dSAndroid Build Coastguard Worker        The count of unfinished tasks goes up whenever an item is added to the
210*cda5da8dSAndroid Build Coastguard Worker        queue. The count goes down whenever a consumer calls task_done() to
211*cda5da8dSAndroid Build Coastguard Worker        indicate that the item was retrieved and all work on it is complete.
212*cda5da8dSAndroid Build Coastguard Worker        When the count of unfinished tasks drops to zero, join() unblocks.
213*cda5da8dSAndroid Build Coastguard Worker        """
214*cda5da8dSAndroid Build Coastguard Worker        if self._unfinished_tasks > 0:
215*cda5da8dSAndroid Build Coastguard Worker            await self._finished.wait()
216*cda5da8dSAndroid Build Coastguard Worker
217*cda5da8dSAndroid Build Coastguard Worker
class PriorityQueue(Queue):
    """A subclass of Queue; retrieves entries in priority order (lowest first).

    Entries are typically tuples of the form: (priority number, data).
    """

    def _init(self, maxsize):
        """Create the list that heapq maintains as a binary heap."""
        self._queue = []

    def _put(self, item, heappush=heapq.heappush):
        """Push *item* onto the heap."""
        heappush(self._queue, item)

    def _get(self, heappop=heapq.heappop):
        """Pop and return the smallest item from the heap."""
        return heappop(self._queue)
232*cda5da8dSAndroid Build Coastguard Worker
233*cda5da8dSAndroid Build Coastguard Worker
class LifoQueue(Queue):
    """A subclass of Queue that retrieves most recently added entries first."""

    def _init(self, maxsize):
        """Create the list used as a stack."""
        self._queue = []

    def _put(self, item):
        """Push *item* onto the top of the stack."""
        self._queue.append(item)

    def _get(self):
        """Pop and return the most recently added item."""
        return self._queue.pop()
245