xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/multiprocessing/queues.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
#
# Module implementing queues
#
# multiprocessing/queues.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#
9*cda5da8dSAndroid Build Coastguard Worker
10*cda5da8dSAndroid Build Coastguard Worker__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
11*cda5da8dSAndroid Build Coastguard Worker
12*cda5da8dSAndroid Build Coastguard Workerimport sys
13*cda5da8dSAndroid Build Coastguard Workerimport os
14*cda5da8dSAndroid Build Coastguard Workerimport threading
15*cda5da8dSAndroid Build Coastguard Workerimport collections
16*cda5da8dSAndroid Build Coastguard Workerimport time
17*cda5da8dSAndroid Build Coastguard Workerimport types
18*cda5da8dSAndroid Build Coastguard Workerimport weakref
19*cda5da8dSAndroid Build Coastguard Workerimport errno
20*cda5da8dSAndroid Build Coastguard Worker
21*cda5da8dSAndroid Build Coastguard Workerfrom queue import Empty, Full
22*cda5da8dSAndroid Build Coastguard Worker
23*cda5da8dSAndroid Build Coastguard Workerimport _multiprocessing
24*cda5da8dSAndroid Build Coastguard Worker
25*cda5da8dSAndroid Build Coastguard Workerfrom . import connection
26*cda5da8dSAndroid Build Coastguard Workerfrom . import context
27*cda5da8dSAndroid Build Coastguard Worker_ForkingPickler = context.reduction.ForkingPickler
28*cda5da8dSAndroid Build Coastguard Worker
29*cda5da8dSAndroid Build Coastguard Workerfrom .util import debug, info, Finalize, register_after_fork, is_exiting
30*cda5da8dSAndroid Build Coastguard Worker
#
# Queue type using a pipe, buffer and thread
#
34*cda5da8dSAndroid Build Coastguard Worker
class Queue(object):
    """FIFO queue built from a pipe, an in-process buffer and a feeder thread.

    ``put()`` appends the object to a local ``deque`` and wakes a background
    "feeder" thread (started lazily on the first ``put()``), which pickles
    buffered objects and writes them to the pipe.  ``get()`` reads one
    pickled message from the pipe and unpickles it.  A bounded semaphore
    counts the free slots: it is acquired by ``put()`` and released by
    ``get()``.
    """

    def __init__(self, maxsize=0, *, ctx):
        """Create the pipe, locks and semaphore backing the queue.

        Args:
            maxsize: Capacity of the queue.  Values <= 0 are replaced by
                ``SEM_VALUE_MAX`` imported from ``.synchronize``.
            ctx: Object supplying the ``Lock()`` and ``BoundedSemaphore()``
                factories used for the shared synchronization primitives.
        """
        if maxsize <= 0:
            # Can raise ImportError (see issues #3770 and #23400)
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._opid = os.getpid()
        if sys.platform == 'win32':
            # No write lock is created on win32; _feed() then sends
            # without locking.
            self._wlock = None
        else:
            self._wlock = ctx.Lock()
        self._sem = ctx.BoundedSemaphore(maxsize)
        # For use by concurrent.futures
        self._ignore_epipe = False
        self._reset()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        """Return the shareable state; only allowed while spawning a child."""
        context.assert_spawning(self)
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        """Restore shared state, then rebuild the process-local state."""
        (self._ignore_epipe, self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._reset()

    def _after_fork(self):
        """After-fork hook: reset the process-local state in the child."""
        debug('Queue._after_fork()')
        self._reset(after_fork=True)

    def _reset(self, after_fork=False):
        """(Re)initialize everything that is local to the current process.

        Args:
            after_fork: When true, the existing ``_notempty`` condition is
                reinitialized in place via ``_at_fork_reinit()``; otherwise
                a brand new condition is created.
        """
        if after_fork:
            self._notempty._at_fork_reinit()
        else:
            self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        # Cache bound methods of the pipe ends.
        self._send_bytes = self._writer.send_bytes
        self._recv_bytes = self._reader.recv_bytes
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        """Queue *obj* for sending by the feeder thread.

        Args:
            obj: Object to enqueue; it is pickled later, in the feeder thread.
            block: Passed to the slot semaphore's ``acquire()``.
            timeout: Passed to the slot semaphore's ``acquire()``.

        Raises:
            ValueError: If the queue has been closed.
            Full: If no free slot could be acquired.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()

    def get(self, block=True, timeout=None):
        """Remove and return one object from the queue.

        Args:
            block: If true, wait for an item (up to *timeout* if given).
            timeout: Overall time budget in seconds, covering both the wait
                for the reader lock and the wait for data.

        Returns:
            The unpickled object.

        Raises:
            ValueError: If the queue has been closed.
            Empty: If the reader lock or an item could not be obtained.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if block and timeout is None:
            with self._rlock:
                res = self._recv_bytes()
            self._sem.release()
        else:
            if block:
                deadline = time.monotonic() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    # Only the time left after acquiring the lock may be
                    # spent polling.
                    timeout = deadline - time.monotonic()
                    if not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv_bytes()
                self._sem.release()
            finally:
                self._rlock.release()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def qsize(self):
        """Return the number of occupied slots (maxsize - free slots)."""
        # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

    def empty(self):
        """Return True if the reader end of the pipe has no data ready."""
        return not self._poll()

    def full(self):
        """Return True if the slot semaphore has no free slots left."""
        return self._sem._semlock._is_zero()

    def get_nowait(self):
        """Equivalent to ``get(False)``."""
        return self.get(False)

    def put_nowait(self, obj):
        """Equivalent to ``put(obj, False)``."""
        return self.put(obj, False)

    def close(self):
        """Mark the queue closed and run the close finalizer, if any.

        The finalizer only exists once the feeder thread has been started.
        """
        self._closed = True
        close = self._close
        if close:
            self._close = None
            close()

    def join_thread(self):
        """Run the feeder-thread join finalizer, if one is registered.

        The queue must already be closed.  This is checked with ``assert``,
        so the check is skipped when Python runs with ``-O``.
        """
        debug('Queue.join_thread()')
        assert self._closed, "Queue {0!r} not closed".format(self)
        if self._jointhread:
            self._jointhread()

    def cancel_join_thread(self):
        """Cancel the join finalizer and prevent a new one being registered."""
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        try:
            self._jointhread.cancel()
        except AttributeError:
            # No finalizer registered yet (_jointhread is None).
            pass

    def _start_thread(self):
        """Start the feeder thread and register its finalizers.

        Raises:
            Exception: Whatever ``Thread.start()`` raised.  In that case
                ``self._thread`` is reset to ``None`` so that a later
                ``put()`` tries to start a feeder again.
        """
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send_bytes,
                  self._wlock, self._reader.close, self._writer.close,
                  self._ignore_epipe, self._on_queue_feeder_error,
                  self._sem),
            name='QueueFeederThread'
        )
        self._thread.daemon = True

        debug('doing self._thread.start()')
        try:
            self._thread.start()
        except Exception:
            # Starting a thread can fail (e.g. RuntimeError).  Without this
            # reset, put() would see a non-None _thread, skip
            # _start_thread() and keep buffering objects that no feeder
            # ever sends.
            self._thread = None
            raise
        debug('... done self._thread.start()')

        if not self._joincancelled:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
                )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
            )

    @staticmethod
    def _finalize_join(twr):
        """Join the feeder thread referenced by weak reference *twr*."""
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        """Append the sentinel to *buffer* and wake the feeder thread."""
        debug('telling queue thread to quit')
        with notempty:
            buffer.append(_sentinel)
            notempty.notify()

    @staticmethod
    def _feed(buffer, notempty, send_bytes, writelock, reader_close,
              writer_close, ignore_epipe, onerror, queue_sem):
        """Feeder thread body: move objects from *buffer* to the pipe.

        Args:
            buffer: Deque shared with ``put()``.
            notempty: Condition notified when *buffer* gains an item.
            send_bytes: Callable that writes one message to the pipe.
            writelock: Lock guarding writes; unused on win32.
            reader_close: Closes the reader end; called on the sentinel.
            writer_close: Closes the writer end; called on the sentinel.
            ignore_epipe: If true, an EPIPE error silently ends the thread.
            onerror: Called as ``onerror(exc, obj)`` for other errors.
            queue_sem: Slot semaphore, released when an object is dropped
                because of an error.
        """
        debug('starting thread to feed data to pipe')
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        while 1:
            try:
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    # Drain the buffer; popleft() raises IndexError once it
                    # is empty, which sends us back to waiting.
                    while 1:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            reader_close()
                            writer_close()
                            return

                        # serialize the data before acquiring the lock
                        obj = _ForkingPickler.dumps(obj)
                        if wacquire is None:
                            send_bytes(obj)
                        else:
                            wacquire()
                            try:
                                send_bytes(obj)
                            finally:
                                wrelease()
                except IndexError:
                    pass
            except Exception as e:
                if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
                    return
                # Since this runs in a daemon thread the resources it uses
                # may be become unusable while the process is cleaning up.
                # We ignore errors which happen after the process has
                # started to cleanup.
                if is_exiting():
                    info('error in queue thread: %s', e)
                    return
                else:
                    # Since the object has not been sent in the queue, we need
                    # to decrease the size of the queue. The error acts as
                    # if the object had been silently removed from the queue
                    # and this step is necessary to have a properly working
                    # queue.
                    queue_sem.release()
                    onerror(e, obj)

    @staticmethod
    def _on_queue_feeder_error(e, obj):
        """
        Private API hook called when feeding data in the background thread
        raises an exception.  For overriding by concurrent.futures.
        """
        import traceback
        traceback.print_exc()


