#
# Module providing the `Pool` class for managing a process pool
#
# multiprocessing/pool.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = ['Pool', 'ThreadPool']

#
# Imports
#

import collections
import itertools
import os
import queue
import threading
import time
import traceback
import types
import warnings

# If threading is available then ThreadPool should be provided.  Therefore
# we avoid top-level imports which are liable to fail on some systems.
from . import util
from . import get_context, TimeoutError
from .connection import wait

#
# Constants representing the state of a pool
#

INIT = "INIT"
RUN = "RUN"
CLOSE = "CLOSE"
TERMINATE = "TERMINATE"

#
# Miscellaneous
#

# Module-wide counter; each next() yields a fresh integer.
job_counter = itertools.count()

def mapstar(args):
    """Return ``list(map(*args))``.

    *args* is unpacked into the arguments of ``map``, i.e. a callable
    followed by one or more iterables.
    """
    return list(map(*args))

def starmapstar(args):
    """Return ``list(itertools.starmap(args[0], args[1]))``.

    ``args[0]`` is the callable and ``args[1]`` an iterable of argument
    tuples that are unpacked for each call.
    """
    return list(itertools.starmap(args[0], args[1]))

#
# Hack to embed stringification of remote traceback in local traceback
#

class RemoteTraceback(Exception):
    """Exception whose string form is the traceback text it was given."""
    def __init__(self, tb):
        # tb is stored as-is and returned unchanged by __str__.
        self.tb = tb
    def __str__(self):
        return self.tb

