#
# Module implementing queues
#
# multiprocessing/queues.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']

import sys
import os
import threading
import collections
import time
import types
import weakref
import errno

from queue import Empty, Full

import _multiprocessing

from . import connection
from . import context
_ForkingPickler = context.reduction.ForkingPickler

from .util import debug, info, Finalize, register_after_fork, is_exiting

#
# Queue type using a pipe, buffer and thread
#
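# Note: the queue classes below are not normally instantiated directly; a
# multiprocessing context (multiprocessing.get_context()) creates them and
# supplies the required ``ctx`` keyword argument.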

class Queue(object):
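    """Multi-producer, multi-consumer queue usable across processes.

    put() appends items to a process-local buffer; a background feeder
    thread pickles them and writes the bytes to a pipe, from which get()
    reads in whichever process holds the other end.  A bounded semaphore
    enforces maxsize across processes.
    """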

    def __init__(self, maxsize=0, *, ctx):
        if maxsize <= 0:
            # Can raise ImportError (see issues #3770 and #23400)
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._opid = os.getpid()
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()
        self._sem = ctx.BoundedSemaphore(maxsize)
        # For use by concurrent.futures
        self._ignore_epipe = False
        self._reset()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        context.assert_spawning(self)
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        (self._ignore_epipe, self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._reset()

    def _after_fork(self):
        debug('Queue._after_fork()')
        self._reset(after_fork=True)

    def _reset(self, after_fork=False):
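        """(Re)initialise state that is local to this process and is not
        pickled: the buffer, the feeder-thread bookkeeping and the bound
        pipe methods.  Called from __init__(), __setstate__() and after a
        fork.
        """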
        if after_fork:
            self._notempty._at_fork_reinit()
        else:
            self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        self._send_bytes = self._writer.send_bytes
        self._recv_bytes = self._reader.recv_bytes
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
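        """Put obj on the queue.

        Blocks (optionally with a timeout) until the bounded semaphore can
        be acquired, i.e. until the queue is not full, then appends obj to
        the local buffer for the feeder thread to serialize and send.
        Raises Full if the semaphore cannot be acquired.
        """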
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()

    def get(self, block=True, timeout=None):
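        """Remove and return an item from the queue.

        Reads pickled bytes from the pipe under the read lock, releases the
        bounded semaphore so another put() may proceed, and unpickles the
        data only after the lock has been dropped.  Raises Empty if no item
        is available within the given blocking/timeout constraints.
        """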
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if block and timeout is None:
            with self._rlock:
                res = self._recv_bytes()
            self._sem.release()
        else:
            if block:
                deadline = time.monotonic() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    timeout = deadline - time.monotonic()
                    if not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv_bytes()
                self._sem.release()
            finally:
                self._rlock.release()
        # deserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def qsize(self):
        # Raises NotImplementedError on macOS because of broken sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

    def empty(self):
        return not self._poll()

    def full(self):
        return self._sem._semlock._is_zero()

    def get_nowait(self):
        return self.get(False)

    def put_nowait(self, obj):
        return self.put(obj, False)

    def close(self):
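        """Indicate that no more data will be put on this queue by the
        current process.  The background feeder thread is told to flush any
        buffered data and shut down; subsequent put()/get() calls in this
        process raise ValueError.
        """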
        self._closed = True
        close = self._close
        if close:
            self._close = None
            close()

    def join_thread(self):
        debug('Queue.join_thread()')
        assert self._closed, "Queue {0!r} not closed".format(self)
        if self._jointhread:
            self._jointhread()

    def cancel_join_thread(self):
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        try:
            self._jointhread.cancel()
        except AttributeError:
            pass

    def _start_thread(self):
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send_bytes,
                  self._wlock, self._reader.close, self._writer.close,
                  self._ignore_epipe, self._on_queue_feeder_error,
                  self._sem),
            name='QueueFeederThread'
        )
        self._thread.daemon = True

        debug('doing self._thread.start()')
        self._thread.start()
        debug('... done self._thread.start()')

        if not self._joincancelled:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
                )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
            )

    @staticmethod
    def _finalize_join(twr):
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        debug('telling queue thread to quit')
        with notempty:
            buffer.append(_sentinel)
            notempty.notify()

    @staticmethod
    def _feed(buffer, notempty, send_bytes, writelock, reader_close,
              writer_close, ignore_epipe, onerror, queue_sem):
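        """Body of the feeder thread: wait for items to appear in the
        buffer, pickle each one and write it to the pipe, taking the write
        lock on platforms where pipe writes are not atomic.  Exits when the
        sentinel object is seen.
        """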
        debug('starting thread to feed data to pipe')
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        while 1:
            try:
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    while 1:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            reader_close()
                            writer_close()
                            return

                        # serialize the data before acquiring the lock
                        obj = _ForkingPickler.dumps(obj)
                        if wacquire is None:
                            send_bytes(obj)
                        else:
                            wacquire()
                            try:
                                send_bytes(obj)
                            finally:
                                wrelease()
                except IndexError:
                    pass
            except Exception as e:
                if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
                    return
                # Since this runs in a daemon thread the resources it uses
                # may become unusable while the process is cleaning up.
                # We ignore errors which happen after the process has
                # started to clean up.
                if is_exiting():
                    info('error in queue thread: %s', e)
                    return
                else:
                    # Since the object has not been sent in the queue, we need
                    # to decrease the size of the queue. The error acts as
                    # if the object had been silently removed from the queue
                    # and this step is necessary to have a properly working
                    # queue.
                    queue_sem.release()
                    onerror(e, obj)

    @staticmethod
    def _on_queue_feeder_error(e, obj):
        """
        Private API hook called when feeding data in the background thread
        raises an exception.  For overriding by concurrent.futures.
        """
        import traceback
        traceback.print_exc()


_sentinel = object()

#
# A queue type which also supports join() and task_done() methods
#
# Note that if you do not call task_done() for each finished task then
# eventually the semaphore used to count unfinished tasks may overflow,
# raising an exception.
#
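# Illustrative consumer pattern (a sketch, not part of this module), assuming
# the queue is obtained from a multiprocessing context; process_item() is a
# hypothetical worker function:
#
#     queue = multiprocessing.get_context().JoinableQueue()
#     queue.put(item)                 # producer side
#     ...
#     obj = queue.get()               # consumer side
#     try:
#         process_item(obj)
#     finally:
#         queue.task_done()           # one call per retrieved item
#     queue.join()                    # blocks until every item is marked done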

class JoinableQueue(Queue):
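    """Queue subclass that additionally keeps a count of unfinished tasks,
    so consumers can call task_done() for each retrieved item and join()
    blocks until every item put on the queue has been processed.
    """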

    def __init__(self, maxsize=0, *, ctx):
        Queue.__init__(self, maxsize, ctx=ctx)
        self._unfinished_tasks = ctx.Semaphore(0)
        self._cond = ctx.Condition()

    def __getstate__(self):
        return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)

    def __setstate__(self, state):
        Queue.__setstate__(self, state[:-2])
        self._cond, self._unfinished_tasks = state[-2:]

    def put(self, obj, block=True, timeout=None):
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty, self._cond:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._unfinished_tasks.release()
            self._notempty.notify()

    def task_done(self):
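        """Indicate that a formerly enqueued task is complete.

        Must be called once per item retrieved with get(); raises ValueError
        if called more times than there were items put on the queue.
        """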
        with self._cond:
            if not self._unfinished_tasks.acquire(False):
                raise ValueError('task_done() called too many times')
            if self._unfinished_tasks._semlock._is_zero():
                self._cond.notify_all()

    def join(self):
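        """Block until every item put on the queue has been marked done
        via task_done().
        """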
        with self._cond:
            if not self._unfinished_tasks._semlock._is_zero():
                self._cond.wait()

#
# Simplified Queue type -- really just a locked pipe
#

class SimpleQueue(object):
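    """Simplified queue: a pipe protected by locks.

    put() pickles the object and writes the bytes to the pipe under the
    write lock (not needed on Windows, where writes to a message-oriented
    pipe are atomic); get() reads and unpickles under the read lock.  There
    is no maxsize, no feeder thread and no join()/task_done().
    """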

    def __init__(self, *, ctx):
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._poll = self._reader.poll
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()

    def close(self):
        self._reader.close()
        self._writer.close()

    def empty(self):
        return not self._poll()

    def __getstate__(self):
        context.assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock)

    def __setstate__(self, state):
        (self._reader, self._writer, self._rlock, self._wlock) = state
        self._poll = self._reader.poll

    def get(self):
        with self._rlock:
            res = self._reader.recv_bytes()
        # deserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def put(self, obj):
        # serialize the data before acquiring the lock
        obj = _ForkingPickler.dumps(obj)
        if self._wlock is None:
            # writes to a message-oriented win32 pipe are atomic
            self._writer.send_bytes(obj)
        else:
            with self._wlock:
                self._writer.send_bytes(obj)

    __class_getitem__ = classmethod(types.GenericAlias)