# Unique marker appended to a queue's buffer to tell its feeder thread to exit.
_sentinel = object()
285*cda5da8dSAndroid Build Coastguard Worker
#
# A queue type which also supports join() and task_done() methods
#
# Note that if you do not call task_done() for each finished task, then
# eventually the counter's semaphore may overflow, causing Bad Things
# to happen.
#
293*cda5da8dSAndroid Build Coastguard Worker
class JoinableQueue(Queue):
    """Queue that additionally tracks unfinished tasks.

    Every ``put()`` increments a shared counter semaphore, every
    ``task_done()`` decrements it, and ``join()`` waits on a shared
    condition that is notified when the counter reaches zero.
    """

    def __init__(self, maxsize=0, *, ctx):
        """Set up the base queue, then the task counter and its condition."""
        Queue.__init__(self, maxsize, ctx=ctx)
        self._unfinished_tasks = ctx.Semaphore(0)
        self._cond = ctx.Condition()

    def __getstate__(self):
        """Return the base queue state extended with the two extra fields."""
        base_state = Queue.__getstate__(self)
        return base_state + (self._cond, self._unfinished_tasks)

    def __setstate__(self, state):
        """Restore the base queue from all but the last two entries of
        *state*, and the condition / counter from the last two."""
        base_state, extra = state[:-2], state[-2:]
        Queue.__setstate__(self, base_state)
        self._cond, self._unfinished_tasks = extra

    def put(self, obj, block=True, timeout=None):
        """Enqueue *obj* and count it as an unfinished task.

        Raises ValueError if the queue is closed and Full if no free slot
        could be acquired.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        got_slot = self._sem.acquire(block, timeout)
        if not got_slot:
            raise Full

        # Hold both the buffer condition and the task condition while the
        # item is appended and counted.
        with self._notempty:
            with self._cond:
                if self._thread is None:
                    self._start_thread()
                self._buffer.append(obj)
                self._unfinished_tasks.release()
                self._notempty.notify()

    def task_done(self):
        """Mark one task as finished; wake joiners when none remain.

        Raises ValueError if there is no unfinished task to mark.
        """
        with self._cond:
            pending = self._unfinished_tasks
            if not pending.acquire(False):
                raise ValueError('task_done() called too many times')
            if pending._semlock._is_zero():
                self._cond.notify_all()

    def join(self):
        """Wait until the unfinished-task counter is notified as zero.

        Returns immediately if the counter is already zero; otherwise
        waits once on the condition.
        """
        with self._cond:
            if self._unfinished_tasks._semlock._is_zero():
                return
            self._cond.wait()
#
# Simplified Queue type -- really just a locked pipe
#
336*cda5da8dSAndroid Build Coastguard Worker
class SimpleQueue(object):
    """Minimal queue: a one-way pipe guarded by a read lock and a write lock.

    ``put()`` pickles the object and writes it straight to the pipe from
    the calling thread; ``get()`` reads one message and unpickles it.
    """

    def __init__(self, *, ctx):
        """Create the pipe and its locks.

        Args:
            ctx: Object supplying the ``Lock()`` factory used for the
                reader lock and, except on win32, the writer lock.
        """
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._poll = self._reader.poll
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()

    def close(self):
        """Close both ends of the pipe.

        The writer end is closed even if closing the reader end raises, so
        an error on one end does not leak the other.  Any exception still
        propagates to the caller.
        """
        try:
            self._reader.close()
        finally:
            self._writer.close()

    def empty(self):
        """Return True if the reader end of the pipe has no data ready."""
        return not self._poll()

    def __getstate__(self):
        """Return the shareable state; only allowed while spawning a child."""
        context.assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock)

    def __setstate__(self, state):
        """Restore the pipe ends and locks, and re-cache the poll method."""
        (self._reader, self._writer, self._rlock, self._wlock) = state
        self._poll = self._reader.poll

    def get(self):
        """Read one message from the pipe and return the unpickled object."""
        with self._rlock:
            res = self._reader.recv_bytes()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def put(self, obj):
        """Pickle *obj* and write it to the pipe."""
        # serialize the data before acquiring the lock
        obj = _ForkingPickler.dumps(obj)
        if self._wlock is None:
            # writes to a message oriented win32 pipe are atomic
            self._writer.send_bytes(obj)
        else:
            with self._wlock:
                self._writer.send_bytes(obj)

    # Allow subscripting such as SimpleQueue[int]; yields a types.GenericAlias.
    __class_getitem__ = classmethod(types.GenericAlias)
380