1# 2# Module implementing queues 3# 4# multiprocessing/queues.py 5# 6# Copyright (c) 2006-2008, R Oudkerk 7# Licensed to PSF under a Contributor Agreement. 8# 9 10__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue'] 11 12import sys 13import os 14import threading 15import collections 16import time 17import types 18import weakref 19import errno 20 21from queue import Empty, Full 22 23import _multiprocessing 24 25from . import connection 26from . import context 27_ForkingPickler = context.reduction.ForkingPickler 28 29from .util import debug, info, Finalize, register_after_fork, is_exiting 30 31# 32# Queue type using a pipe, buffer and thread 33# 34 35class Queue(object): 36 37 def __init__(self, maxsize=0, *, ctx): 38 if maxsize <= 0: 39 # Can raise ImportError (see issues #3770 and #23400) 40 from .synchronize import SEM_VALUE_MAX as maxsize 41 self._maxsize = maxsize 42 self._reader, self._writer = connection.Pipe(duplex=False) 43 self._rlock = ctx.Lock() 44 self._opid = os.getpid() 45 if sys.platform == 'win32': 46 self._wlock = None 47 else: 48 self._wlock = ctx.Lock() 49 self._sem = ctx.BoundedSemaphore(maxsize) 50 # For use by concurrent.futures 51 self._ignore_epipe = False 52 self._reset() 53 54 if sys.platform != 'win32': 55 register_after_fork(self, Queue._after_fork) 56 57 def __getstate__(self): 58 context.assert_spawning(self) 59 return (self._ignore_epipe, self._maxsize, self._reader, self._writer, 60 self._rlock, self._wlock, self._sem, self._opid) 61 62 def __setstate__(self, state): 63 (self._ignore_epipe, self._maxsize, self._reader, self._writer, 64 self._rlock, self._wlock, self._sem, self._opid) = state 65 self._reset() 66 67 def _after_fork(self): 68 debug('Queue._after_fork()') 69 self._reset(after_fork=True) 70 71 def _reset(self, after_fork=False): 72 if after_fork: 73 self._notempty._at_fork_reinit() 74 else: 75 self._notempty = threading.Condition(threading.Lock()) 76 self._buffer = collections.deque() 77 self._thread = None 78 self._jointhread = None 79 self._joincancelled = False 80 self._closed = False 81 self._close = None 82 self._send_bytes = self._writer.send_bytes 83 self._recv_bytes = self._reader.recv_bytes 84 self._poll = self._reader.poll 85 86 def put(self, obj, block=True, timeout=None): 87 if self._closed: 88 raise ValueError(f"Queue {self!r} is closed") 89 if not self._sem.acquire(block, timeout): 90 raise Full 91 92 with self._notempty: 93 if self._thread is None: 94 self._start_thread() 95 self._buffer.append(obj) 96 self._notempty.notify() 97 98 def get(self, block=True, timeout=None): 99 if self._closed: 100 raise ValueError(f"Queue {self!r} is closed") 101 if block and timeout is None: 102 with self._rlock: 103 res = self._recv_bytes() 104 self._sem.release() 105 else: 106 if block: 107 deadline = time.monotonic() + timeout 108 if not self._rlock.acquire(block, timeout): 109 raise Empty 110 try: 111 if block: 112 timeout = deadline - time.monotonic() 113 if not self._poll(timeout): 114 raise Empty 115 elif not self._poll(): 116 raise Empty 117 res = self._recv_bytes() 118 self._sem.release() 119 finally: 120 self._rlock.release() 121 # unserialize the data after having released the lock 122 return _ForkingPickler.loads(res) 123 124 def qsize(self): 125 # Raises NotImplementedError on Mac OSX because of broken sem_getvalue() 126 return self._maxsize - self._sem._semlock._get_value() 127 128 def empty(self): 129 return not self._poll() 130 131 def full(self): 132 return self._sem._semlock._is_zero() 133 134 def get_nowait(self): 135 return self.get(False) 136 137 def put_nowait(self, obj): 138 return self.put(obj, False) 139 140 def close(self): 141 self._closed = True 142 close = self._close 143 if close: 144 self._close = None 145 close() 146 147 def join_thread(self): 148 debug('Queue.join_thread()') 149 assert self._closed, "Queue {0!r} not closed".format(self) 150 if self._jointhread: 151 self._jointhread() 152 153 def cancel_join_thread(self): 154 debug('Queue.cancel_join_thread()') 155 self._joincancelled = True 156 try: 157 self._jointhread.cancel() 158 except AttributeError: 159 pass 160 161 def _start_thread(self): 162 debug('Queue._start_thread()') 163 164 # Start thread which transfers data from buffer to pipe 165 self._buffer.clear() 166 self._thread = threading.Thread( 167 target=Queue._feed, 168 args=(self._buffer, self._notempty, self._send_bytes, 169 self._wlock, self._reader.close, self._writer.close, 170 self._ignore_epipe, self._on_queue_feeder_error, 171 self._sem), 172 name='QueueFeederThread' 173 ) 174 self._thread.daemon = True 175 176 debug('doing self._thread.start()') 177 self._thread.start() 178 debug('... done self._thread.start()') 179 180 if not self._joincancelled: 181 self._jointhread = Finalize( 182 self._thread, Queue._finalize_join, 183 [weakref.ref(self._thread)], 184 exitpriority=-5 185 ) 186 187 # Send sentinel to the thread queue object when garbage collected 188 self._close = Finalize( 189 self, Queue._finalize_close, 190 [self._buffer, self._notempty], 191 exitpriority=10 192 ) 193 194 @staticmethod 195 def _finalize_join(twr): 196 debug('joining queue thread') 197 thread = twr() 198 if thread is not None: 199 thread.join() 200 debug('... queue thread joined') 201 else: 202 debug('... queue thread already dead') 203 204 @staticmethod 205 def _finalize_close(buffer, notempty): 206 debug('telling queue thread to quit') 207 with notempty: 208 buffer.append(_sentinel) 209 notempty.notify() 210 211 @staticmethod 212 def _feed(buffer, notempty, send_bytes, writelock, reader_close, 213 writer_close, ignore_epipe, onerror, queue_sem): 214 debug('starting thread to feed data to pipe') 215 nacquire = notempty.acquire 216 nrelease = notempty.release 217 nwait = notempty.wait 218 bpopleft = buffer.popleft 219 sentinel = _sentinel 220 if sys.platform != 'win32': 221 wacquire = writelock.acquire 222 wrelease = writelock.release 223 else: 224 wacquire = None 225 226 while 1: 227 try: 228 nacquire() 229 try: 230 if not buffer: 231 nwait() 232 finally: 233 nrelease() 234 try: 235 while 1: 236 obj = bpopleft() 237 if obj is sentinel: 238 debug('feeder thread got sentinel -- exiting') 239 reader_close() 240 writer_close() 241 return 242 243 # serialize the data before acquiring the lock 244 obj = _ForkingPickler.dumps(obj) 245 if wacquire is None: 246 send_bytes(obj) 247 else: 248 wacquire() 249 try: 250 send_bytes(obj) 251 finally: 252 wrelease() 253 except IndexError: 254 pass 255 except Exception as e: 256 if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE: 257 return 258 # Since this runs in a daemon thread the resources it uses 259 # may be become unusable while the process is cleaning up. 260 # We ignore errors which happen after the process has 261 # started to cleanup. 262 if is_exiting(): 263 info('error in queue thread: %s', e) 264 return 265 else: 266 # Since the object has not been sent in the queue, we need 267 # to decrease the size of the queue. The error acts as 268 # if the object had been silently removed from the queue 269 # and this step is necessary to have a properly working 270 # queue. 271 queue_sem.release() 272 onerror(e, obj) 273 274 @staticmethod 275 def _on_queue_feeder_error(e, obj): 276 """ 277 Private API hook called when feeding data in the background thread 278 raises an exception. For overriding by concurrent.futures. 279 """ 280 import traceback 281 traceback.print_exc() 282 283 284_sentinel = object() 285 286# 287# A queue type which also supports join() and task_done() methods 288# 289# Note that if you do not call task_done() for each finished task then 290# eventually the counter's semaphore may overflow causing Bad Things 291# to happen. 292# 293 294class JoinableQueue(Queue): 295 296 def __init__(self, maxsize=0, *, ctx): 297 Queue.__init__(self, maxsize, ctx=ctx) 298 self._unfinished_tasks = ctx.Semaphore(0) 299 self._cond = ctx.Condition() 300 301 def __getstate__(self): 302 return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks) 303 304 def __setstate__(self, state): 305 Queue.__setstate__(self, state[:-2]) 306 self._cond, self._unfinished_tasks = state[-2:] 307 308 def put(self, obj, block=True, timeout=None): 309 if self._closed: 310 raise ValueError(f"Queue {self!r} is closed") 311 if not self._sem.acquire(block, timeout): 312 raise Full 313 314 with self._notempty, self._cond: 315 if self._thread is None: 316 self._start_thread() 317 self._buffer.append(obj) 318 self._unfinished_tasks.release() 319 self._notempty.notify() 320 321 def task_done(self): 322 with self._cond: 323 if not self._unfinished_tasks.acquire(False): 324 raise ValueError('task_done() called too many times') 325 if self._unfinished_tasks._semlock._is_zero(): 326 self._cond.notify_all() 327 328 def join(self): 329 with self._cond: 330 if not self._unfinished_tasks._semlock._is_zero(): 331 self._cond.wait() 332 333# 334# Simplified Queue type -- really just a locked pipe 335# 336 337class SimpleQueue(object): 338 339 def __init__(self, *, ctx): 340 self._reader, self._writer = connection.Pipe(duplex=False) 341 self._rlock = ctx.Lock() 342 self._poll = self._reader.poll 343 if sys.platform == 'win32': 344 self._wlock = None 345 else: 346 self._wlock = ctx.Lock() 347 348 def close(self): 349 self._reader.close() 350 self._writer.close() 351 352 def empty(self): 353 return not self._poll() 354 355 def __getstate__(self): 356 context.assert_spawning(self) 357 return (self._reader, self._writer, self._rlock, self._wlock) 358 359 def __setstate__(self, state): 360 (self._reader, self._writer, self._rlock, self._wlock) = state 361 self._poll = self._reader.poll 362 363 def get(self): 364 with self._rlock: 365 res = self._reader.recv_bytes() 366 # unserialize the data after having released the lock 367 return _ForkingPickler.loads(res) 368 369 def put(self, obj): 370 # serialize the data before acquiring the lock 371 obj = _ForkingPickler.dumps(obj) 372 if self._wlock is None: 373 # writes to a message oriented win32 pipe are atomic 374 self._writer.send_bytes(obj) 375 else: 376 with self._wlock: 377 self._writer.send_bytes(obj) 378 379 __class_getitem__ = classmethod(types.GenericAlias) 380