Workerclass ExceptionWithTraceback: 64*cda5da8dSAndroid Build Coastguard Worker def __init__(self, exc, tb): 65*cda5da8dSAndroid Build Coastguard Worker tb = traceback.format_exception(type(exc), exc, tb) 66*cda5da8dSAndroid Build Coastguard Worker tb = ''.join(tb) 67*cda5da8dSAndroid Build Coastguard Worker self.exc = exc 68*cda5da8dSAndroid Build Coastguard Worker self.tb = '\n"""\n%s"""' % tb 69*cda5da8dSAndroid Build Coastguard Worker def __reduce__(self): 70*cda5da8dSAndroid Build Coastguard Worker return rebuild_exc, (self.exc, self.tb) 71*cda5da8dSAndroid Build Coastguard Worker 72*cda5da8dSAndroid Build Coastguard Workerdef rebuild_exc(exc, tb): 73*cda5da8dSAndroid Build Coastguard Worker exc.__cause__ = RemoteTraceback(tb) 74*cda5da8dSAndroid Build Coastguard Worker return exc 75*cda5da8dSAndroid Build Coastguard Worker 76*cda5da8dSAndroid Build Coastguard Worker# 77*cda5da8dSAndroid Build Coastguard Worker# Code run by worker processes 78*cda5da8dSAndroid Build Coastguard Worker# 79*cda5da8dSAndroid Build Coastguard Worker 80*cda5da8dSAndroid Build Coastguard Workerclass MaybeEncodingError(Exception): 81*cda5da8dSAndroid Build Coastguard Worker """Wraps possible unpickleable errors, so they can be 82*cda5da8dSAndroid Build Coastguard Worker safely sent through the socket.""" 83*cda5da8dSAndroid Build Coastguard Worker 84*cda5da8dSAndroid Build Coastguard Worker def __init__(self, exc, value): 85*cda5da8dSAndroid Build Coastguard Worker self.exc = repr(exc) 86*cda5da8dSAndroid Build Coastguard Worker self.value = repr(value) 87*cda5da8dSAndroid Build Coastguard Worker super(MaybeEncodingError, self).__init__(self.exc, self.value) 88*cda5da8dSAndroid Build Coastguard Worker 89*cda5da8dSAndroid Build Coastguard Worker def __str__(self): 90*cda5da8dSAndroid Build Coastguard Worker return "Error sending result: '%s'. 
Reason: '%s'" % (self.value, 91*cda5da8dSAndroid Build Coastguard Worker self.exc) 92*cda5da8dSAndroid Build Coastguard Worker 93*cda5da8dSAndroid Build Coastguard Worker def __repr__(self): 94*cda5da8dSAndroid Build Coastguard Worker return "<%s: %s>" % (self.__class__.__name__, self) 95*cda5da8dSAndroid Build Coastguard Worker 96*cda5da8dSAndroid Build Coastguard Worker 97*cda5da8dSAndroid Build Coastguard Workerdef worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None, 98*cda5da8dSAndroid Build Coastguard Worker wrap_exception=False): 99*cda5da8dSAndroid Build Coastguard Worker if (maxtasks is not None) and not (isinstance(maxtasks, int) 100*cda5da8dSAndroid Build Coastguard Worker and maxtasks >= 1): 101*cda5da8dSAndroid Build Coastguard Worker raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks)) 102*cda5da8dSAndroid Build Coastguard Worker put = outqueue.put 103*cda5da8dSAndroid Build Coastguard Worker get = inqueue.get 104*cda5da8dSAndroid Build Coastguard Worker if hasattr(inqueue, '_writer'): 105*cda5da8dSAndroid Build Coastguard Worker inqueue._writer.close() 106*cda5da8dSAndroid Build Coastguard Worker outqueue._reader.close() 107*cda5da8dSAndroid Build Coastguard Worker 108*cda5da8dSAndroid Build Coastguard Worker if initializer is not None: 109*cda5da8dSAndroid Build Coastguard Worker initializer(*initargs) 110*cda5da8dSAndroid Build Coastguard Worker 111*cda5da8dSAndroid Build Coastguard Worker completed = 0 112*cda5da8dSAndroid Build Coastguard Worker while maxtasks is None or (maxtasks and completed < maxtasks): 113*cda5da8dSAndroid Build Coastguard Worker try: 114*cda5da8dSAndroid Build Coastguard Worker task = get() 115*cda5da8dSAndroid Build Coastguard Worker except (EOFError, OSError): 116*cda5da8dSAndroid Build Coastguard Worker util.debug('worker got EOFError or OSError -- exiting') 117*cda5da8dSAndroid Build Coastguard Worker break 118*cda5da8dSAndroid Build Coastguard Worker 119*cda5da8dSAndroid Build 
Coastguard Worker if task is None: 120*cda5da8dSAndroid Build Coastguard Worker util.debug('worker got sentinel -- exiting') 121*cda5da8dSAndroid Build Coastguard Worker break 122*cda5da8dSAndroid Build Coastguard Worker 123*cda5da8dSAndroid Build Coastguard Worker job, i, func, args, kwds = task 124*cda5da8dSAndroid Build Coastguard Worker try: 125*cda5da8dSAndroid Build Coastguard Worker result = (True, func(*args, **kwds)) 126*cda5da8dSAndroid Build Coastguard Worker except Exception as e: 127*cda5da8dSAndroid Build Coastguard Worker if wrap_exception and func is not _helper_reraises_exception: 128*cda5da8dSAndroid Build Coastguard Worker e = ExceptionWithTraceback(e, e.__traceback__) 129*cda5da8dSAndroid Build Coastguard Worker result = (False, e) 130*cda5da8dSAndroid Build Coastguard Worker try: 131*cda5da8dSAndroid Build Coastguard Worker put((job, i, result)) 132*cda5da8dSAndroid Build Coastguard Worker except Exception as e: 133*cda5da8dSAndroid Build Coastguard Worker wrapped = MaybeEncodingError(e, result[1]) 134*cda5da8dSAndroid Build Coastguard Worker util.debug("Possible encoding error while sending result: %s" % ( 135*cda5da8dSAndroid Build Coastguard Worker wrapped)) 136*cda5da8dSAndroid Build Coastguard Worker put((job, i, (False, wrapped))) 137*cda5da8dSAndroid Build Coastguard Worker 138*cda5da8dSAndroid Build Coastguard Worker task = job = result = func = args = kwds = None 139*cda5da8dSAndroid Build Coastguard Worker completed += 1 140*cda5da8dSAndroid Build Coastguard Worker util.debug('worker exiting after %d tasks' % completed) 141*cda5da8dSAndroid Build Coastguard Worker 142*cda5da8dSAndroid Build Coastguard Workerdef _helper_reraises_exception(ex): 143*cda5da8dSAndroid Build Coastguard Worker 'Pickle-able helper function for use by _guarded_task_generation.' 
144*cda5da8dSAndroid Build Coastguard Worker raise ex 145*cda5da8dSAndroid Build Coastguard Worker 146*cda5da8dSAndroid Build Coastguard Worker# 147*cda5da8dSAndroid Build Coastguard Worker# Class representing a process pool 148*cda5da8dSAndroid Build Coastguard Worker# 149*cda5da8dSAndroid Build Coastguard Worker 150*cda5da8dSAndroid Build Coastguard Workerclass _PoolCache(dict): 151*cda5da8dSAndroid Build Coastguard Worker """ 152*cda5da8dSAndroid Build Coastguard Worker Class that implements a cache for the Pool class that will notify 153*cda5da8dSAndroid Build Coastguard Worker the pool management threads every time the cache is emptied. The 154*cda5da8dSAndroid Build Coastguard Worker notification is done by the use of a queue that is provided when 155*cda5da8dSAndroid Build Coastguard Worker instantiating the cache. 156*cda5da8dSAndroid Build Coastguard Worker """ 157*cda5da8dSAndroid Build Coastguard Worker def __init__(self, /, *args, notifier=None, **kwds): 158*cda5da8dSAndroid Build Coastguard Worker self.notifier = notifier 159*cda5da8dSAndroid Build Coastguard Worker super().__init__(*args, **kwds) 160*cda5da8dSAndroid Build Coastguard Worker 161*cda5da8dSAndroid Build Coastguard Worker def __delitem__(self, item): 162*cda5da8dSAndroid Build Coastguard Worker super().__delitem__(item) 163*cda5da8dSAndroid Build Coastguard Worker 164*cda5da8dSAndroid Build Coastguard Worker # Notify that the cache is empty. This is important because the 165*cda5da8dSAndroid Build Coastguard Worker # pool keeps maintaining workers until the cache gets drained. This 166*cda5da8dSAndroid Build Coastguard Worker # eliminates a race condition in which a task is finished after the 167*cda5da8dSAndroid Build Coastguard Worker # the pool's _handle_workers method has enter another iteration of the 168*cda5da8dSAndroid Build Coastguard Worker # loop. 
In this situation, the only event that can wake up the pool 169*cda5da8dSAndroid Build Coastguard Worker # is the cache to be emptied (no more tasks available). 170*cda5da8dSAndroid Build Coastguard Worker if not self: 171*cda5da8dSAndroid Build Coastguard Worker self.notifier.put(None) 172*cda5da8dSAndroid Build Coastguard Worker 173*cda5da8dSAndroid Build Coastguard Workerclass Pool(object): 174*cda5da8dSAndroid Build Coastguard Worker ''' 175*cda5da8dSAndroid Build Coastguard Worker Class which supports an async version of applying functions to arguments. 176*cda5da8dSAndroid Build Coastguard Worker ''' 177*cda5da8dSAndroid Build Coastguard Worker _wrap_exception = True 178*cda5da8dSAndroid Build Coastguard Worker 179*cda5da8dSAndroid Build Coastguard Worker @staticmethod 180*cda5da8dSAndroid Build Coastguard Worker def Process(ctx, *args, **kwds): 181*cda5da8dSAndroid Build Coastguard Worker return ctx.Process(*args, **kwds) 182*cda5da8dSAndroid Build Coastguard Worker 183*cda5da8dSAndroid Build Coastguard Worker def __init__(self, processes=None, initializer=None, initargs=(), 184*cda5da8dSAndroid Build Coastguard Worker maxtasksperchild=None, context=None): 185*cda5da8dSAndroid Build Coastguard Worker # Attributes initialized early to make sure that they exist in 186*cda5da8dSAndroid Build Coastguard Worker # __del__() if __init__() raises an exception 187*cda5da8dSAndroid Build Coastguard Worker self._pool = [] 188*cda5da8dSAndroid Build Coastguard Worker self._state = INIT 189*cda5da8dSAndroid Build Coastguard Worker 190*cda5da8dSAndroid Build Coastguard Worker self._ctx = context or get_context() 191*cda5da8dSAndroid Build Coastguard Worker self._setup_queues() 192*cda5da8dSAndroid Build Coastguard Worker self._taskqueue = queue.SimpleQueue() 193*cda5da8dSAndroid Build Coastguard Worker # The _change_notifier queue exist to wake up self._handle_workers() 194*cda5da8dSAndroid Build Coastguard Worker # when the cache (self._cache) is empty or when there is a 
change in 195*cda5da8dSAndroid Build Coastguard Worker # the _state variable of the thread that runs _handle_workers. 196*cda5da8dSAndroid Build Coastguard Worker self._change_notifier = self._ctx.SimpleQueue() 197*cda5da8dSAndroid Build Coastguard Worker self._cache = _PoolCache(notifier=self._change_notifier) 198*cda5da8dSAndroid Build Coastguard Worker self._maxtasksperchild = maxtasksperchild 199*cda5da8dSAndroid Build Coastguard Worker self._initializer = initializer 200*cda5da8dSAndroid Build Coastguard Worker self._initargs = initargs 201*cda5da8dSAndroid Build Coastguard Worker 202*cda5da8dSAndroid Build Coastguard Worker if processes is None: 203*cda5da8dSAndroid Build Coastguard Worker processes = os.cpu_count() or 1 204*cda5da8dSAndroid Build Coastguard Worker if processes < 1: 205*cda5da8dSAndroid Build Coastguard Worker raise ValueError("Number of processes must be at least 1") 206*cda5da8dSAndroid Build Coastguard Worker if maxtasksperchild is not None: 207*cda5da8dSAndroid Build Coastguard Worker if not isinstance(maxtasksperchild, int) or maxtasksperchild <= 0: 208*cda5da8dSAndroid Build Coastguard Worker raise ValueError("maxtasksperchild must be a positive int or None") 209*cda5da8dSAndroid Build Coastguard Worker 210*cda5da8dSAndroid Build Coastguard Worker if initializer is not None and not callable(initializer): 211*cda5da8dSAndroid Build Coastguard Worker raise TypeError('initializer must be a callable') 212*cda5da8dSAndroid Build Coastguard Worker 213*cda5da8dSAndroid Build Coastguard Worker self._processes = processes 214*cda5da8dSAndroid Build Coastguard Worker try: 215*cda5da8dSAndroid Build Coastguard Worker self._repopulate_pool() 216*cda5da8dSAndroid Build Coastguard Worker except Exception: 217*cda5da8dSAndroid Build Coastguard Worker for p in self._pool: 218*cda5da8dSAndroid Build Coastguard Worker if p.exitcode is None: 219*cda5da8dSAndroid Build Coastguard Worker p.terminate() 220*cda5da8dSAndroid Build Coastguard Worker for p in 
self._pool: 221*cda5da8dSAndroid Build Coastguard Worker p.join() 222*cda5da8dSAndroid Build Coastguard Worker raise 223*cda5da8dSAndroid Build Coastguard Worker 224*cda5da8dSAndroid Build Coastguard Worker sentinels = self._get_sentinels() 225*cda5da8dSAndroid Build Coastguard Worker 226*cda5da8dSAndroid Build Coastguard Worker self._worker_handler = threading.Thread( 227*cda5da8dSAndroid Build Coastguard Worker target=Pool._handle_workers, 228*cda5da8dSAndroid Build Coastguard Worker args=(self._cache, self._taskqueue, self._ctx, self.Process, 229*cda5da8dSAndroid Build Coastguard Worker self._processes, self._pool, self._inqueue, self._outqueue, 230*cda5da8dSAndroid Build Coastguard Worker self._initializer, self._initargs, self._maxtasksperchild, 231*cda5da8dSAndroid Build Coastguard Worker self._wrap_exception, sentinels, self._change_notifier) 232*cda5da8dSAndroid Build Coastguard Worker ) 233*cda5da8dSAndroid Build Coastguard Worker self._worker_handler.daemon = True 234*cda5da8dSAndroid Build Coastguard Worker self._worker_handler._state = RUN 235*cda5da8dSAndroid Build Coastguard Worker self._worker_handler.start() 236*cda5da8dSAndroid Build Coastguard Worker 237*cda5da8dSAndroid Build Coastguard Worker 238*cda5da8dSAndroid Build Coastguard Worker self._task_handler = threading.Thread( 239*cda5da8dSAndroid Build Coastguard Worker target=Pool._handle_tasks, 240*cda5da8dSAndroid Build Coastguard Worker args=(self._taskqueue, self._quick_put, self._outqueue, 241*cda5da8dSAndroid Build Coastguard Worker self._pool, self._cache) 242*cda5da8dSAndroid Build Coastguard Worker ) 243*cda5da8dSAndroid Build Coastguard Worker self._task_handler.daemon = True 244*cda5da8dSAndroid Build Coastguard Worker self._task_handler._state = RUN 245*cda5da8dSAndroid Build Coastguard Worker self._task_handler.start() 246*cda5da8dSAndroid Build Coastguard Worker 247*cda5da8dSAndroid Build Coastguard Worker self._result_handler = threading.Thread( 248*cda5da8dSAndroid Build 
Coastguard Worker target=Pool._handle_results, 249*cda5da8dSAndroid Build Coastguard Worker args=(self._outqueue, self._quick_get, self._cache) 250*cda5da8dSAndroid Build Coastguard Worker ) 251*cda5da8dSAndroid Build Coastguard Worker self._result_handler.daemon = True 252*cda5da8dSAndroid Build Coastguard Worker self._result_handler._state = RUN 253*cda5da8dSAndroid Build Coastguard Worker self._result_handler.start() 254*cda5da8dSAndroid Build Coastguard Worker 255*cda5da8dSAndroid Build Coastguard Worker self._terminate = util.Finalize( 256*cda5da8dSAndroid Build Coastguard Worker self, self._terminate_pool, 257*cda5da8dSAndroid Build Coastguard Worker args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, 258*cda5da8dSAndroid Build Coastguard Worker self._change_notifier, self._worker_handler, self._task_handler, 259*cda5da8dSAndroid Build Coastguard Worker self._result_handler, self._cache), 260*cda5da8dSAndroid Build Coastguard Worker exitpriority=15 261*cda5da8dSAndroid Build Coastguard Worker ) 262*cda5da8dSAndroid Build Coastguard Worker self._state = RUN 263*cda5da8dSAndroid Build Coastguard Worker 264*cda5da8dSAndroid Build Coastguard Worker # Copy globals as function locals to make sure that they are available 265*cda5da8dSAndroid Build Coastguard Worker # during Python shutdown when the Pool is destroyed. 
266*cda5da8dSAndroid Build Coastguard Worker def __del__(self, _warn=warnings.warn, RUN=RUN): 267*cda5da8dSAndroid Build Coastguard Worker if self._state == RUN: 268*cda5da8dSAndroid Build Coastguard Worker _warn(f"unclosed running multiprocessing pool {self!r}", 269*cda5da8dSAndroid Build Coastguard Worker ResourceWarning, source=self) 270*cda5da8dSAndroid Build Coastguard Worker if getattr(self, '_change_notifier', None) is not None: 271*cda5da8dSAndroid Build Coastguard Worker self._change_notifier.put(None) 272*cda5da8dSAndroid Build Coastguard Worker 273*cda5da8dSAndroid Build Coastguard Worker def __repr__(self): 274*cda5da8dSAndroid Build Coastguard Worker cls = self.__class__ 275*cda5da8dSAndroid Build Coastguard Worker return (f'<{cls.__module__}.{cls.__qualname__} ' 276*cda5da8dSAndroid Build Coastguard Worker f'state={self._state} ' 277*cda5da8dSAndroid Build Coastguard Worker f'pool_size={len(self._pool)}>') 278*cda5da8dSAndroid Build Coastguard Worker 279*cda5da8dSAndroid Build Coastguard Worker def _get_sentinels(self): 280*cda5da8dSAndroid Build Coastguard Worker task_queue_sentinels = [self._outqueue._reader] 281*cda5da8dSAndroid Build Coastguard Worker self_notifier_sentinels = [self._change_notifier._reader] 282*cda5da8dSAndroid Build Coastguard Worker return [*task_queue_sentinels, *self_notifier_sentinels] 283*cda5da8dSAndroid Build Coastguard Worker 284*cda5da8dSAndroid Build Coastguard Worker @staticmethod 285*cda5da8dSAndroid Build Coastguard Worker def _get_worker_sentinels(workers): 286*cda5da8dSAndroid Build Coastguard Worker return [worker.sentinel for worker in 287*cda5da8dSAndroid Build Coastguard Worker workers if hasattr(worker, "sentinel")] 288*cda5da8dSAndroid Build Coastguard Worker 289*cda5da8dSAndroid Build Coastguard Worker @staticmethod 290*cda5da8dSAndroid Build Coastguard Worker def _join_exited_workers(pool): 291*cda5da8dSAndroid Build Coastguard Worker """Cleanup after any worker processes which have exited due to reaching 
292*cda5da8dSAndroid Build Coastguard Worker their specified lifetime. Returns True if any workers were cleaned up. 293*cda5da8dSAndroid Build Coastguard Worker """ 294*cda5da8dSAndroid Build Coastguard Worker cleaned = False 295*cda5da8dSAndroid Build Coastguard Worker for i in reversed(range(len(pool))): 296*cda5da8dSAndroid Build Coastguard Worker worker = pool[i] 297*cda5da8dSAndroid Build Coastguard Worker if worker.exitcode is not None: 298*cda5da8dSAndroid Build Coastguard Worker # worker exited 299*cda5da8dSAndroid Build Coastguard Worker util.debug('cleaning up worker %d' % i) 300*cda5da8dSAndroid Build Coastguard Worker worker.join() 301*cda5da8dSAndroid Build Coastguard Worker cleaned = True 302*cda5da8dSAndroid Build Coastguard Worker del pool[i] 303*cda5da8dSAndroid Build Coastguard Worker return cleaned 304*cda5da8dSAndroid Build Coastguard Worker 305*cda5da8dSAndroid Build Coastguard Worker def _repopulate_pool(self): 306*cda5da8dSAndroid Build Coastguard Worker return self._repopulate_pool_static(self._ctx, self.Process, 307*cda5da8dSAndroid Build Coastguard Worker self._processes, 308*cda5da8dSAndroid Build Coastguard Worker self._pool, self._inqueue, 309*cda5da8dSAndroid Build Coastguard Worker self._outqueue, self._initializer, 310*cda5da8dSAndroid Build Coastguard Worker self._initargs, 311*cda5da8dSAndroid Build Coastguard Worker self._maxtasksperchild, 312*cda5da8dSAndroid Build Coastguard Worker self._wrap_exception) 313*cda5da8dSAndroid Build Coastguard Worker 314*cda5da8dSAndroid Build Coastguard Worker @staticmethod 315*cda5da8dSAndroid Build Coastguard Worker def _repopulate_pool_static(ctx, Process, processes, pool, inqueue, 316*cda5da8dSAndroid Build Coastguard Worker outqueue, initializer, initargs, 317*cda5da8dSAndroid Build Coastguard Worker maxtasksperchild, wrap_exception): 318*cda5da8dSAndroid Build Coastguard Worker """Bring the number of pool processes up to the specified number, 319*cda5da8dSAndroid Build Coastguard Worker for 
use after reaping workers which have exited. 320*cda5da8dSAndroid Build Coastguard Worker """ 321*cda5da8dSAndroid Build Coastguard Worker for i in range(processes - len(pool)): 322*cda5da8dSAndroid Build Coastguard Worker w = Process(ctx, target=worker, 323*cda5da8dSAndroid Build Coastguard Worker args=(inqueue, outqueue, 324*cda5da8dSAndroid Build Coastguard Worker initializer, 325*cda5da8dSAndroid Build Coastguard Worker initargs, maxtasksperchild, 326*cda5da8dSAndroid Build Coastguard Worker wrap_exception)) 327*cda5da8dSAndroid Build Coastguard Worker w.name = w.name.replace('Process', 'PoolWorker') 328*cda5da8dSAndroid Build Coastguard Worker w.daemon = True 329*cda5da8dSAndroid Build Coastguard Worker w.start() 330*cda5da8dSAndroid Build Coastguard Worker pool.append(w) 331*cda5da8dSAndroid Build Coastguard Worker util.debug('added worker') 332*cda5da8dSAndroid Build Coastguard Worker 333*cda5da8dSAndroid Build Coastguard Worker @staticmethod 334*cda5da8dSAndroid Build Coastguard Worker def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue, 335*cda5da8dSAndroid Build Coastguard Worker initializer, initargs, maxtasksperchild, 336*cda5da8dSAndroid Build Coastguard Worker wrap_exception): 337*cda5da8dSAndroid Build Coastguard Worker """Clean up any exited workers and start replacements for them. 
338*cda5da8dSAndroid Build Coastguard Worker """ 339*cda5da8dSAndroid Build Coastguard Worker if Pool._join_exited_workers(pool): 340*cda5da8dSAndroid Build Coastguard Worker Pool._repopulate_pool_static(ctx, Process, processes, pool, 341*cda5da8dSAndroid Build Coastguard Worker inqueue, outqueue, initializer, 342*cda5da8dSAndroid Build Coastguard Worker initargs, maxtasksperchild, 343*cda5da8dSAndroid Build Coastguard Worker wrap_exception) 344*cda5da8dSAndroid Build Coastguard Worker 345*cda5da8dSAndroid Build Coastguard Worker def _setup_queues(self): 346*cda5da8dSAndroid Build Coastguard Worker self._inqueue = self._ctx.SimpleQueue() 347*cda5da8dSAndroid Build Coastguard Worker self._outqueue = self._ctx.SimpleQueue() 348*cda5da8dSAndroid Build Coastguard Worker self._quick_put = self._inqueue._writer.send 349*cda5da8dSAndroid Build Coastguard Worker self._quick_get = self._outqueue._reader.recv 350*cda5da8dSAndroid Build Coastguard Worker 351*cda5da8dSAndroid Build Coastguard Worker def _check_running(self): 352*cda5da8dSAndroid Build Coastguard Worker if self._state != RUN: 353*cda5da8dSAndroid Build Coastguard Worker raise ValueError("Pool not running") 354*cda5da8dSAndroid Build Coastguard Worker 355*cda5da8dSAndroid Build Coastguard Worker def apply(self, func, args=(), kwds={}): 356*cda5da8dSAndroid Build Coastguard Worker ''' 357*cda5da8dSAndroid Build Coastguard Worker Equivalent of `func(*args, **kwds)`. 358*cda5da8dSAndroid Build Coastguard Worker Pool must be running. 
359*cda5da8dSAndroid Build Coastguard Worker ''' 360*cda5da8dSAndroid Build Coastguard Worker return self.apply_async(func, args, kwds).get() 361*cda5da8dSAndroid Build Coastguard Worker 362*cda5da8dSAndroid Build Coastguard Worker def map(self, func, iterable, chunksize=None): 363*cda5da8dSAndroid Build Coastguard Worker ''' 364*cda5da8dSAndroid Build Coastguard Worker Apply `func` to each element in `iterable`, collecting the results 365*cda5da8dSAndroid Build Coastguard Worker in a list that is returned. 366*cda5da8dSAndroid Build Coastguard Worker ''' 367*cda5da8dSAndroid Build Coastguard Worker return self._map_async(func, iterable, mapstar, chunksize).get() 368*cda5da8dSAndroid Build Coastguard Worker 369*cda5da8dSAndroid Build Coastguard Worker def starmap(self, func, iterable, chunksize=None): 370*cda5da8dSAndroid Build Coastguard Worker ''' 371*cda5da8dSAndroid Build Coastguard Worker Like `map()` method but the elements of the `iterable` are expected to 372*cda5da8dSAndroid Build Coastguard Worker be iterables as well and will be unpacked as arguments. Hence 373*cda5da8dSAndroid Build Coastguard Worker `func` and (a, b) becomes func(a, b). 374*cda5da8dSAndroid Build Coastguard Worker ''' 375*cda5da8dSAndroid Build Coastguard Worker return self._map_async(func, iterable, starmapstar, chunksize).get() 376*cda5da8dSAndroid Build Coastguard Worker 377*cda5da8dSAndroid Build Coastguard Worker def starmap_async(self, func, iterable, chunksize=None, callback=None, 378*cda5da8dSAndroid Build Coastguard Worker error_callback=None): 379*cda5da8dSAndroid Build Coastguard Worker ''' 380*cda5da8dSAndroid Build Coastguard Worker Asynchronous version of `starmap()` method. 
381*cda5da8dSAndroid Build Coastguard Worker ''' 382*cda5da8dSAndroid Build Coastguard Worker return self._map_async(func, iterable, starmapstar, chunksize, 383*cda5da8dSAndroid Build Coastguard Worker callback, error_callback) 384*cda5da8dSAndroid Build Coastguard Worker 385*cda5da8dSAndroid Build Coastguard Worker def _guarded_task_generation(self, result_job, func, iterable): 386*cda5da8dSAndroid Build Coastguard Worker '''Provides a generator of tasks for imap and imap_unordered with 387*cda5da8dSAndroid Build Coastguard Worker appropriate handling for iterables which throw exceptions during 388*cda5da8dSAndroid Build Coastguard Worker iteration.''' 389*cda5da8dSAndroid Build Coastguard Worker try: 390*cda5da8dSAndroid Build Coastguard Worker i = -1 391*cda5da8dSAndroid Build Coastguard Worker for i, x in enumerate(iterable): 392*cda5da8dSAndroid Build Coastguard Worker yield (result_job, i, func, (x,), {}) 393*cda5da8dSAndroid Build Coastguard Worker except Exception as e: 394*cda5da8dSAndroid Build Coastguard Worker yield (result_job, i+1, _helper_reraises_exception, (e,), {}) 395*cda5da8dSAndroid Build Coastguard Worker 396*cda5da8dSAndroid Build Coastguard Worker def imap(self, func, iterable, chunksize=1): 397*cda5da8dSAndroid Build Coastguard Worker ''' 398*cda5da8dSAndroid Build Coastguard Worker Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. 
399*cda5da8dSAndroid Build Coastguard Worker ''' 400*cda5da8dSAndroid Build Coastguard Worker self._check_running() 401*cda5da8dSAndroid Build Coastguard Worker if chunksize == 1: 402*cda5da8dSAndroid Build Coastguard Worker result = IMapIterator(self) 403*cda5da8dSAndroid Build Coastguard Worker self._taskqueue.put( 404*cda5da8dSAndroid Build Coastguard Worker ( 405*cda5da8dSAndroid Build Coastguard Worker self._guarded_task_generation(result._job, func, iterable), 406*cda5da8dSAndroid Build Coastguard Worker result._set_length 407*cda5da8dSAndroid Build Coastguard Worker )) 408*cda5da8dSAndroid Build Coastguard Worker return result 409*cda5da8dSAndroid Build Coastguard Worker else: 410*cda5da8dSAndroid Build Coastguard Worker if chunksize < 1: 411*cda5da8dSAndroid Build Coastguard Worker raise ValueError( 412*cda5da8dSAndroid Build Coastguard Worker "Chunksize must be 1+, not {0:n}".format( 413*cda5da8dSAndroid Build Coastguard Worker chunksize)) 414*cda5da8dSAndroid Build Coastguard Worker task_batches = Pool._get_tasks(func, iterable, chunksize) 415*cda5da8dSAndroid Build Coastguard Worker result = IMapIterator(self) 416*cda5da8dSAndroid Build Coastguard Worker self._taskqueue.put( 417*cda5da8dSAndroid Build Coastguard Worker ( 418*cda5da8dSAndroid Build Coastguard Worker self._guarded_task_generation(result._job, 419*cda5da8dSAndroid Build Coastguard Worker mapstar, 420*cda5da8dSAndroid Build Coastguard Worker task_batches), 421*cda5da8dSAndroid Build Coastguard Worker result._set_length 422*cda5da8dSAndroid Build Coastguard Worker )) 423*cda5da8dSAndroid Build Coastguard Worker return (item for chunk in result for item in chunk) 424*cda5da8dSAndroid Build Coastguard Worker 425*cda5da8dSAndroid Build Coastguard Worker def imap_unordered(self, func, iterable, chunksize=1): 426*cda5da8dSAndroid Build Coastguard Worker ''' 427*cda5da8dSAndroid Build Coastguard Worker Like `imap()` method but ordering of results is arbitrary. 
428*cda5da8dSAndroid Build Coastguard Worker ''' 429*cda5da8dSAndroid Build Coastguard Worker self._check_running() 430*cda5da8dSAndroid Build Coastguard Worker if chunksize == 1: 431*cda5da8dSAndroid Build Coastguard Worker result = IMapUnorderedIterator(self) 432*cda5da8dSAndroid Build Coastguard Worker self._taskqueue.put( 433*cda5da8dSAndroid Build Coastguard Worker ( 434*cda5da8dSAndroid Build Coastguard Worker self._guarded_task_generation(result._job, func, iterable), 435*cda5da8dSAndroid Build Coastguard Worker result._set_length 436*cda5da8dSAndroid Build Coastguard Worker )) 437*cda5da8dSAndroid Build Coastguard Worker return result 438*cda5da8dSAndroid Build Coastguard Worker else: 439*cda5da8dSAndroid Build Coastguard Worker if chunksize < 1: 440*cda5da8dSAndroid Build Coastguard Worker raise ValueError( 441*cda5da8dSAndroid Build Coastguard Worker "Chunksize must be 1+, not {0!r}".format(chunksize)) 442*cda5da8dSAndroid Build Coastguard Worker task_batches = Pool._get_tasks(func, iterable, chunksize) 443*cda5da8dSAndroid Build Coastguard Worker result = IMapUnorderedIterator(self) 444*cda5da8dSAndroid Build Coastguard Worker self._taskqueue.put( 445*cda5da8dSAndroid Build Coastguard Worker ( 446*cda5da8dSAndroid Build Coastguard Worker self._guarded_task_generation(result._job, 447*cda5da8dSAndroid Build Coastguard Worker mapstar, 448*cda5da8dSAndroid Build Coastguard Worker task_batches), 449*cda5da8dSAndroid Build Coastguard Worker result._set_length 450*cda5da8dSAndroid Build Coastguard Worker )) 451*cda5da8dSAndroid Build Coastguard Worker return (item for chunk in result for item in chunk) 452*cda5da8dSAndroid Build Coastguard Worker 453*cda5da8dSAndroid Build Coastguard Worker def apply_async(self, func, args=(), kwds={}, callback=None, 454*cda5da8dSAndroid Build Coastguard Worker error_callback=None): 455*cda5da8dSAndroid Build Coastguard Worker ''' 456*cda5da8dSAndroid Build Coastguard Worker Asynchronous version of `apply()` method. 
457*cda5da8dSAndroid Build Coastguard Worker ''' 458*cda5da8dSAndroid Build Coastguard Worker self._check_running() 459*cda5da8dSAndroid Build Coastguard Worker result = ApplyResult(self, callback, error_callback) 460*cda5da8dSAndroid Build Coastguard Worker self._taskqueue.put(([(result._job, 0, func, args, kwds)], None)) 461*cda5da8dSAndroid Build Coastguard Worker return result 462*cda5da8dSAndroid Build Coastguard Worker 463*cda5da8dSAndroid Build Coastguard Worker def map_async(self, func, iterable, chunksize=None, callback=None, 464*cda5da8dSAndroid Build Coastguard Worker error_callback=None): 465*cda5da8dSAndroid Build Coastguard Worker ''' 466*cda5da8dSAndroid Build Coastguard Worker Asynchronous version of `map()` method. 467*cda5da8dSAndroid Build Coastguard Worker ''' 468*cda5da8dSAndroid Build Coastguard Worker return self._map_async(func, iterable, mapstar, chunksize, callback, 469*cda5da8dSAndroid Build Coastguard Worker error_callback) 470*cda5da8dSAndroid Build Coastguard Worker 471*cda5da8dSAndroid Build Coastguard Worker def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, 472*cda5da8dSAndroid Build Coastguard Worker error_callback=None): 473*cda5da8dSAndroid Build Coastguard Worker ''' 474*cda5da8dSAndroid Build Coastguard Worker Helper function to implement map, starmap and their async counterparts. 
475*cda5da8dSAndroid Build Coastguard Worker ''' 476*cda5da8dSAndroid Build Coastguard Worker self._check_running() 477*cda5da8dSAndroid Build Coastguard Worker if not hasattr(iterable, '__len__'): 478*cda5da8dSAndroid Build Coastguard Worker iterable = list(iterable) 479*cda5da8dSAndroid Build Coastguard Worker 480*cda5da8dSAndroid Build Coastguard Worker if chunksize is None: 481*cda5da8dSAndroid Build Coastguard Worker chunksize, extra = divmod(len(iterable), len(self._pool) * 4) 482*cda5da8dSAndroid Build Coastguard Worker if extra: 483*cda5da8dSAndroid Build Coastguard Worker chunksize += 1 484*cda5da8dSAndroid Build Coastguard Worker if len(iterable) == 0: 485*cda5da8dSAndroid Build Coastguard Worker chunksize = 0 486*cda5da8dSAndroid Build Coastguard Worker 487*cda5da8dSAndroid Build Coastguard Worker task_batches = Pool._get_tasks(func, iterable, chunksize) 488*cda5da8dSAndroid Build Coastguard Worker result = MapResult(self, chunksize, len(iterable), callback, 489*cda5da8dSAndroid Build Coastguard Worker error_callback=error_callback) 490*cda5da8dSAndroid Build Coastguard Worker self._taskqueue.put( 491*cda5da8dSAndroid Build Coastguard Worker ( 492*cda5da8dSAndroid Build Coastguard Worker self._guarded_task_generation(result._job, 493*cda5da8dSAndroid Build Coastguard Worker mapper, 494*cda5da8dSAndroid Build Coastguard Worker task_batches), 495*cda5da8dSAndroid Build Coastguard Worker None 496*cda5da8dSAndroid Build Coastguard Worker ) 497*cda5da8dSAndroid Build Coastguard Worker ) 498*cda5da8dSAndroid Build Coastguard Worker return result 499*cda5da8dSAndroid Build Coastguard Worker 500*cda5da8dSAndroid Build Coastguard Worker @staticmethod 501*cda5da8dSAndroid Build Coastguard Worker def _wait_for_updates(sentinels, change_notifier, timeout=None): 502*cda5da8dSAndroid Build Coastguard Worker wait(sentinels, timeout=timeout) 503*cda5da8dSAndroid Build Coastguard Worker while not change_notifier.empty(): 504*cda5da8dSAndroid Build Coastguard Worker 
change_notifier.get() 505*cda5da8dSAndroid Build Coastguard Worker 506*cda5da8dSAndroid Build Coastguard Worker @classmethod 507*cda5da8dSAndroid Build Coastguard Worker def _handle_workers(cls, cache, taskqueue, ctx, Process, processes, 508*cda5da8dSAndroid Build Coastguard Worker pool, inqueue, outqueue, initializer, initargs, 509*cda5da8dSAndroid Build Coastguard Worker maxtasksperchild, wrap_exception, sentinels, 510*cda5da8dSAndroid Build Coastguard Worker change_notifier): 511*cda5da8dSAndroid Build Coastguard Worker thread = threading.current_thread() 512*cda5da8dSAndroid Build Coastguard Worker 513*cda5da8dSAndroid Build Coastguard Worker # Keep maintaining workers until the cache gets drained, unless the pool 514*cda5da8dSAndroid Build Coastguard Worker # is terminated. 515*cda5da8dSAndroid Build Coastguard Worker while thread._state == RUN or (cache and thread._state != TERMINATE): 516*cda5da8dSAndroid Build Coastguard Worker cls._maintain_pool(ctx, Process, processes, pool, inqueue, 517*cda5da8dSAndroid Build Coastguard Worker outqueue, initializer, initargs, 518*cda5da8dSAndroid Build Coastguard Worker maxtasksperchild, wrap_exception) 519*cda5da8dSAndroid Build Coastguard Worker 520*cda5da8dSAndroid Build Coastguard Worker current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels] 521*cda5da8dSAndroid Build Coastguard Worker 522*cda5da8dSAndroid Build Coastguard Worker cls._wait_for_updates(current_sentinels, change_notifier) 523*cda5da8dSAndroid Build Coastguard Worker # send sentinel to stop workers 524*cda5da8dSAndroid Build Coastguard Worker taskqueue.put(None) 525*cda5da8dSAndroid Build Coastguard Worker util.debug('worker handler exiting') 526*cda5da8dSAndroid Build Coastguard Worker 527*cda5da8dSAndroid Build Coastguard Worker @staticmethod 528*cda5da8dSAndroid Build Coastguard Worker def _handle_tasks(taskqueue, put, outqueue, pool, cache): 529*cda5da8dSAndroid Build Coastguard Worker thread = threading.current_thread() 
530*cda5da8dSAndroid Build Coastguard Worker 531*cda5da8dSAndroid Build Coastguard Worker for taskseq, set_length in iter(taskqueue.get, None): 532*cda5da8dSAndroid Build Coastguard Worker task = None 533*cda5da8dSAndroid Build Coastguard Worker try: 534*cda5da8dSAndroid Build Coastguard Worker # iterating taskseq cannot fail 535*cda5da8dSAndroid Build Coastguard Worker for task in taskseq: 536*cda5da8dSAndroid Build Coastguard Worker if thread._state != RUN: 537*cda5da8dSAndroid Build Coastguard Worker util.debug('task handler found thread._state != RUN') 538*cda5da8dSAndroid Build Coastguard Worker break 539*cda5da8dSAndroid Build Coastguard Worker try: 540*cda5da8dSAndroid Build Coastguard Worker put(task) 541*cda5da8dSAndroid Build Coastguard Worker except Exception as e: 542*cda5da8dSAndroid Build Coastguard Worker job, idx = task[:2] 543*cda5da8dSAndroid Build Coastguard Worker try: 544*cda5da8dSAndroid Build Coastguard Worker cache[job]._set(idx, (False, e)) 545*cda5da8dSAndroid Build Coastguard Worker except KeyError: 546*cda5da8dSAndroid Build Coastguard Worker pass 547*cda5da8dSAndroid Build Coastguard Worker else: 548*cda5da8dSAndroid Build Coastguard Worker if set_length: 549*cda5da8dSAndroid Build Coastguard Worker util.debug('doing set_length()') 550*cda5da8dSAndroid Build Coastguard Worker idx = task[1] if task else -1 551*cda5da8dSAndroid Build Coastguard Worker set_length(idx + 1) 552*cda5da8dSAndroid Build Coastguard Worker continue 553*cda5da8dSAndroid Build Coastguard Worker break 554*cda5da8dSAndroid Build Coastguard Worker finally: 555*cda5da8dSAndroid Build Coastguard Worker task = taskseq = job = None 556*cda5da8dSAndroid Build Coastguard Worker else: 557*cda5da8dSAndroid Build Coastguard Worker util.debug('task handler got sentinel') 558*cda5da8dSAndroid Build Coastguard Worker 559*cda5da8dSAndroid Build Coastguard Worker try: 560*cda5da8dSAndroid Build Coastguard Worker # tell result handler to finish when cache is empty 
561*cda5da8dSAndroid Build Coastguard Worker util.debug('task handler sending sentinel to result handler') 562*cda5da8dSAndroid Build Coastguard Worker outqueue.put(None) 563*cda5da8dSAndroid Build Coastguard Worker 564*cda5da8dSAndroid Build Coastguard Worker # tell workers there is no more work 565*cda5da8dSAndroid Build Coastguard Worker util.debug('task handler sending sentinel to workers') 566*cda5da8dSAndroid Build Coastguard Worker for p in pool: 567*cda5da8dSAndroid Build Coastguard Worker put(None) 568*cda5da8dSAndroid Build Coastguard Worker except OSError: 569*cda5da8dSAndroid Build Coastguard Worker util.debug('task handler got OSError when sending sentinels') 570*cda5da8dSAndroid Build Coastguard Worker 571*cda5da8dSAndroid Build Coastguard Worker util.debug('task handler exiting') 572*cda5da8dSAndroid Build Coastguard Worker 573*cda5da8dSAndroid Build Coastguard Worker @staticmethod 574*cda5da8dSAndroid Build Coastguard Worker def _handle_results(outqueue, get, cache): 575*cda5da8dSAndroid Build Coastguard Worker thread = threading.current_thread() 576*cda5da8dSAndroid Build Coastguard Worker 577*cda5da8dSAndroid Build Coastguard Worker while 1: 578*cda5da8dSAndroid Build Coastguard Worker try: 579*cda5da8dSAndroid Build Coastguard Worker task = get() 580*cda5da8dSAndroid Build Coastguard Worker except (OSError, EOFError): 581*cda5da8dSAndroid Build Coastguard Worker util.debug('result handler got EOFError/OSError -- exiting') 582*cda5da8dSAndroid Build Coastguard Worker return 583*cda5da8dSAndroid Build Coastguard Worker 584*cda5da8dSAndroid Build Coastguard Worker if thread._state != RUN: 585*cda5da8dSAndroid Build Coastguard Worker assert thread._state == TERMINATE, "Thread not in TERMINATE" 586*cda5da8dSAndroid Build Coastguard Worker util.debug('result handler found thread._state=TERMINATE') 587*cda5da8dSAndroid Build Coastguard Worker break 588*cda5da8dSAndroid Build Coastguard Worker 589*cda5da8dSAndroid Build Coastguard Worker if task is None: 
590*cda5da8dSAndroid Build Coastguard Worker util.debug('result handler got sentinel') 591*cda5da8dSAndroid Build Coastguard Worker break 592*cda5da8dSAndroid Build Coastguard Worker 593*cda5da8dSAndroid Build Coastguard Worker job, i, obj = task 594*cda5da8dSAndroid Build Coastguard Worker try: 595*cda5da8dSAndroid Build Coastguard Worker cache[job]._set(i, obj) 596*cda5da8dSAndroid Build Coastguard Worker except KeyError: 597*cda5da8dSAndroid Build Coastguard Worker pass 598*cda5da8dSAndroid Build Coastguard Worker task = job = obj = None 599*cda5da8dSAndroid Build Coastguard Worker 600*cda5da8dSAndroid Build Coastguard Worker while cache and thread._state != TERMINATE: 601*cda5da8dSAndroid Build Coastguard Worker try: 602*cda5da8dSAndroid Build Coastguard Worker task = get() 603*cda5da8dSAndroid Build Coastguard Worker except (OSError, EOFError): 604*cda5da8dSAndroid Build Coastguard Worker util.debug('result handler got EOFError/OSError -- exiting') 605*cda5da8dSAndroid Build Coastguard Worker return 606*cda5da8dSAndroid Build Coastguard Worker 607*cda5da8dSAndroid Build Coastguard Worker if task is None: 608*cda5da8dSAndroid Build Coastguard Worker util.debug('result handler ignoring extra sentinel') 609*cda5da8dSAndroid Build Coastguard Worker continue 610*cda5da8dSAndroid Build Coastguard Worker job, i, obj = task 611*cda5da8dSAndroid Build Coastguard Worker try: 612*cda5da8dSAndroid Build Coastguard Worker cache[job]._set(i, obj) 613*cda5da8dSAndroid Build Coastguard Worker except KeyError: 614*cda5da8dSAndroid Build Coastguard Worker pass 615*cda5da8dSAndroid Build Coastguard Worker task = job = obj = None 616*cda5da8dSAndroid Build Coastguard Worker 617*cda5da8dSAndroid Build Coastguard Worker if hasattr(outqueue, '_reader'): 618*cda5da8dSAndroid Build Coastguard Worker util.debug('ensuring that outqueue is not full') 619*cda5da8dSAndroid Build Coastguard Worker # If we don't make room available in outqueue then 620*cda5da8dSAndroid Build Coastguard 
Worker # attempts to add the sentinel (None) to outqueue may 621*cda5da8dSAndroid Build Coastguard Worker # block. There is guaranteed to be no more than 2 sentinels. 622*cda5da8dSAndroid Build Coastguard Worker try: 623*cda5da8dSAndroid Build Coastguard Worker for i in range(10): 624*cda5da8dSAndroid Build Coastguard Worker if not outqueue._reader.poll(): 625*cda5da8dSAndroid Build Coastguard Worker break 626*cda5da8dSAndroid Build Coastguard Worker get() 627*cda5da8dSAndroid Build Coastguard Worker except (OSError, EOFError): 628*cda5da8dSAndroid Build Coastguard Worker pass 629*cda5da8dSAndroid Build Coastguard Worker 630*cda5da8dSAndroid Build Coastguard Worker util.debug('result handler exiting: len(cache)=%s, thread._state=%s', 631*cda5da8dSAndroid Build Coastguard Worker len(cache), thread._state) 632*cda5da8dSAndroid Build Coastguard Worker 633*cda5da8dSAndroid Build Coastguard Worker @staticmethod 634*cda5da8dSAndroid Build Coastguard Worker def _get_tasks(func, it, size): 635*cda5da8dSAndroid Build Coastguard Worker it = iter(it) 636*cda5da8dSAndroid Build Coastguard Worker while 1: 637*cda5da8dSAndroid Build Coastguard Worker x = tuple(itertools.islice(it, size)) 638*cda5da8dSAndroid Build Coastguard Worker if not x: 639*cda5da8dSAndroid Build Coastguard Worker return 640*cda5da8dSAndroid Build Coastguard Worker yield (func, x) 641*cda5da8dSAndroid Build Coastguard Worker 642*cda5da8dSAndroid Build Coastguard Worker def __reduce__(self): 643*cda5da8dSAndroid Build Coastguard Worker raise NotImplementedError( 644*cda5da8dSAndroid Build Coastguard Worker 'pool objects cannot be passed between processes or pickled' 645*cda5da8dSAndroid Build Coastguard Worker ) 646*cda5da8dSAndroid Build Coastguard Worker 647*cda5da8dSAndroid Build Coastguard Worker def close(self): 648*cda5da8dSAndroid Build Coastguard Worker util.debug('closing pool') 649*cda5da8dSAndroid Build Coastguard Worker if self._state == RUN: 650*cda5da8dSAndroid Build Coastguard Worker 
self._state = CLOSE 651*cda5da8dSAndroid Build Coastguard Worker self._worker_handler._state = CLOSE 652*cda5da8dSAndroid Build Coastguard Worker self._change_notifier.put(None) 653*cda5da8dSAndroid Build Coastguard Worker 654*cda5da8dSAndroid Build Coastguard Worker def terminate(self): 655*cda5da8dSAndroid Build Coastguard Worker util.debug('terminating pool') 656*cda5da8dSAndroid Build Coastguard Worker self._state = TERMINATE 657*cda5da8dSAndroid Build Coastguard Worker self._terminate() 658*cda5da8dSAndroid Build Coastguard Worker 659*cda5da8dSAndroid Build Coastguard Worker def join(self): 660*cda5da8dSAndroid Build Coastguard Worker util.debug('joining pool') 661*cda5da8dSAndroid Build Coastguard Worker if self._state == RUN: 662*cda5da8dSAndroid Build Coastguard Worker raise ValueError("Pool is still running") 663*cda5da8dSAndroid Build Coastguard Worker elif self._state not in (CLOSE, TERMINATE): 664*cda5da8dSAndroid Build Coastguard Worker raise ValueError("In unknown state") 665*cda5da8dSAndroid Build Coastguard Worker self._worker_handler.join() 666*cda5da8dSAndroid Build Coastguard Worker self._task_handler.join() 667*cda5da8dSAndroid Build Coastguard Worker self._result_handler.join() 668*cda5da8dSAndroid Build Coastguard Worker for p in self._pool: 669*cda5da8dSAndroid Build Coastguard Worker p.join() 670*cda5da8dSAndroid Build Coastguard Worker 671*cda5da8dSAndroid Build Coastguard Worker @staticmethod 672*cda5da8dSAndroid Build Coastguard Worker def _help_stuff_finish(inqueue, task_handler, size): 673*cda5da8dSAndroid Build Coastguard Worker # task_handler may be blocked trying to put items on inqueue 674*cda5da8dSAndroid Build Coastguard Worker util.debug('removing tasks from inqueue until task handler finished') 675*cda5da8dSAndroid Build Coastguard Worker inqueue._rlock.acquire() 676*cda5da8dSAndroid Build Coastguard Worker while task_handler.is_alive() and inqueue._reader.poll(): 677*cda5da8dSAndroid Build Coastguard Worker 
inqueue._reader.recv() 678*cda5da8dSAndroid Build Coastguard Worker time.sleep(0) 679*cda5da8dSAndroid Build Coastguard Worker 680*cda5da8dSAndroid Build Coastguard Worker @classmethod 681*cda5da8dSAndroid Build Coastguard Worker def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, 682*cda5da8dSAndroid Build Coastguard Worker worker_handler, task_handler, result_handler, cache): 683*cda5da8dSAndroid Build Coastguard Worker # this is guaranteed to only be called once 684*cda5da8dSAndroid Build Coastguard Worker util.debug('finalizing pool') 685*cda5da8dSAndroid Build Coastguard Worker 686*cda5da8dSAndroid Build Coastguard Worker # Notify that the worker_handler state has been changed so the 687*cda5da8dSAndroid Build Coastguard Worker # _handle_workers loop can be unblocked (and exited) in order to 688*cda5da8dSAndroid Build Coastguard Worker # send the finalization sentinel all the workers. 689*cda5da8dSAndroid Build Coastguard Worker worker_handler._state = TERMINATE 690*cda5da8dSAndroid Build Coastguard Worker change_notifier.put(None) 691*cda5da8dSAndroid Build Coastguard Worker 692*cda5da8dSAndroid Build Coastguard Worker task_handler._state = TERMINATE 693*cda5da8dSAndroid Build Coastguard Worker 694*cda5da8dSAndroid Build Coastguard Worker util.debug('helping task handler/workers to finish') 695*cda5da8dSAndroid Build Coastguard Worker cls._help_stuff_finish(inqueue, task_handler, len(pool)) 696*cda5da8dSAndroid Build Coastguard Worker 697*cda5da8dSAndroid Build Coastguard Worker if (not result_handler.is_alive()) and (len(cache) != 0): 698*cda5da8dSAndroid Build Coastguard Worker raise AssertionError( 699*cda5da8dSAndroid Build Coastguard Worker "Cannot have cache with result_hander not alive") 700*cda5da8dSAndroid Build Coastguard Worker 701*cda5da8dSAndroid Build Coastguard Worker result_handler._state = TERMINATE 702*cda5da8dSAndroid Build Coastguard Worker change_notifier.put(None) 703*cda5da8dSAndroid Build Coastguard Worker 
outqueue.put(None) # sentinel 704*cda5da8dSAndroid Build Coastguard Worker 705*cda5da8dSAndroid Build Coastguard Worker # We must wait for the worker handler to exit before terminating 706*cda5da8dSAndroid Build Coastguard Worker # workers because we don't want workers to be restarted behind our back. 707*cda5da8dSAndroid Build Coastguard Worker util.debug('joining worker handler') 708*cda5da8dSAndroid Build Coastguard Worker if threading.current_thread() is not worker_handler: 709*cda5da8dSAndroid Build Coastguard Worker worker_handler.join() 710*cda5da8dSAndroid Build Coastguard Worker 711*cda5da8dSAndroid Build Coastguard Worker # Terminate workers which haven't already finished. 712*cda5da8dSAndroid Build Coastguard Worker if pool and hasattr(pool[0], 'terminate'): 713*cda5da8dSAndroid Build Coastguard Worker util.debug('terminating workers') 714*cda5da8dSAndroid Build Coastguard Worker for p in pool: 715*cda5da8dSAndroid Build Coastguard Worker if p.exitcode is None: 716*cda5da8dSAndroid Build Coastguard Worker p.terminate() 717*cda5da8dSAndroid Build Coastguard Worker 718*cda5da8dSAndroid Build Coastguard Worker util.debug('joining task handler') 719*cda5da8dSAndroid Build Coastguard Worker if threading.current_thread() is not task_handler: 720*cda5da8dSAndroid Build Coastguard Worker task_handler.join() 721*cda5da8dSAndroid Build Coastguard Worker 722*cda5da8dSAndroid Build Coastguard Worker util.debug('joining result handler') 723*cda5da8dSAndroid Build Coastguard Worker if threading.current_thread() is not result_handler: 724*cda5da8dSAndroid Build Coastguard Worker result_handler.join() 725*cda5da8dSAndroid Build Coastguard Worker 726*cda5da8dSAndroid Build Coastguard Worker if pool and hasattr(pool[0], 'terminate'): 727*cda5da8dSAndroid Build Coastguard Worker util.debug('joining pool workers') 728*cda5da8dSAndroid Build Coastguard Worker for p in pool: 729*cda5da8dSAndroid Build Coastguard Worker if p.is_alive(): 730*cda5da8dSAndroid Build Coastguard 
Worker # worker has not yet exited 731*cda5da8dSAndroid Build Coastguard Worker util.debug('cleaning up worker %d' % p.pid) 732*cda5da8dSAndroid Build Coastguard Worker p.join() 733*cda5da8dSAndroid Build Coastguard Worker 734*cda5da8dSAndroid Build Coastguard Worker def __enter__(self): 735*cda5da8dSAndroid Build Coastguard Worker self._check_running() 736*cda5da8dSAndroid Build Coastguard Worker return self 737*cda5da8dSAndroid Build Coastguard Worker 738*cda5da8dSAndroid Build Coastguard Worker def __exit__(self, exc_type, exc_val, exc_tb): 739*cda5da8dSAndroid Build Coastguard Worker self.terminate() 740*cda5da8dSAndroid Build Coastguard Worker 741*cda5da8dSAndroid Build Coastguard Worker# 742*cda5da8dSAndroid Build Coastguard Worker# Class whose instances are returned by `Pool.apply_async()` 743*cda5da8dSAndroid Build Coastguard Worker# 744*cda5da8dSAndroid Build Coastguard Worker 745*cda5da8dSAndroid Build Coastguard Workerclass ApplyResult(object): 746*cda5da8dSAndroid Build Coastguard Worker 747*cda5da8dSAndroid Build Coastguard Worker def __init__(self, pool, callback, error_callback): 748*cda5da8dSAndroid Build Coastguard Worker self._pool = pool 749*cda5da8dSAndroid Build Coastguard Worker self._event = threading.Event() 750*cda5da8dSAndroid Build Coastguard Worker self._job = next(job_counter) 751*cda5da8dSAndroid Build Coastguard Worker self._cache = pool._cache 752*cda5da8dSAndroid Build Coastguard Worker self._callback = callback 753*cda5da8dSAndroid Build Coastguard Worker self._error_callback = error_callback 754*cda5da8dSAndroid Build Coastguard Worker self._cache[self._job] = self 755*cda5da8dSAndroid Build Coastguard Worker 756*cda5da8dSAndroid Build Coastguard Worker def ready(self): 757*cda5da8dSAndroid Build Coastguard Worker return self._event.is_set() 758*cda5da8dSAndroid Build Coastguard Worker 759*cda5da8dSAndroid Build Coastguard Worker def successful(self): 760*cda5da8dSAndroid Build Coastguard Worker if not self.ready(): 
761*cda5da8dSAndroid Build Coastguard Worker raise ValueError("{0!r} not ready".format(self)) 762*cda5da8dSAndroid Build Coastguard Worker return self._success 763*cda5da8dSAndroid Build Coastguard Worker 764*cda5da8dSAndroid Build Coastguard Worker def wait(self, timeout=None): 765*cda5da8dSAndroid Build Coastguard Worker self._event.wait(timeout) 766*cda5da8dSAndroid Build Coastguard Worker 767*cda5da8dSAndroid Build Coastguard Worker def get(self, timeout=None): 768*cda5da8dSAndroid Build Coastguard Worker self.wait(timeout) 769*cda5da8dSAndroid Build Coastguard Worker if not self.ready(): 770*cda5da8dSAndroid Build Coastguard Worker raise TimeoutError 771*cda5da8dSAndroid Build Coastguard Worker if self._success: 772*cda5da8dSAndroid Build Coastguard Worker return self._value 773*cda5da8dSAndroid Build Coastguard Worker else: 774*cda5da8dSAndroid Build Coastguard Worker raise self._value 775*cda5da8dSAndroid Build Coastguard Worker 776*cda5da8dSAndroid Build Coastguard Worker def _set(self, i, obj): 777*cda5da8dSAndroid Build Coastguard Worker self._success, self._value = obj 778*cda5da8dSAndroid Build Coastguard Worker if self._callback and self._success: 779*cda5da8dSAndroid Build Coastguard Worker self._callback(self._value) 780*cda5da8dSAndroid Build Coastguard Worker if self._error_callback and not self._success: 781*cda5da8dSAndroid Build Coastguard Worker self._error_callback(self._value) 782*cda5da8dSAndroid Build Coastguard Worker self._event.set() 783*cda5da8dSAndroid Build Coastguard Worker del self._cache[self._job] 784*cda5da8dSAndroid Build Coastguard Worker self._pool = None 785*cda5da8dSAndroid Build Coastguard Worker 786*cda5da8dSAndroid Build Coastguard Worker __class_getitem__ = classmethod(types.GenericAlias) 787*cda5da8dSAndroid Build Coastguard Worker 788*cda5da8dSAndroid Build Coastguard WorkerAsyncResult = ApplyResult # create alias -- see #17805 789*cda5da8dSAndroid Build Coastguard Worker 790*cda5da8dSAndroid Build Coastguard Worker# 
# Class whose instances are returned by `Pool.map_async()`
#

class MapResult(ApplyResult):
    '''
    Result handle collecting the outcomes of all chunks of one map job.

    `_value` starts as a list of `length` placeholders that `_set()` fills
    chunk by chunk; `_number_left` counts the chunks still outstanding.
    '''

    def __init__(self, pool, chunksize, length, callback, error_callback):
        ApplyResult.__init__(self, pool, callback,
                             error_callback=error_callback)
        self._success = True
        self._value = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            # nothing will ever be delivered: the result is ready right away
            self._number_left = 0
            self._event.set()
            del self._cache[self._job]
        else:
            # number of chunks, i.e. length / chunksize rounded up
            self._number_left = length//chunksize + bool(length % chunksize)

    def _set(self, i, success_result):
        '''
        Record the `(success, result)` outcome of chunk number `i`.

        A successful chunk is written into its slice of `_value` as long as
        no earlier chunk has failed.  The first failure replaces `_value`
        with the failure's result.  Once the last chunk has arrived, the
        matching callback is invoked, the result is unregistered from the
        cache and the ready event is set.
        '''
        self._number_left -= 1
        success, result = success_result
        if success and self._success:
            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
            if self._number_left == 0:
                if self._callback:
                    self._callback(self._value)
                del self._cache[self._job]
                self._event.set()
                self._pool = None
        else:
            if not success and self._success:
                # only store first exception
                self._success = False
                self._value = result
            if self._number_left == 0:
                # only consider the result ready once all jobs are done
                if self._error_callback:
                    self._error_callback(self._value)
                del self._cache[self._job]
                self._event.set()
                self._pool = None

#
# Class whose instances are returned by `Pool.imap()`
#

class IMapIterator(object):
    '''
    Iterator handing out results in task-index order.

    Results arriving out of order are parked in `_unsorted` until every
    earlier index has been delivered; `_index` is the next index expected
    and `_length` the total number of tasks (None until `_set_length()`).
    '''

    def __init__(self, pool):
        self._pool = pool
        self._cond = threading.Condition(threading.Lock())
        self._job = next(job_counter)
        self._cache = pool._cache
        self._items = collections.deque()   # results ready to be consumed
        self._index = 0
        self._length = None
        self._unsorted = {}
        self._cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        '''
        Return the next result, waiting up to `timeout` for it.

        Raises `StopIteration` when all results have been consumed,
        `TimeoutError` when nothing is available after one wait, and
        re-raises the stored value when the task itself failed.
        '''
        with self._cond:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    self._pool = None
                    raise StopIteration from None
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        self._pool = None
                        raise StopIteration from None
                    raise TimeoutError from None

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                    # XXX

    def _set(self, i, obj):
        '''
        Deliver result `obj` for task index `i`.

        When `i` is the next expected index, `obj` and any directly
        following parked results are queued and one waiter is notified;
        otherwise `obj` is parked.  The iterator unregisters itself from the
        cache once every task has been delivered.
        '''
        with self._cond:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                del self._cache[self._job]
                self._pool = None

    def _set_length(self, length):
        '''
        Record the total number of tasks; if all of them have already been
        delivered, notify a waiter and unregister from the cache.
        '''
        with self._cond:
            self._length = length
            if self._index == self._length:
                self._cond.notify()
                del self._cache[self._job]
                self._pool = None

#
# Class whose instances are returned by `Pool.imap_unordered()`
#

class IMapUnorderedIterator(IMapIterator):
    '''
    Variant of `IMapIterator` that queues results in arrival order.
    '''

    def _set(self, i, obj):
        '''
        Queue `obj` immediately (the index `i` is ignored), notify one
        waiter and unregister from the cache after the last result.
        '''
        with self._cond:
            self._items.append(obj)
            self._index += 1
            self._cond.notify()
            if self._index == self._length:
                del self._cache[self._job]
                self._pool = None

#
#
#

class ThreadPool(Pool):
    '''
    `Pool` subclass whose queues are `queue.SimpleQueue` objects and whose
    workers are created through the `Process` class of the `.dummy` module.
    '''
    _wrap_exception = False

    @staticmethod
    def Process(ctx, *args, **kwds):
        # `ctx` is accepted for signature compatibility and ignored
        from .dummy import Process
        return Process(*args, **kwds)

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        '''
        Create the in/out queues and bind the fast put/get shortcuts.
        '''
        self._inqueue = queue.SimpleQueue()
        self._outqueue = queue.SimpleQueue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    def _get_sentinels(self):
        return [self._change_notifier._reader]

    @staticmethod
    def _get_worker_sentinels(workers):
        # workers of this pool contribute no sentinels to wait on
        return []

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        '''
        Empty `inqueue` and then enqueue `size` `None` sentinels.
        `task_handler` is unused here.
        '''
        # drain inqueue, and put sentinels at its head to make workers finish
        try:
            while True:
                inqueue.get(block=False)
        except queue.Empty:
            pass
        for _ in range(size):
            inqueue.put(None)

    # NOTE(review): unlike `Pool._wait_for_updates` (a staticmethod whose
    # `timeout` defaults to None) this is an instance method with a required
    # `timeout`; `Pool._handle_workers` calls `cls._wait_for_updates` with
    # two positional arguments only -- confirm this override is reachable
    # with a compatible call before relying on it.
    def _wait_for_updates(self, sentinels, change_notifier, timeout):
        time.sleep(timeout)