1*cda5da8dSAndroid Build Coastguard Worker# 2*cda5da8dSAndroid Build Coastguard Worker# Module implementing queues 3*cda5da8dSAndroid Build Coastguard Worker# 4*cda5da8dSAndroid Build Coastguard Worker# multiprocessing/queues.py 5*cda5da8dSAndroid Build Coastguard Worker# 6*cda5da8dSAndroid Build Coastguard Worker# Copyright (c) 2006-2008, R Oudkerk 7*cda5da8dSAndroid Build Coastguard Worker# Licensed to PSF under a Contributor Agreement. 8*cda5da8dSAndroid Build Coastguard Worker# 9*cda5da8dSAndroid Build Coastguard Worker 10*cda5da8dSAndroid Build Coastguard Worker__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue'] 11*cda5da8dSAndroid Build Coastguard Worker 12*cda5da8dSAndroid Build Coastguard Workerimport sys 13*cda5da8dSAndroid Build Coastguard Workerimport os 14*cda5da8dSAndroid Build Coastguard Workerimport threading 15*cda5da8dSAndroid Build Coastguard Workerimport collections 16*cda5da8dSAndroid Build Coastguard Workerimport time 17*cda5da8dSAndroid Build Coastguard Workerimport types 18*cda5da8dSAndroid Build Coastguard Workerimport weakref 19*cda5da8dSAndroid Build Coastguard Workerimport errno 20*cda5da8dSAndroid Build Coastguard Worker 21*cda5da8dSAndroid Build Coastguard Workerfrom queue import Empty, Full 22*cda5da8dSAndroid Build Coastguard Worker 23*cda5da8dSAndroid Build Coastguard Workerimport _multiprocessing 24*cda5da8dSAndroid Build Coastguard Worker 25*cda5da8dSAndroid Build Coastguard Workerfrom . import connection 26*cda5da8dSAndroid Build Coastguard Workerfrom . import context 27*cda5da8dSAndroid Build Coastguard Worker_ForkingPickler = context.reduction.ForkingPickler 28*cda5da8dSAndroid Build Coastguard Worker 29*cda5da8dSAndroid Build Coastguard Workerfrom .util import debug, info, Finalize, register_after_fork, is_exiting 30*cda5da8dSAndroid Build Coastguard Worker 31*cda5da8dSAndroid Build Coastguard Worker# 32*cda5da8dSAndroid Build Coastguard Worker# Queue type using a pipe, buffer and thread 33*cda5da8dSAndroid Build Coastguard Worker# 34*cda5da8dSAndroid Build Coastguard Worker 35*cda5da8dSAndroid Build Coastguard Workerclass Queue(object): 36*cda5da8dSAndroid Build Coastguard Worker 37*cda5da8dSAndroid Build Coastguard Worker def __init__(self, maxsize=0, *, ctx): 38*cda5da8dSAndroid Build Coastguard Worker if maxsize <= 0: 39*cda5da8dSAndroid Build Coastguard Worker # Can raise ImportError (see issues #3770 and #23400) 40*cda5da8dSAndroid Build Coastguard Worker from .synchronize import SEM_VALUE_MAX as maxsize 41*cda5da8dSAndroid Build Coastguard Worker self._maxsize = maxsize 42*cda5da8dSAndroid Build Coastguard Worker self._reader, self._writer = connection.Pipe(duplex=False) 43*cda5da8dSAndroid Build Coastguard Worker self._rlock = ctx.Lock() 44*cda5da8dSAndroid Build Coastguard Worker self._opid = os.getpid() 45*cda5da8dSAndroid Build Coastguard Worker if sys.platform == 'win32': 46*cda5da8dSAndroid Build Coastguard Worker self._wlock = None 47*cda5da8dSAndroid Build Coastguard Worker else: 48*cda5da8dSAndroid Build Coastguard Worker self._wlock = ctx.Lock() 49*cda5da8dSAndroid Build Coastguard Worker self._sem = ctx.BoundedSemaphore(maxsize) 50*cda5da8dSAndroid Build Coastguard Worker # For use by concurrent.futures 51*cda5da8dSAndroid Build Coastguard Worker self._ignore_epipe = False 52*cda5da8dSAndroid Build Coastguard Worker self._reset() 53*cda5da8dSAndroid Build Coastguard Worker 54*cda5da8dSAndroid Build Coastguard Worker if sys.platform != 'win32': 55*cda5da8dSAndroid Build Coastguard Worker register_after_fork(self, Queue._after_fork) 56*cda5da8dSAndroid Build Coastguard Worker 57*cda5da8dSAndroid Build Coastguard Worker def __getstate__(self): 58*cda5da8dSAndroid Build Coastguard Worker context.assert_spawning(self) 59*cda5da8dSAndroid Build Coastguard Worker return (self._ignore_epipe, self._maxsize, self._reader, self._writer, 60*cda5da8dSAndroid Build Coastguard Worker self._rlock, self._wlock, self._sem, self._opid) 61*cda5da8dSAndroid Build Coastguard Worker 62*cda5da8dSAndroid Build Coastguard Worker def __setstate__(self, state): 63*cda5da8dSAndroid Build Coastguard Worker (self._ignore_epipe, self._maxsize, self._reader, self._writer, 64*cda5da8dSAndroid Build Coastguard Worker self._rlock, self._wlock, self._sem, self._opid) = state 65*cda5da8dSAndroid Build Coastguard Worker self._reset() 66*cda5da8dSAndroid Build Coastguard Worker 67*cda5da8dSAndroid Build Coastguard Worker def _after_fork(self): 68*cda5da8dSAndroid Build Coastguard Worker debug('Queue._after_fork()') 69*cda5da8dSAndroid Build Coastguard Worker self._reset(after_fork=True) 70*cda5da8dSAndroid Build Coastguard Worker 71*cda5da8dSAndroid Build Coastguard Worker def _reset(self, after_fork=False): 72*cda5da8dSAndroid Build Coastguard Worker if after_fork: 73*cda5da8dSAndroid Build Coastguard Worker self._notempty._at_fork_reinit() 74*cda5da8dSAndroid Build Coastguard Worker else: 75*cda5da8dSAndroid Build Coastguard Worker self._notempty = threading.Condition(threading.Lock()) 76*cda5da8dSAndroid Build Coastguard Worker self._buffer = collections.deque() 77*cda5da8dSAndroid Build Coastguard Worker self._thread = None 78*cda5da8dSAndroid Build Coastguard Worker self._jointhread = None 79*cda5da8dSAndroid Build Coastguard Worker self._joincancelled = False 80*cda5da8dSAndroid Build Coastguard Worker self._closed = False 81*cda5da8dSAndroid Build Coastguard Worker self._close = None 82*cda5da8dSAndroid Build Coastguard Worker self._send_bytes = self._writer.send_bytes 83*cda5da8dSAndroid Build Coastguard Worker self._recv_bytes = self._reader.recv_bytes 84*cda5da8dSAndroid Build Coastguard Worker self._poll = self._reader.poll 85*cda5da8dSAndroid Build Coastguard Worker 86*cda5da8dSAndroid Build Coastguard Worker def put(self, obj, block=True, timeout=None): 87*cda5da8dSAndroid Build Coastguard Worker if self._closed: 88*cda5da8dSAndroid Build Coastguard Worker raise ValueError(f"Queue {self!r} is closed") 89*cda5da8dSAndroid Build Coastguard Worker if not self._sem.acquire(block, timeout): 90*cda5da8dSAndroid Build Coastguard Worker raise Full 91*cda5da8dSAndroid Build Coastguard Worker 92*cda5da8dSAndroid Build Coastguard Worker with self._notempty: 93*cda5da8dSAndroid Build Coastguard Worker if self._thread is None: 94*cda5da8dSAndroid Build Coastguard Worker self._start_thread() 95*cda5da8dSAndroid Build Coastguard Worker self._buffer.append(obj) 96*cda5da8dSAndroid Build Coastguard Worker self._notempty.notify() 97*cda5da8dSAndroid Build Coastguard Worker 98*cda5da8dSAndroid Build Coastguard Worker def get(self, block=True, timeout=None): 99*cda5da8dSAndroid Build Coastguard Worker if self._closed: 100*cda5da8dSAndroid Build Coastguard Worker raise ValueError(f"Queue {self!r} is closed") 101*cda5da8dSAndroid Build Coastguard Worker if block and timeout is None: 102*cda5da8dSAndroid Build Coastguard Worker with self._rlock: 103*cda5da8dSAndroid Build Coastguard Worker res = self._recv_bytes() 104*cda5da8dSAndroid Build Coastguard Worker self._sem.release() 105*cda5da8dSAndroid Build Coastguard Worker else: 106*cda5da8dSAndroid Build Coastguard Worker if block: 107*cda5da8dSAndroid Build Coastguard Worker deadline = time.monotonic() + timeout 108*cda5da8dSAndroid Build Coastguard Worker if not self._rlock.acquire(block, timeout): 109*cda5da8dSAndroid Build Coastguard Worker raise Empty 110*cda5da8dSAndroid Build Coastguard Worker try: 111*cda5da8dSAndroid Build Coastguard Worker if block: 112*cda5da8dSAndroid Build Coastguard Worker timeout = deadline - time.monotonic() 113*cda5da8dSAndroid Build Coastguard Worker if not self._poll(timeout): 114*cda5da8dSAndroid Build Coastguard Worker raise Empty 115*cda5da8dSAndroid Build Coastguard Worker elif not self._poll(): 116*cda5da8dSAndroid Build Coastguard Worker raise Empty 117*cda5da8dSAndroid Build Coastguard Worker res = self._recv_bytes() 118*cda5da8dSAndroid Build Coastguard Worker self._sem.release() 119*cda5da8dSAndroid Build Coastguard Worker finally: 120*cda5da8dSAndroid Build Coastguard Worker self._rlock.release() 121*cda5da8dSAndroid Build Coastguard Worker # unserialize the data after having released the lock 122*cda5da8dSAndroid Build Coastguard Worker return _ForkingPickler.loads(res) 123*cda5da8dSAndroid Build Coastguard Worker 124*cda5da8dSAndroid Build Coastguard Worker def qsize(self): 125*cda5da8dSAndroid Build Coastguard Worker # Raises NotImplementedError on Mac OSX because of broken sem_getvalue() 126*cda5da8dSAndroid Build Coastguard Worker return self._maxsize - self._sem._semlock._get_value() 127*cda5da8dSAndroid Build Coastguard Worker 128*cda5da8dSAndroid Build Coastguard Worker def empty(self): 129*cda5da8dSAndroid Build Coastguard Worker return not self._poll() 130*cda5da8dSAndroid Build Coastguard Worker 131*cda5da8dSAndroid Build Coastguard Worker def full(self): 132*cda5da8dSAndroid Build Coastguard Worker return self._sem._semlock._is_zero() 133*cda5da8dSAndroid Build Coastguard Worker 134*cda5da8dSAndroid Build Coastguard Worker def get_nowait(self): 135*cda5da8dSAndroid Build Coastguard Worker return self.get(False) 136*cda5da8dSAndroid Build Coastguard Worker 137*cda5da8dSAndroid Build Coastguard Worker def put_nowait(self, obj): 138*cda5da8dSAndroid Build Coastguard Worker return self.put(obj, False) 139*cda5da8dSAndroid Build Coastguard Worker 140*cda5da8dSAndroid Build Coastguard Worker def close(self): 141*cda5da8dSAndroid Build Coastguard Worker self._closed = True 142*cda5da8dSAndroid Build Coastguard Worker close = self._close 143*cda5da8dSAndroid Build Coastguard Worker if close: 144*cda5da8dSAndroid Build Coastguard Worker self._close = None 145*cda5da8dSAndroid Build Coastguard Worker close() 146*cda5da8dSAndroid Build Coastguard Worker 147*cda5da8dSAndroid Build Coastguard Worker def join_thread(self): 148*cda5da8dSAndroid Build Coastguard Worker debug('Queue.join_thread()') 149*cda5da8dSAndroid Build Coastguard Worker assert self._closed, "Queue {0!r} not closed".format(self) 150*cda5da8dSAndroid Build Coastguard Worker if self._jointhread: 151*cda5da8dSAndroid Build Coastguard Worker self._jointhread() 152*cda5da8dSAndroid Build Coastguard Worker 153*cda5da8dSAndroid Build Coastguard Worker def cancel_join_thread(self): 154*cda5da8dSAndroid Build Coastguard Worker debug('Queue.cancel_join_thread()') 155*cda5da8dSAndroid Build Coastguard Worker self._joincancelled = True 156*cda5da8dSAndroid Build Coastguard Worker try: 157*cda5da8dSAndroid Build Coastguard Worker self._jointhread.cancel() 158*cda5da8dSAndroid Build Coastguard Worker except AttributeError: 159*cda5da8dSAndroid Build Coastguard Worker pass 160*cda5da8dSAndroid Build Coastguard Worker 161*cda5da8dSAndroid Build Coastguard Worker def _start_thread(self): 162*cda5da8dSAndroid Build Coastguard Worker debug('Queue._start_thread()') 163*cda5da8dSAndroid Build Coastguard Worker 164*cda5da8dSAndroid Build Coastguard Worker # Start thread which transfers data from buffer to pipe 165*cda5da8dSAndroid Build Coastguard Worker self._buffer.clear() 166*cda5da8dSAndroid Build Coastguard Worker self._thread = threading.Thread( 167*cda5da8dSAndroid Build Coastguard Worker target=Queue._feed, 168*cda5da8dSAndroid Build Coastguard Worker args=(self._buffer, self._notempty, self._send_bytes, 169*cda5da8dSAndroid Build Coastguard Worker self._wlock, self._reader.close, self._writer.close, 170*cda5da8dSAndroid Build Coastguard Worker self._ignore_epipe, self._on_queue_feeder_error, 171*cda5da8dSAndroid Build Coastguard Worker self._sem), 172*cda5da8dSAndroid Build Coastguard Worker name='QueueFeederThread' 173*cda5da8dSAndroid Build Coastguard Worker ) 174*cda5da8dSAndroid Build Coastguard Worker self._thread.daemon = True 175*cda5da8dSAndroid Build Coastguard Worker 176*cda5da8dSAndroid Build Coastguard Worker debug('doing self._thread.start()') 177*cda5da8dSAndroid Build Coastguard Worker self._thread.start() 178*cda5da8dSAndroid Build Coastguard Worker debug('... done self._thread.start()') 179*cda5da8dSAndroid Build Coastguard Worker 180*cda5da8dSAndroid Build Coastguard Worker if not self._joincancelled: 181*cda5da8dSAndroid Build Coastguard Worker self._jointhread = Finalize( 182*cda5da8dSAndroid Build Coastguard Worker self._thread, Queue._finalize_join, 183*cda5da8dSAndroid Build Coastguard Worker [weakref.ref(self._thread)], 184*cda5da8dSAndroid Build Coastguard Worker exitpriority=-5 185*cda5da8dSAndroid Build Coastguard Worker ) 186*cda5da8dSAndroid Build Coastguard Worker 187*cda5da8dSAndroid Build Coastguard Worker # Send sentinel to the thread queue object when garbage collected 188*cda5da8dSAndroid Build Coastguard Worker self._close = Finalize( 189*cda5da8dSAndroid Build Coastguard Worker self, Queue._finalize_close, 190*cda5da8dSAndroid Build Coastguard Worker [self._buffer, self._notempty], 191*cda5da8dSAndroid Build Coastguard Worker exitpriority=10 192*cda5da8dSAndroid Build Coastguard Worker ) 193*cda5da8dSAndroid Build Coastguard Worker 194*cda5da8dSAndroid Build Coastguard Worker @staticmethod 195*cda5da8dSAndroid Build Coastguard Worker def _finalize_join(twr): 196*cda5da8dSAndroid Build Coastguard Worker debug('joining queue thread') 197*cda5da8dSAndroid Build Coastguard Worker thread = twr() 198*cda5da8dSAndroid Build Coastguard Worker if thread is not None: 199*cda5da8dSAndroid Build Coastguard Worker thread.join() 200*cda5da8dSAndroid Build Coastguard Worker debug('... queue thread joined') 201*cda5da8dSAndroid Build Coastguard Worker else: 202*cda5da8dSAndroid Build Coastguard Worker debug('... queue thread already dead') 203*cda5da8dSAndroid Build Coastguard Worker 204*cda5da8dSAndroid Build Coastguard Worker @staticmethod 205*cda5da8dSAndroid Build Coastguard Worker def _finalize_close(buffer, notempty): 206*cda5da8dSAndroid Build Coastguard Worker debug('telling queue thread to quit') 207*cda5da8dSAndroid Build Coastguard Worker with notempty: 208*cda5da8dSAndroid Build Coastguard Worker buffer.append(_sentinel) 209*cda5da8dSAndroid Build Coastguard Worker notempty.notify() 210*cda5da8dSAndroid Build Coastguard Worker 211*cda5da8dSAndroid Build Coastguard Worker @staticmethod 212*cda5da8dSAndroid Build Coastguard Worker def _feed(buffer, notempty, send_bytes, writelock, reader_close, 213*cda5da8dSAndroid Build Coastguard Worker writer_close, ignore_epipe, onerror, queue_sem): 214*cda5da8dSAndroid Build Coastguard Worker debug('starting thread to feed data to pipe') 215*cda5da8dSAndroid Build Coastguard Worker nacquire = notempty.acquire 216*cda5da8dSAndroid Build Coastguard Worker nrelease = notempty.release 217*cda5da8dSAndroid Build Coastguard Worker nwait = notempty.wait 218*cda5da8dSAndroid Build Coastguard Worker bpopleft = buffer.popleft 219*cda5da8dSAndroid Build Coastguard Worker sentinel = _sentinel 220*cda5da8dSAndroid Build Coastguard Worker if sys.platform != 'win32': 221*cda5da8dSAndroid Build Coastguard Worker wacquire = writelock.acquire 222*cda5da8dSAndroid Build Coastguard Worker wrelease = writelock.release 223*cda5da8dSAndroid Build Coastguard Worker else: 224*cda5da8dSAndroid Build Coastguard Worker wacquire = None 225*cda5da8dSAndroid Build Coastguard Worker 226*cda5da8dSAndroid Build Coastguard Worker while 1: 227*cda5da8dSAndroid Build Coastguard Worker try: 228*cda5da8dSAndroid Build Coastguard Worker nacquire() 229*cda5da8dSAndroid Build Coastguard Worker try: 230*cda5da8dSAndroid Build Coastguard Worker if not buffer: 231*cda5da8dSAndroid Build Coastguard Worker nwait() 232*cda5da8dSAndroid Build Coastguard Worker finally: 233*cda5da8dSAndroid Build Coastguard Worker nrelease() 234*cda5da8dSAndroid Build Coastguard Worker try: 235*cda5da8dSAndroid Build Coastguard Worker while 1: 236*cda5da8dSAndroid Build Coastguard Worker obj = bpopleft() 237*cda5da8dSAndroid Build Coastguard Worker if obj is sentinel: 238*cda5da8dSAndroid Build Coastguard Worker debug('feeder thread got sentinel -- exiting') 239*cda5da8dSAndroid Build Coastguard Worker reader_close() 240*cda5da8dSAndroid Build Coastguard Worker writer_close() 241*cda5da8dSAndroid Build Coastguard Worker return 242*cda5da8dSAndroid Build Coastguard Worker 243*cda5da8dSAndroid Build Coastguard Worker # serialize the data before acquiring the lock 244*cda5da8dSAndroid Build Coastguard Worker obj = _ForkingPickler.dumps(obj) 245*cda5da8dSAndroid Build Coastguard Worker if wacquire is None: 246*cda5da8dSAndroid Build Coastguard Worker send_bytes(obj) 247*cda5da8dSAndroid Build Coastguard Worker else: 248*cda5da8dSAndroid Build Coastguard Worker wacquire() 249*cda5da8dSAndroid Build Coastguard Worker try: 250*cda5da8dSAndroid Build Coastguard Worker send_bytes(obj) 251*cda5da8dSAndroid Build Coastguard Worker finally: 252*cda5da8dSAndroid Build Coastguard Worker wrelease() 253*cda5da8dSAndroid Build Coastguard Worker except IndexError: 254*cda5da8dSAndroid Build Coastguard Worker pass 255*cda5da8dSAndroid Build Coastguard Worker except Exception as e: 256*cda5da8dSAndroid Build Coastguard Worker if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE: 257*cda5da8dSAndroid Build Coastguard Worker return 258*cda5da8dSAndroid Build Coastguard Worker # Since this runs in a daemon thread the resources it uses 259*cda5da8dSAndroid Build Coastguard Worker # may be become unusable while the process is cleaning up. 260*cda5da8dSAndroid Build Coastguard Worker # We ignore errors which happen after the process has 261*cda5da8dSAndroid Build Coastguard Worker # started to cleanup. 262*cda5da8dSAndroid Build Coastguard Worker if is_exiting(): 263*cda5da8dSAndroid Build Coastguard Worker info('error in queue thread: %s', e) 264*cda5da8dSAndroid Build Coastguard Worker return 265*cda5da8dSAndroid Build Coastguard Worker else: 266*cda5da8dSAndroid Build Coastguard Worker # Since the object has not been sent in the queue, we need 267*cda5da8dSAndroid Build Coastguard Worker # to decrease the size of the queue. The error acts as 268*cda5da8dSAndroid Build Coastguard Worker # if the object had been silently removed from the queue 269*cda5da8dSAndroid Build Coastguard Worker # and this step is necessary to have a properly working 270*cda5da8dSAndroid Build Coastguard Worker # queue. 271*cda5da8dSAndroid Build Coastguard Worker queue_sem.release() 272*cda5da8dSAndroid Build Coastguard Worker onerror(e, obj) 273*cda5da8dSAndroid Build Coastguard Worker 274*cda5da8dSAndroid Build Coastguard Worker @staticmethod 275*cda5da8dSAndroid Build Coastguard Worker def _on_queue_feeder_error(e, obj): 276*cda5da8dSAndroid Build Coastguard Worker """ 277*cda5da8dSAndroid Build Coastguard Worker Private API hook called when feeding data in the background thread 278*cda5da8dSAndroid Build Coastguard Worker raises an exception. For overriding by concurrent.futures. 279*cda5da8dSAndroid Build Coastguard Worker """ 280*cda5da8dSAndroid Build Coastguard Worker import traceback 281*cda5da8dSAndroid Build Coastguard Worker traceback.print_exc() 282*cda5da8dSAndroid Build Coastguard Worker 283*cda5da8dSAndroid Build Coastguard Worker 284*cda5da8dSAndroid Build Coastguard Worker_sentinel = object() 285*cda5da8dSAndroid Build Coastguard Worker 286*cda5da8dSAndroid Build Coastguard Worker# 287*cda5da8dSAndroid Build Coastguard Worker# A queue type which also supports join() and task_done() methods 288*cda5da8dSAndroid Build Coastguard Worker# 289*cda5da8dSAndroid Build Coastguard Worker# Note that if you do not call task_done() for each finished task then 290*cda5da8dSAndroid Build Coastguard Worker# eventually the counter's semaphore may overflow causing Bad Things 291*cda5da8dSAndroid Build Coastguard Worker# to happen. 292*cda5da8dSAndroid Build Coastguard Worker# 293*cda5da8dSAndroid Build Coastguard Worker 294*cda5da8dSAndroid Build Coastguard Workerclass JoinableQueue(Queue): 295*cda5da8dSAndroid Build Coastguard Worker 296*cda5da8dSAndroid Build Coastguard Worker def __init__(self, maxsize=0, *, ctx): 297*cda5da8dSAndroid Build Coastguard Worker Queue.__init__(self, maxsize, ctx=ctx) 298*cda5da8dSAndroid Build Coastguard Worker self._unfinished_tasks = ctx.Semaphore(0) 299*cda5da8dSAndroid Build Coastguard Worker self._cond = ctx.Condition() 300*cda5da8dSAndroid Build Coastguard Worker 301*cda5da8dSAndroid Build Coastguard Worker def __getstate__(self): 302*cda5da8dSAndroid Build Coastguard Worker return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks) 303*cda5da8dSAndroid Build Coastguard Worker 304*cda5da8dSAndroid Build Coastguard Worker def __setstate__(self, state): 305*cda5da8dSAndroid Build Coastguard Worker Queue.__setstate__(self, state[:-2]) 306*cda5da8dSAndroid Build Coastguard Worker self._cond, self._unfinished_tasks = state[-2:] 307*cda5da8dSAndroid Build Coastguard Worker 308*cda5da8dSAndroid Build Coastguard Worker def put(self, obj, block=True, timeout=None): 309*cda5da8dSAndroid Build Coastguard Worker if self._closed: 310*cda5da8dSAndroid Build Coastguard Worker raise ValueError(f"Queue {self!r} is closed") 311*cda5da8dSAndroid Build Coastguard Worker if not self._sem.acquire(block, timeout): 312*cda5da8dSAndroid Build Coastguard Worker raise Full 313*cda5da8dSAndroid Build Coastguard Worker 314*cda5da8dSAndroid Build Coastguard Worker with self._notempty, self._cond: 315*cda5da8dSAndroid Build Coastguard Worker if self._thread is None: 316*cda5da8dSAndroid Build Coastguard Worker self._start_thread() 317*cda5da8dSAndroid Build Coastguard Worker self._buffer.append(obj) 318*cda5da8dSAndroid Build Coastguard Worker self._unfinished_tasks.release() 319*cda5da8dSAndroid Build Coastguard Worker self._notempty.notify() 320*cda5da8dSAndroid Build Coastguard Worker 321*cda5da8dSAndroid Build Coastguard Worker def task_done(self): 322*cda5da8dSAndroid Build Coastguard Worker with self._cond: 323*cda5da8dSAndroid Build Coastguard Worker if not self._unfinished_tasks.acquire(False): 324*cda5da8dSAndroid Build Coastguard Worker raise ValueError('task_done() called too many times') 325*cda5da8dSAndroid Build Coastguard Worker if self._unfinished_tasks._semlock._is_zero(): 326*cda5da8dSAndroid Build Coastguard Worker self._cond.notify_all() 327*cda5da8dSAndroid Build Coastguard Worker 328*cda5da8dSAndroid Build Coastguard Worker def join(self): 329*cda5da8dSAndroid Build Coastguard Worker with self._cond: 330*cda5da8dSAndroid Build Coastguard Worker if not self._unfinished_tasks._semlock._is_zero(): 331*cda5da8dSAndroid Build Coastguard Worker self._cond.wait() 332*cda5da8dSAndroid Build Coastguard Worker 333*cda5da8dSAndroid Build Coastguard Worker# 334*cda5da8dSAndroid Build Coastguard Worker# Simplified Queue type -- really just a locked pipe 335*cda5da8dSAndroid Build Coastguard Worker# 336*cda5da8dSAndroid Build Coastguard Worker 337*cda5da8dSAndroid Build Coastguard Workerclass SimpleQueue(object): 338*cda5da8dSAndroid Build Coastguard Worker 339*cda5da8dSAndroid Build Coastguard Worker def __init__(self, *, ctx): 340*cda5da8dSAndroid Build Coastguard Worker self._reader, self._writer = connection.Pipe(duplex=False) 341*cda5da8dSAndroid Build Coastguard Worker self._rlock = ctx.Lock() 342*cda5da8dSAndroid Build Coastguard Worker self._poll = self._reader.poll 343*cda5da8dSAndroid Build Coastguard Worker if sys.platform == 'win32': 344*cda5da8dSAndroid Build Coastguard Worker self._wlock = None 345*cda5da8dSAndroid Build Coastguard Worker else: 346*cda5da8dSAndroid Build Coastguard Worker self._wlock = ctx.Lock() 347*cda5da8dSAndroid Build Coastguard Worker 348*cda5da8dSAndroid Build Coastguard Worker def close(self): 349*cda5da8dSAndroid Build Coastguard Worker self._reader.close() 350*cda5da8dSAndroid Build Coastguard Worker self._writer.close() 351*cda5da8dSAndroid Build Coastguard Worker 352*cda5da8dSAndroid Build Coastguard Worker def empty(self): 353*cda5da8dSAndroid Build Coastguard Worker return not self._poll() 354*cda5da8dSAndroid Build Coastguard Worker 355*cda5da8dSAndroid Build Coastguard Worker def __getstate__(self): 356*cda5da8dSAndroid Build Coastguard Worker context.assert_spawning(self) 357*cda5da8dSAndroid Build Coastguard Worker return (self._reader, self._writer, self._rlock, self._wlock) 358*cda5da8dSAndroid Build Coastguard Worker 359*cda5da8dSAndroid Build Coastguard Worker def __setstate__(self, state): 360*cda5da8dSAndroid Build Coastguard Worker (self._reader, self._writer, self._rlock, self._wlock) = state 361*cda5da8dSAndroid Build Coastguard Worker self._poll = self._reader.poll 362*cda5da8dSAndroid Build Coastguard Worker 363*cda5da8dSAndroid Build Coastguard Worker def get(self): 364*cda5da8dSAndroid Build Coastguard Worker with self._rlock: 365*cda5da8dSAndroid Build Coastguard Worker res = self._reader.recv_bytes() 366*cda5da8dSAndroid Build Coastguard Worker # unserialize the data after having released the lock 367*cda5da8dSAndroid Build Coastguard Worker return _ForkingPickler.loads(res) 368*cda5da8dSAndroid Build Coastguard Worker 369*cda5da8dSAndroid Build Coastguard Worker def put(self, obj): 370*cda5da8dSAndroid Build Coastguard Worker # serialize the data before acquiring the lock 371*cda5da8dSAndroid Build Coastguard Worker obj = _ForkingPickler.dumps(obj) 372*cda5da8dSAndroid Build Coastguard Worker if self._wlock is None: 373*cda5da8dSAndroid Build Coastguard Worker # writes to a message oriented win32 pipe are atomic 374*cda5da8dSAndroid Build Coastguard Worker self._writer.send_bytes(obj) 375*cda5da8dSAndroid Build Coastguard Worker else: 376*cda5da8dSAndroid Build Coastguard Worker with self._wlock: 377*cda5da8dSAndroid Build Coastguard Worker self._writer.send_bytes(obj) 378*cda5da8dSAndroid Build Coastguard Worker 379*cda5da8dSAndroid Build Coastguard Worker __class_getitem__ = classmethod(types.GenericAlias) 380