1*cda5da8dSAndroid Build Coastguard Worker# Copyright 2009 Brian Quinlan. All Rights Reserved. 2*cda5da8dSAndroid Build Coastguard Worker# Licensed to PSF under a Contributor Agreement. 3*cda5da8dSAndroid Build Coastguard Worker 4*cda5da8dSAndroid Build Coastguard Worker"""Implements ProcessPoolExecutor. 5*cda5da8dSAndroid Build Coastguard Worker 6*cda5da8dSAndroid Build Coastguard WorkerThe following diagram and text describe the data-flow through the system: 7*cda5da8dSAndroid Build Coastguard Worker 8*cda5da8dSAndroid Build Coastguard Worker|======================= In-process =====================|== Out-of-process ==| 9*cda5da8dSAndroid Build Coastguard Worker 10*cda5da8dSAndroid Build Coastguard Worker+----------+ +----------+ +--------+ +-----------+ +---------+ 11*cda5da8dSAndroid Build Coastguard Worker| | => | Work Ids | | | | Call Q | | Process | 12*cda5da8dSAndroid Build Coastguard Worker| | +----------+ | | +-----------+ | Pool | 13*cda5da8dSAndroid Build Coastguard Worker| | | ... | | | | ... | +---------+ 14*cda5da8dSAndroid Build Coastguard Worker| | | 6 | => | | => | 5, call() | => | | 15*cda5da8dSAndroid Build Coastguard Worker| | | 7 | | | | ... | | | 16*cda5da8dSAndroid Build Coastguard Worker| Process | | ... | | Local | +-----------+ | Process | 17*cda5da8dSAndroid Build Coastguard Worker| Pool | +----------+ | Worker | | #1..n | 18*cda5da8dSAndroid Build Coastguard Worker| Executor | | Thread | | | 19*cda5da8dSAndroid Build Coastguard Worker| | +----------- + | | +-----------+ | | 20*cda5da8dSAndroid Build Coastguard Worker| | <=> | Work Items | <=> | | <= | Result Q | <= | | 21*cda5da8dSAndroid Build Coastguard Worker| | +------------+ | | +-----------+ | | 22*cda5da8dSAndroid Build Coastguard Worker| | | 6: call() | | | | ... | | | 23*cda5da8dSAndroid Build Coastguard Worker| | | future | | | | 4, result | | | 24*cda5da8dSAndroid Build Coastguard Worker| | | ... | | | | 3, except | | | 25*cda5da8dSAndroid Build Coastguard Worker+----------+ +------------+ +--------+ +-----------+ +---------+ 26*cda5da8dSAndroid Build Coastguard Worker 27*cda5da8dSAndroid Build Coastguard WorkerExecutor.submit() called: 28*cda5da8dSAndroid Build Coastguard Worker- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict 29*cda5da8dSAndroid Build Coastguard Worker- adds the id of the _WorkItem to the "Work Ids" queue 30*cda5da8dSAndroid Build Coastguard Worker 31*cda5da8dSAndroid Build Coastguard WorkerLocal worker thread: 32*cda5da8dSAndroid Build Coastguard Worker- reads work ids from the "Work Ids" queue and looks up the corresponding 33*cda5da8dSAndroid Build Coastguard Worker WorkItem from the "Work Items" dict: if the work item has been cancelled then 34*cda5da8dSAndroid Build Coastguard Worker it is simply removed from the dict, otherwise it is repackaged as a 35*cda5da8dSAndroid Build Coastguard Worker _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" 36*cda5da8dSAndroid Build Coastguard Worker until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because 37*cda5da8dSAndroid Build Coastguard Worker calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). 38*cda5da8dSAndroid Build Coastguard Worker- reads _ResultItems from "Result Q", updates the future stored in the 39*cda5da8dSAndroid Build Coastguard Worker "Work Items" dict and deletes the dict entry 40*cda5da8dSAndroid Build Coastguard Worker 41*cda5da8dSAndroid Build Coastguard WorkerProcess #1..n: 42*cda5da8dSAndroid Build Coastguard Worker- reads _CallItems from "Call Q", executes the calls, and puts the resulting 43*cda5da8dSAndroid Build Coastguard Worker _ResultItems in "Result Q" 44*cda5da8dSAndroid Build Coastguard Worker""" 45*cda5da8dSAndroid Build Coastguard Worker 46*cda5da8dSAndroid Build Coastguard Worker__author__ = 'Brian Quinlan ([email protected])' 47*cda5da8dSAndroid Build Coastguard Worker 48*cda5da8dSAndroid Build Coastguard Workerimport os 49*cda5da8dSAndroid Build Coastguard Workerfrom concurrent.futures import _base 50*cda5da8dSAndroid Build Coastguard Workerimport queue 51*cda5da8dSAndroid Build Coastguard Workerimport multiprocessing as mp 52*cda5da8dSAndroid Build Coastguard Workerimport multiprocessing.connection 53*cda5da8dSAndroid Build Coastguard Workerfrom multiprocessing.queues import Queue 54*cda5da8dSAndroid Build Coastguard Workerimport threading 55*cda5da8dSAndroid Build Coastguard Workerimport weakref 56*cda5da8dSAndroid Build Coastguard Workerfrom functools import partial 57*cda5da8dSAndroid Build Coastguard Workerimport itertools 58*cda5da8dSAndroid Build Coastguard Workerimport sys 59*cda5da8dSAndroid Build Coastguard Workerfrom traceback import format_exception 60*cda5da8dSAndroid Build Coastguard Worker 61*cda5da8dSAndroid Build Coastguard Worker 62*cda5da8dSAndroid Build Coastguard Worker_threads_wakeups = weakref.WeakKeyDictionary() 63*cda5da8dSAndroid Build Coastguard Worker_global_shutdown = False 64*cda5da8dSAndroid Build Coastguard Worker 65*cda5da8dSAndroid Build Coastguard Worker 66*cda5da8dSAndroid Build Coastguard Workerclass _ThreadWakeup: 67*cda5da8dSAndroid Build Coastguard Worker def __init__(self): 68*cda5da8dSAndroid Build Coastguard Worker self._closed = False 69*cda5da8dSAndroid Build Coastguard Worker self._reader, self._writer = mp.Pipe(duplex=False) 70*cda5da8dSAndroid Build Coastguard Worker 71*cda5da8dSAndroid Build Coastguard Worker def close(self): 72*cda5da8dSAndroid Build Coastguard Worker if not self._closed: 73*cda5da8dSAndroid Build Coastguard Worker self._closed = True 74*cda5da8dSAndroid Build Coastguard Worker self._writer.close() 75*cda5da8dSAndroid Build Coastguard Worker self._reader.close() 76*cda5da8dSAndroid Build Coastguard Worker 77*cda5da8dSAndroid Build Coastguard Worker def wakeup(self): 78*cda5da8dSAndroid Build Coastguard Worker if not self._closed: 79*cda5da8dSAndroid Build Coastguard Worker self._writer.send_bytes(b"") 80*cda5da8dSAndroid Build Coastguard Worker 81*cda5da8dSAndroid Build Coastguard Worker def clear(self): 82*cda5da8dSAndroid Build Coastguard Worker if not self._closed: 83*cda5da8dSAndroid Build Coastguard Worker while self._reader.poll(): 84*cda5da8dSAndroid Build Coastguard Worker self._reader.recv_bytes() 85*cda5da8dSAndroid Build Coastguard Worker 86*cda5da8dSAndroid Build Coastguard Worker 87*cda5da8dSAndroid Build Coastguard Workerdef _python_exit(): 88*cda5da8dSAndroid Build Coastguard Worker global _global_shutdown 89*cda5da8dSAndroid Build Coastguard Worker _global_shutdown = True 90*cda5da8dSAndroid Build Coastguard Worker items = list(_threads_wakeups.items()) 91*cda5da8dSAndroid Build Coastguard Worker for _, thread_wakeup in items: 92*cda5da8dSAndroid Build Coastguard Worker # call not protected by ProcessPoolExecutor._shutdown_lock 93*cda5da8dSAndroid Build Coastguard Worker thread_wakeup.wakeup() 94*cda5da8dSAndroid Build Coastguard Worker for t, _ in items: 95*cda5da8dSAndroid Build Coastguard Worker t.join() 96*cda5da8dSAndroid Build Coastguard Worker 97*cda5da8dSAndroid Build Coastguard Worker# Register for `_python_exit()` to be called just before joining all 98*cda5da8dSAndroid Build Coastguard Worker# non-daemon threads. This is used instead of `atexit.register()` for 99*cda5da8dSAndroid Build Coastguard Worker# compatibility with subinterpreters, which no longer support daemon threads. 100*cda5da8dSAndroid Build Coastguard Worker# See bpo-39812 for context. 101*cda5da8dSAndroid Build Coastguard Workerthreading._register_atexit(_python_exit) 102*cda5da8dSAndroid Build Coastguard Worker 103*cda5da8dSAndroid Build Coastguard Worker# Controls how many more calls than processes will be queued in the call queue. 104*cda5da8dSAndroid Build Coastguard Worker# A smaller number will mean that processes spend more time idle waiting for 105*cda5da8dSAndroid Build Coastguard Worker# work while a larger number will make Future.cancel() succeed less frequently 106*cda5da8dSAndroid Build Coastguard Worker# (Futures in the call queue cannot be cancelled). 107*cda5da8dSAndroid Build Coastguard WorkerEXTRA_QUEUED_CALLS = 1 108*cda5da8dSAndroid Build Coastguard Worker 109*cda5da8dSAndroid Build Coastguard Worker 110*cda5da8dSAndroid Build Coastguard Worker# On Windows, WaitForMultipleObjects is used to wait for processes to finish. 111*cda5da8dSAndroid Build Coastguard Worker# It can wait on, at most, 63 objects. There is an overhead of two objects: 112*cda5da8dSAndroid Build Coastguard Worker# - the result queue reader 113*cda5da8dSAndroid Build Coastguard Worker# - the thread wakeup reader 114*cda5da8dSAndroid Build Coastguard Worker_MAX_WINDOWS_WORKERS = 63 - 2 115*cda5da8dSAndroid Build Coastguard Worker 116*cda5da8dSAndroid Build Coastguard Worker# Hack to embed stringification of remote traceback in local traceback 117*cda5da8dSAndroid Build Coastguard Worker 118*cda5da8dSAndroid Build Coastguard Workerclass _RemoteTraceback(Exception): 119*cda5da8dSAndroid Build Coastguard Worker def __init__(self, tb): 120*cda5da8dSAndroid Build Coastguard Worker self.tb = tb 121*cda5da8dSAndroid Build Coastguard Worker def __str__(self): 122*cda5da8dSAndroid Build Coastguard Worker return self.tb 123*cda5da8dSAndroid Build Coastguard Worker 124*cda5da8dSAndroid Build Coastguard Workerclass _ExceptionWithTraceback: 125*cda5da8dSAndroid Build Coastguard Worker def __init__(self, exc, tb): 126*cda5da8dSAndroid Build Coastguard Worker tb = ''.join(format_exception(type(exc), exc, tb)) 127*cda5da8dSAndroid Build Coastguard Worker self.exc = exc 128*cda5da8dSAndroid Build Coastguard Worker # Traceback object needs to be garbage-collected as its frames 129*cda5da8dSAndroid Build Coastguard Worker # contain references to all the objects in the exception scope 130*cda5da8dSAndroid Build Coastguard Worker self.exc.__traceback__ = None 131*cda5da8dSAndroid Build Coastguard Worker self.tb = '\n"""\n%s"""' % tb 132*cda5da8dSAndroid Build Coastguard Worker def __reduce__(self): 133*cda5da8dSAndroid Build Coastguard Worker return _rebuild_exc, (self.exc, self.tb) 134*cda5da8dSAndroid Build Coastguard Worker 135*cda5da8dSAndroid Build Coastguard Workerdef _rebuild_exc(exc, tb): 136*cda5da8dSAndroid Build Coastguard Worker exc.__cause__ = _RemoteTraceback(tb) 137*cda5da8dSAndroid Build Coastguard Worker return exc 138*cda5da8dSAndroid Build Coastguard Worker 139*cda5da8dSAndroid Build Coastguard Workerclass _WorkItem(object): 140*cda5da8dSAndroid Build Coastguard Worker def __init__(self, future, fn, args, kwargs): 141*cda5da8dSAndroid Build Coastguard Worker self.future = future 142*cda5da8dSAndroid Build Coastguard Worker self.fn = fn 143*cda5da8dSAndroid Build Coastguard Worker self.args = args 144*cda5da8dSAndroid Build Coastguard Worker self.kwargs = kwargs 145*cda5da8dSAndroid Build Coastguard Worker 146*cda5da8dSAndroid Build Coastguard Workerclass _ResultItem(object): 147*cda5da8dSAndroid Build Coastguard Worker def __init__(self, work_id, exception=None, result=None, exit_pid=None): 148*cda5da8dSAndroid Build Coastguard Worker self.work_id = work_id 149*cda5da8dSAndroid Build Coastguard Worker self.exception = exception 150*cda5da8dSAndroid Build Coastguard Worker self.result = result 151*cda5da8dSAndroid Build Coastguard Worker self.exit_pid = exit_pid 152*cda5da8dSAndroid Build Coastguard Worker 153*cda5da8dSAndroid Build Coastguard Workerclass _CallItem(object): 154*cda5da8dSAndroid Build Coastguard Worker def __init__(self, work_id, fn, args, kwargs): 155*cda5da8dSAndroid Build Coastguard Worker self.work_id = work_id 156*cda5da8dSAndroid Build Coastguard Worker self.fn = fn 157*cda5da8dSAndroid Build Coastguard Worker self.args = args 158*cda5da8dSAndroid Build Coastguard Worker self.kwargs = kwargs 159*cda5da8dSAndroid Build Coastguard Worker 160*cda5da8dSAndroid Build Coastguard Worker 161*cda5da8dSAndroid Build Coastguard Workerclass _SafeQueue(Queue): 162*cda5da8dSAndroid Build Coastguard Worker """Safe Queue set exception to the future object linked to a job""" 163*cda5da8dSAndroid Build Coastguard Worker def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock, 164*cda5da8dSAndroid Build Coastguard Worker thread_wakeup): 165*cda5da8dSAndroid Build Coastguard Worker self.pending_work_items = pending_work_items 166*cda5da8dSAndroid Build Coastguard Worker self.shutdown_lock = shutdown_lock 167*cda5da8dSAndroid Build Coastguard Worker self.thread_wakeup = thread_wakeup 168*cda5da8dSAndroid Build Coastguard Worker super().__init__(max_size, ctx=ctx) 169*cda5da8dSAndroid Build Coastguard Worker 170*cda5da8dSAndroid Build Coastguard Worker def _on_queue_feeder_error(self, e, obj): 171*cda5da8dSAndroid Build Coastguard Worker if isinstance(obj, _CallItem): 172*cda5da8dSAndroid Build Coastguard Worker tb = format_exception(type(e), e, e.__traceback__) 173*cda5da8dSAndroid Build Coastguard Worker e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) 174*cda5da8dSAndroid Build Coastguard Worker work_item = self.pending_work_items.pop(obj.work_id, None) 175*cda5da8dSAndroid Build Coastguard Worker with self.shutdown_lock: 176*cda5da8dSAndroid Build Coastguard Worker self.thread_wakeup.wakeup() 177*cda5da8dSAndroid Build Coastguard Worker # work_item can be None if another process terminated. In this 178*cda5da8dSAndroid Build Coastguard Worker # case, the executor_manager_thread fails all work_items 179*cda5da8dSAndroid Build Coastguard Worker # with BrokenProcessPool 180*cda5da8dSAndroid Build Coastguard Worker if work_item is not None: 181*cda5da8dSAndroid Build Coastguard Worker work_item.future.set_exception(e) 182*cda5da8dSAndroid Build Coastguard Worker else: 183*cda5da8dSAndroid Build Coastguard Worker super()._on_queue_feeder_error(e, obj) 184*cda5da8dSAndroid Build Coastguard Worker 185*cda5da8dSAndroid Build Coastguard Worker 186*cda5da8dSAndroid Build Coastguard Workerdef _get_chunks(*iterables, chunksize): 187*cda5da8dSAndroid Build Coastguard Worker """ Iterates over zip()ed iterables in chunks. """ 188*cda5da8dSAndroid Build Coastguard Worker it = zip(*iterables) 189*cda5da8dSAndroid Build Coastguard Worker while True: 190*cda5da8dSAndroid Build Coastguard Worker chunk = tuple(itertools.islice(it, chunksize)) 191*cda5da8dSAndroid Build Coastguard Worker if not chunk: 192*cda5da8dSAndroid Build Coastguard Worker return 193*cda5da8dSAndroid Build Coastguard Worker yield chunk 194*cda5da8dSAndroid Build Coastguard Worker 195*cda5da8dSAndroid Build Coastguard Worker 196*cda5da8dSAndroid Build Coastguard Workerdef _process_chunk(fn, chunk): 197*cda5da8dSAndroid Build Coastguard Worker """ Processes a chunk of an iterable passed to map. 198*cda5da8dSAndroid Build Coastguard Worker 199*cda5da8dSAndroid Build Coastguard Worker Runs the function passed to map() on a chunk of the 200*cda5da8dSAndroid Build Coastguard Worker iterable passed to map. 201*cda5da8dSAndroid Build Coastguard Worker 202*cda5da8dSAndroid Build Coastguard Worker This function is run in a separate process. 203*cda5da8dSAndroid Build Coastguard Worker 204*cda5da8dSAndroid Build Coastguard Worker """ 205*cda5da8dSAndroid Build Coastguard Worker return [fn(*args) for args in chunk] 206*cda5da8dSAndroid Build Coastguard Worker 207*cda5da8dSAndroid Build Coastguard Worker 208*cda5da8dSAndroid Build Coastguard Workerdef _sendback_result(result_queue, work_id, result=None, exception=None, 209*cda5da8dSAndroid Build Coastguard Worker exit_pid=None): 210*cda5da8dSAndroid Build Coastguard Worker """Safely send back the given result or exception""" 211*cda5da8dSAndroid Build Coastguard Worker try: 212*cda5da8dSAndroid Build Coastguard Worker result_queue.put(_ResultItem(work_id, result=result, 213*cda5da8dSAndroid Build Coastguard Worker exception=exception, exit_pid=exit_pid)) 214*cda5da8dSAndroid Build Coastguard Worker except BaseException as e: 215*cda5da8dSAndroid Build Coastguard Worker exc = _ExceptionWithTraceback(e, e.__traceback__) 216*cda5da8dSAndroid Build Coastguard Worker result_queue.put(_ResultItem(work_id, exception=exc, 217*cda5da8dSAndroid Build Coastguard Worker exit_pid=exit_pid)) 218*cda5da8dSAndroid Build Coastguard Worker 219*cda5da8dSAndroid Build Coastguard Worker 220*cda5da8dSAndroid Build Coastguard Workerdef _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None): 221*cda5da8dSAndroid Build Coastguard Worker """Evaluates calls from call_queue and places the results in result_queue. 222*cda5da8dSAndroid Build Coastguard Worker 223*cda5da8dSAndroid Build Coastguard Worker This worker is run in a separate process. 224*cda5da8dSAndroid Build Coastguard Worker 225*cda5da8dSAndroid Build Coastguard Worker Args: 226*cda5da8dSAndroid Build Coastguard Worker call_queue: A ctx.Queue of _CallItems that will be read and 227*cda5da8dSAndroid Build Coastguard Worker evaluated by the worker. 228*cda5da8dSAndroid Build Coastguard Worker result_queue: A ctx.Queue of _ResultItems that will written 229*cda5da8dSAndroid Build Coastguard Worker to by the worker. 230*cda5da8dSAndroid Build Coastguard Worker initializer: A callable initializer, or None 231*cda5da8dSAndroid Build Coastguard Worker initargs: A tuple of args for the initializer 232*cda5da8dSAndroid Build Coastguard Worker """ 233*cda5da8dSAndroid Build Coastguard Worker if initializer is not None: 234*cda5da8dSAndroid Build Coastguard Worker try: 235*cda5da8dSAndroid Build Coastguard Worker initializer(*initargs) 236*cda5da8dSAndroid Build Coastguard Worker except BaseException: 237*cda5da8dSAndroid Build Coastguard Worker _base.LOGGER.critical('Exception in initializer:', exc_info=True) 238*cda5da8dSAndroid Build Coastguard Worker # The parent will notice that the process stopped and 239*cda5da8dSAndroid Build Coastguard Worker # mark the pool broken 240*cda5da8dSAndroid Build Coastguard Worker return 241*cda5da8dSAndroid Build Coastguard Worker num_tasks = 0 242*cda5da8dSAndroid Build Coastguard Worker exit_pid = None 243*cda5da8dSAndroid Build Coastguard Worker while True: 244*cda5da8dSAndroid Build Coastguard Worker call_item = call_queue.get(block=True) 245*cda5da8dSAndroid Build Coastguard Worker if call_item is None: 246*cda5da8dSAndroid Build Coastguard Worker # Wake up queue management thread 247*cda5da8dSAndroid Build Coastguard Worker result_queue.put(os.getpid()) 248*cda5da8dSAndroid Build Coastguard Worker return 249*cda5da8dSAndroid Build Coastguard Worker 250*cda5da8dSAndroid Build Coastguard Worker if max_tasks is not None: 251*cda5da8dSAndroid Build Coastguard Worker num_tasks += 1 252*cda5da8dSAndroid Build Coastguard Worker if num_tasks >= max_tasks: 253*cda5da8dSAndroid Build Coastguard Worker exit_pid = os.getpid() 254*cda5da8dSAndroid Build Coastguard Worker 255*cda5da8dSAndroid Build Coastguard Worker try: 256*cda5da8dSAndroid Build Coastguard Worker r = call_item.fn(*call_item.args, **call_item.kwargs) 257*cda5da8dSAndroid Build Coastguard Worker except BaseException as e: 258*cda5da8dSAndroid Build Coastguard Worker exc = _ExceptionWithTraceback(e, e.__traceback__) 259*cda5da8dSAndroid Build Coastguard Worker _sendback_result(result_queue, call_item.work_id, exception=exc, 260*cda5da8dSAndroid Build Coastguard Worker exit_pid=exit_pid) 261*cda5da8dSAndroid Build Coastguard Worker else: 262*cda5da8dSAndroid Build Coastguard Worker _sendback_result(result_queue, call_item.work_id, result=r, 263*cda5da8dSAndroid Build Coastguard Worker exit_pid=exit_pid) 264*cda5da8dSAndroid Build Coastguard Worker del r 265*cda5da8dSAndroid Build Coastguard Worker 266*cda5da8dSAndroid Build Coastguard Worker # Liberate the resource as soon as possible, to avoid holding onto 267*cda5da8dSAndroid Build Coastguard Worker # open files or shared memory that is not needed anymore 268*cda5da8dSAndroid Build Coastguard Worker del call_item 269*cda5da8dSAndroid Build Coastguard Worker 270*cda5da8dSAndroid Build Coastguard Worker if exit_pid is not None: 271*cda5da8dSAndroid Build Coastguard Worker return 272*cda5da8dSAndroid Build Coastguard Worker 273*cda5da8dSAndroid Build Coastguard Worker 274*cda5da8dSAndroid Build Coastguard Workerclass _ExecutorManagerThread(threading.Thread): 275*cda5da8dSAndroid Build Coastguard Worker """Manages the communication between this process and the worker processes. 276*cda5da8dSAndroid Build Coastguard Worker 277*cda5da8dSAndroid Build Coastguard Worker The manager is run in a local thread. 278*cda5da8dSAndroid Build Coastguard Worker 279*cda5da8dSAndroid Build Coastguard Worker Args: 280*cda5da8dSAndroid Build Coastguard Worker executor: A reference to the ProcessPoolExecutor that owns 281*cda5da8dSAndroid Build Coastguard Worker this thread. A weakref will be own by the manager as well as 282*cda5da8dSAndroid Build Coastguard Worker references to internal objects used to introspect the state of 283*cda5da8dSAndroid Build Coastguard Worker the executor. 284*cda5da8dSAndroid Build Coastguard Worker """ 285*cda5da8dSAndroid Build Coastguard Worker 286*cda5da8dSAndroid Build Coastguard Worker def __init__(self, executor): 287*cda5da8dSAndroid Build Coastguard Worker # Store references to necessary internals of the executor. 288*cda5da8dSAndroid Build Coastguard Worker 289*cda5da8dSAndroid Build Coastguard Worker # A _ThreadWakeup to allow waking up the queue_manager_thread from the 290*cda5da8dSAndroid Build Coastguard Worker # main Thread and avoid deadlocks caused by permanently locked queues. 291*cda5da8dSAndroid Build Coastguard Worker self.thread_wakeup = executor._executor_manager_thread_wakeup 292*cda5da8dSAndroid Build Coastguard Worker self.shutdown_lock = executor._shutdown_lock 293*cda5da8dSAndroid Build Coastguard Worker 294*cda5da8dSAndroid Build Coastguard Worker # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used 295*cda5da8dSAndroid Build Coastguard Worker # to determine if the ProcessPoolExecutor has been garbage collected 296*cda5da8dSAndroid Build Coastguard Worker # and that the manager can exit. 297*cda5da8dSAndroid Build Coastguard Worker # When the executor gets garbage collected, the weakref callback 298*cda5da8dSAndroid Build Coastguard Worker # will wake up the queue management thread so that it can terminate 299*cda5da8dSAndroid Build Coastguard Worker # if there is no pending work item. 300*cda5da8dSAndroid Build Coastguard Worker def weakref_cb(_, 301*cda5da8dSAndroid Build Coastguard Worker thread_wakeup=self.thread_wakeup, 302*cda5da8dSAndroid Build Coastguard Worker shutdown_lock=self.shutdown_lock): 303*cda5da8dSAndroid Build Coastguard Worker mp.util.debug('Executor collected: triggering callback for' 304*cda5da8dSAndroid Build Coastguard Worker ' QueueManager wakeup') 305*cda5da8dSAndroid Build Coastguard Worker with shutdown_lock: 306*cda5da8dSAndroid Build Coastguard Worker thread_wakeup.wakeup() 307*cda5da8dSAndroid Build Coastguard Worker 308*cda5da8dSAndroid Build Coastguard Worker self.executor_reference = weakref.ref(executor, weakref_cb) 309*cda5da8dSAndroid Build Coastguard Worker 310*cda5da8dSAndroid Build Coastguard Worker # A list of the ctx.Process instances used as workers. 311*cda5da8dSAndroid Build Coastguard Worker self.processes = executor._processes 312*cda5da8dSAndroid Build Coastguard Worker 313*cda5da8dSAndroid Build Coastguard Worker # A ctx.Queue that will be filled with _CallItems derived from 314*cda5da8dSAndroid Build Coastguard Worker # _WorkItems for processing by the process workers. 315*cda5da8dSAndroid Build Coastguard Worker self.call_queue = executor._call_queue 316*cda5da8dSAndroid Build Coastguard Worker 317*cda5da8dSAndroid Build Coastguard Worker # A ctx.SimpleQueue of _ResultItems generated by the process workers. 318*cda5da8dSAndroid Build Coastguard Worker self.result_queue = executor._result_queue 319*cda5da8dSAndroid Build Coastguard Worker 320*cda5da8dSAndroid Build Coastguard Worker # A queue.Queue of work ids e.g. Queue([5, 6, ...]). 321*cda5da8dSAndroid Build Coastguard Worker self.work_ids_queue = executor._work_ids 322*cda5da8dSAndroid Build Coastguard Worker 323*cda5da8dSAndroid Build Coastguard Worker # Maximum number of tasks a worker process can execute before 324*cda5da8dSAndroid Build Coastguard Worker # exiting safely 325*cda5da8dSAndroid Build Coastguard Worker self.max_tasks_per_child = executor._max_tasks_per_child 326*cda5da8dSAndroid Build Coastguard Worker 327*cda5da8dSAndroid Build Coastguard Worker # A dict mapping work ids to _WorkItems e.g. 328*cda5da8dSAndroid Build Coastguard Worker # {5: <_WorkItem...>, 6: <_WorkItem...>, ...} 329*cda5da8dSAndroid Build Coastguard Worker self.pending_work_items = executor._pending_work_items 330*cda5da8dSAndroid Build Coastguard Worker 331*cda5da8dSAndroid Build Coastguard Worker super().__init__() 332*cda5da8dSAndroid Build Coastguard Worker 333*cda5da8dSAndroid Build Coastguard Worker def run(self): 334*cda5da8dSAndroid Build Coastguard Worker # Main loop for the executor manager thread. 335*cda5da8dSAndroid Build Coastguard Worker 336*cda5da8dSAndroid Build Coastguard Worker while True: 337*cda5da8dSAndroid Build Coastguard Worker self.add_call_item_to_queue() 338*cda5da8dSAndroid Build Coastguard Worker 339*cda5da8dSAndroid Build Coastguard Worker result_item, is_broken, cause = self.wait_result_broken_or_wakeup() 340*cda5da8dSAndroid Build Coastguard Worker 341*cda5da8dSAndroid Build Coastguard Worker if is_broken: 342*cda5da8dSAndroid Build Coastguard Worker self.terminate_broken(cause) 343*cda5da8dSAndroid Build Coastguard Worker return 344*cda5da8dSAndroid Build Coastguard Worker if result_item is not None: 345*cda5da8dSAndroid Build Coastguard Worker self.process_result_item(result_item) 346*cda5da8dSAndroid Build Coastguard Worker 347*cda5da8dSAndroid Build Coastguard Worker process_exited = result_item.exit_pid is not None 348*cda5da8dSAndroid Build Coastguard Worker if process_exited: 349*cda5da8dSAndroid Build Coastguard Worker p = self.processes.pop(result_item.exit_pid) 350*cda5da8dSAndroid Build Coastguard Worker p.join() 351*cda5da8dSAndroid Build Coastguard Worker 352*cda5da8dSAndroid Build Coastguard Worker # Delete reference to result_item to avoid keeping references 353*cda5da8dSAndroid Build Coastguard Worker # while waiting on new results. 354*cda5da8dSAndroid Build Coastguard Worker del result_item 355*cda5da8dSAndroid Build Coastguard Worker 356*cda5da8dSAndroid Build Coastguard Worker if executor := self.executor_reference(): 357*cda5da8dSAndroid Build Coastguard Worker if process_exited: 358*cda5da8dSAndroid Build Coastguard Worker with self.shutdown_lock: 359*cda5da8dSAndroid Build Coastguard Worker executor._adjust_process_count() 360*cda5da8dSAndroid Build Coastguard Worker else: 361*cda5da8dSAndroid Build Coastguard Worker executor._idle_worker_semaphore.release() 362*cda5da8dSAndroid Build Coastguard Worker del executor 363*cda5da8dSAndroid Build Coastguard Worker 364*cda5da8dSAndroid Build Coastguard Worker if self.is_shutting_down(): 365*cda5da8dSAndroid Build Coastguard Worker self.flag_executor_shutting_down() 366*cda5da8dSAndroid Build Coastguard Worker 367*cda5da8dSAndroid Build Coastguard Worker # When only canceled futures remain in pending_work_items, our 368*cda5da8dSAndroid Build Coastguard Worker # next call to wait_result_broken_or_wakeup would hang forever. 369*cda5da8dSAndroid Build Coastguard Worker # This makes sure we have some running futures or none at all. 370*cda5da8dSAndroid Build Coastguard Worker self.add_call_item_to_queue() 371*cda5da8dSAndroid Build Coastguard Worker 372*cda5da8dSAndroid Build Coastguard Worker # Since no new work items can be added, it is safe to shutdown 373*cda5da8dSAndroid Build Coastguard Worker # this thread if there are no pending work items. 374*cda5da8dSAndroid Build Coastguard Worker if not self.pending_work_items: 375*cda5da8dSAndroid Build Coastguard Worker self.join_executor_internals() 376*cda5da8dSAndroid Build Coastguard Worker return 377*cda5da8dSAndroid Build Coastguard Worker 378*cda5da8dSAndroid Build Coastguard Worker def add_call_item_to_queue(self): 379*cda5da8dSAndroid Build Coastguard Worker # Fills call_queue with _WorkItems from pending_work_items. 380*cda5da8dSAndroid Build Coastguard Worker # This function never blocks. 381*cda5da8dSAndroid Build Coastguard Worker while True: 382*cda5da8dSAndroid Build Coastguard Worker if self.call_queue.full(): 383*cda5da8dSAndroid Build Coastguard Worker return 384*cda5da8dSAndroid Build Coastguard Worker try: 385*cda5da8dSAndroid Build Coastguard Worker work_id = self.work_ids_queue.get(block=False) 386*cda5da8dSAndroid Build Coastguard Worker except queue.Empty: 387*cda5da8dSAndroid Build Coastguard Worker return 388*cda5da8dSAndroid Build Coastguard Worker else: 389*cda5da8dSAndroid Build Coastguard Worker work_item = self.pending_work_items[work_id] 390*cda5da8dSAndroid Build Coastguard Worker 391*cda5da8dSAndroid Build Coastguard Worker if work_item.future.set_running_or_notify_cancel(): 392*cda5da8dSAndroid Build Coastguard Worker self.call_queue.put(_CallItem(work_id, 393*cda5da8dSAndroid Build Coastguard Worker work_item.fn, 394*cda5da8dSAndroid Build Coastguard Worker work_item.args, 395*cda5da8dSAndroid Build Coastguard Worker work_item.kwargs), 396*cda5da8dSAndroid Build Coastguard Worker block=True) 397*cda5da8dSAndroid Build Coastguard Worker else: 398*cda5da8dSAndroid Build Coastguard Worker del self.pending_work_items[work_id] 399*cda5da8dSAndroid Build Coastguard Worker continue 400*cda5da8dSAndroid Build Coastguard Worker 401*cda5da8dSAndroid Build Coastguard Worker def wait_result_broken_or_wakeup(self): 402*cda5da8dSAndroid Build Coastguard Worker # Wait for a result to be ready in the result_queue while checking 403*cda5da8dSAndroid Build Coastguard Worker # that all worker processes are still running, or for a wake up 404*cda5da8dSAndroid Build Coastguard Worker # signal send. The wake up signals come either from new tasks being 405*cda5da8dSAndroid Build Coastguard Worker # submitted, from the executor being shutdown/gc-ed, or from the 406*cda5da8dSAndroid Build Coastguard Worker # shutdown of the python interpreter. 407*cda5da8dSAndroid Build Coastguard Worker result_reader = self.result_queue._reader 408*cda5da8dSAndroid Build Coastguard Worker assert not self.thread_wakeup._closed 409*cda5da8dSAndroid Build Coastguard Worker wakeup_reader = self.thread_wakeup._reader 410*cda5da8dSAndroid Build Coastguard Worker readers = [result_reader, wakeup_reader] 411*cda5da8dSAndroid Build Coastguard Worker worker_sentinels = [p.sentinel for p in list(self.processes.values())] 412*cda5da8dSAndroid Build Coastguard Worker ready = mp.connection.wait(readers + worker_sentinels) 413*cda5da8dSAndroid Build Coastguard Worker 414*cda5da8dSAndroid Build Coastguard Worker cause = None 415*cda5da8dSAndroid Build Coastguard Worker is_broken = True 416*cda5da8dSAndroid Build Coastguard Worker result_item = None 417*cda5da8dSAndroid Build Coastguard Worker if result_reader in ready: 418*cda5da8dSAndroid Build Coastguard Worker try: 419*cda5da8dSAndroid Build Coastguard Worker result_item = result_reader.recv() 420*cda5da8dSAndroid Build Coastguard Worker is_broken = False 421*cda5da8dSAndroid Build Coastguard Worker except BaseException as e: 422*cda5da8dSAndroid Build Coastguard Worker cause = format_exception(type(e), e, e.__traceback__) 423*cda5da8dSAndroid Build Coastguard Worker 424*cda5da8dSAndroid Build Coastguard Worker elif wakeup_reader in ready: 425*cda5da8dSAndroid Build Coastguard Worker is_broken = False 426*cda5da8dSAndroid Build Coastguard Worker 427*cda5da8dSAndroid Build Coastguard Worker with self.shutdown_lock: 428*cda5da8dSAndroid Build Coastguard Worker self.thread_wakeup.clear() 429*cda5da8dSAndroid Build Coastguard Worker 430*cda5da8dSAndroid Build Coastguard Worker return result_item, is_broken, cause 431*cda5da8dSAndroid Build Coastguard Worker 432*cda5da8dSAndroid Build Coastguard Worker def process_result_item(self, result_item): 433*cda5da8dSAndroid Build Coastguard Worker # Process the received a result_item. This can be either the PID of a 434*cda5da8dSAndroid Build Coastguard Worker # worker that exited gracefully or a _ResultItem 435*cda5da8dSAndroid Build Coastguard Worker 436*cda5da8dSAndroid Build Coastguard Worker if isinstance(result_item, int): 437*cda5da8dSAndroid Build Coastguard Worker # Clean shutdown of a worker using its PID 438*cda5da8dSAndroid Build Coastguard Worker # (avoids marking the executor broken) 439*cda5da8dSAndroid Build Coastguard Worker assert self.is_shutting_down() 440*cda5da8dSAndroid Build Coastguard Worker p = self.processes.pop(result_item) 441*cda5da8dSAndroid Build Coastguard Worker p.join() 442*cda5da8dSAndroid Build Coastguard Worker if not self.processes: 443*cda5da8dSAndroid Build Coastguard Worker self.join_executor_internals() 444*cda5da8dSAndroid Build Coastguard Worker return 445*cda5da8dSAndroid Build Coastguard Worker else: 446*cda5da8dSAndroid Build Coastguard Worker # Received a _ResultItem so mark the future as completed. 447*cda5da8dSAndroid Build Coastguard Worker work_item = self.pending_work_items.pop(result_item.work_id, None) 448*cda5da8dSAndroid Build Coastguard Worker # work_item can be None if another process terminated (see above) 449*cda5da8dSAndroid Build Coastguard Worker if work_item is not None: 450*cda5da8dSAndroid Build Coastguard Worker if result_item.exception: 451*cda5da8dSAndroid Build Coastguard Worker work_item.future.set_exception(result_item.exception) 452*cda5da8dSAndroid Build Coastguard Worker else: 453*cda5da8dSAndroid Build Coastguard Worker work_item.future.set_result(result_item.result) 454*cda5da8dSAndroid Build Coastguard Worker 455*cda5da8dSAndroid Build Coastguard Worker def is_shutting_down(self): 456*cda5da8dSAndroid Build Coastguard Worker # Check whether we should start shutting down the executor. 457*cda5da8dSAndroid Build Coastguard Worker executor = self.executor_reference() 458*cda5da8dSAndroid Build Coastguard Worker # No more work items can be added if: 459*cda5da8dSAndroid Build Coastguard Worker # - The interpreter is shutting down OR 460*cda5da8dSAndroid Build Coastguard Worker # - The executor that owns this worker has been collected OR 461*cda5da8dSAndroid Build Coastguard Worker # - The executor that owns this worker has been shutdown. 462*cda5da8dSAndroid Build Coastguard Worker return (_global_shutdown or executor is None 463*cda5da8dSAndroid Build Coastguard Worker or executor._shutdown_thread) 464*cda5da8dSAndroid Build Coastguard Worker 465*cda5da8dSAndroid Build Coastguard Worker def terminate_broken(self, cause): 466*cda5da8dSAndroid Build Coastguard Worker # Terminate the executor because it is in a broken state. The cause 467*cda5da8dSAndroid Build Coastguard Worker # argument can be used to display more information on the error that 468*cda5da8dSAndroid Build Coastguard Worker # lead the executor into becoming broken. 469*cda5da8dSAndroid Build Coastguard Worker 470*cda5da8dSAndroid Build Coastguard Worker # Mark the process pool broken so that submits fail right now. 471*cda5da8dSAndroid Build Coastguard Worker executor = self.executor_reference() 472*cda5da8dSAndroid Build Coastguard Worker if executor is not None: 473*cda5da8dSAndroid Build Coastguard Worker executor._broken = ('A child process terminated ' 474*cda5da8dSAndroid Build Coastguard Worker 'abruptly, the process pool is not ' 475*cda5da8dSAndroid Build Coastguard Worker 'usable anymore') 476*cda5da8dSAndroid Build Coastguard Worker executor._shutdown_thread = True 477*cda5da8dSAndroid Build Coastguard Worker executor = None 478*cda5da8dSAndroid Build Coastguard Worker 479*cda5da8dSAndroid Build Coastguard Worker # All pending tasks are to be marked failed with the following 480*cda5da8dSAndroid Build Coastguard Worker # BrokenProcessPool error 481*cda5da8dSAndroid Build Coastguard Worker bpe = BrokenProcessPool("A process in the process pool was " 482*cda5da8dSAndroid Build Coastguard Worker "terminated abruptly while the future was " 483*cda5da8dSAndroid Build Coastguard Worker "running or pending.") 484*cda5da8dSAndroid Build Coastguard Worker if cause is not None: 485*cda5da8dSAndroid Build Coastguard Worker bpe.__cause__ = _RemoteTraceback( 486*cda5da8dSAndroid Build Coastguard Worker f"\n'''\n{''.join(cause)}'''") 487*cda5da8dSAndroid Build Coastguard Worker 488*cda5da8dSAndroid Build Coastguard Worker # Mark pending tasks as failed. 489*cda5da8dSAndroid Build Coastguard Worker for work_id, work_item in self.pending_work_items.items(): 490*cda5da8dSAndroid Build Coastguard Worker work_item.future.set_exception(bpe) 491*cda5da8dSAndroid Build Coastguard Worker # Delete references to object. See issue16284 492*cda5da8dSAndroid Build Coastguard Worker del work_item 493*cda5da8dSAndroid Build Coastguard Worker self.pending_work_items.clear() 494*cda5da8dSAndroid Build Coastguard Worker 495*cda5da8dSAndroid Build Coastguard Worker # Terminate remaining workers forcibly: the queues or their 496*cda5da8dSAndroid Build Coastguard Worker # locks may be in a dirty state and block forever. 497*cda5da8dSAndroid Build Coastguard Worker for p in self.processes.values(): 498*cda5da8dSAndroid Build Coastguard Worker p.terminate() 499*cda5da8dSAndroid Build Coastguard Worker 500*cda5da8dSAndroid Build Coastguard Worker # clean up resources 501*cda5da8dSAndroid Build Coastguard Worker self.join_executor_internals() 502*cda5da8dSAndroid Build Coastguard Worker 503*cda5da8dSAndroid Build Coastguard Worker def flag_executor_shutting_down(self): 504*cda5da8dSAndroid Build Coastguard Worker # Flag the executor as shutting down and cancel remaining tasks if 505*cda5da8dSAndroid Build Coastguard Worker # requested as early as possible if it is not gc-ed yet. 506*cda5da8dSAndroid Build Coastguard Worker executor = self.executor_reference() 507*cda5da8dSAndroid Build Coastguard Worker if executor is not None: 508*cda5da8dSAndroid Build Coastguard Worker executor._shutdown_thread = True 509*cda5da8dSAndroid Build Coastguard Worker # Cancel pending work items if requested. 510*cda5da8dSAndroid Build Coastguard Worker if executor._cancel_pending_futures: 511*cda5da8dSAndroid Build Coastguard Worker # Cancel all pending futures and update pending_work_items 512*cda5da8dSAndroid Build Coastguard Worker # to only have futures that are currently running. 513*cda5da8dSAndroid Build Coastguard Worker new_pending_work_items = {} 514*cda5da8dSAndroid Build Coastguard Worker for work_id, work_item in self.pending_work_items.items(): 515*cda5da8dSAndroid Build Coastguard Worker if not work_item.future.cancel(): 516*cda5da8dSAndroid Build Coastguard Worker new_pending_work_items[work_id] = work_item 517*cda5da8dSAndroid Build Coastguard Worker self.pending_work_items = new_pending_work_items 518*cda5da8dSAndroid Build Coastguard Worker # Drain work_ids_queue since we no longer need to 519*cda5da8dSAndroid Build Coastguard Worker # add items to the call queue. 520*cda5da8dSAndroid Build Coastguard Worker while True: 521*cda5da8dSAndroid Build Coastguard Worker try: 522*cda5da8dSAndroid Build Coastguard Worker self.work_ids_queue.get_nowait() 523*cda5da8dSAndroid Build Coastguard Worker except queue.Empty: 524*cda5da8dSAndroid Build Coastguard Worker break 525*cda5da8dSAndroid Build Coastguard Worker # Make sure we do this only once to not waste time looping 526*cda5da8dSAndroid Build Coastguard Worker # on running processes over and over. 527*cda5da8dSAndroid Build Coastguard Worker executor._cancel_pending_futures = False 528*cda5da8dSAndroid Build Coastguard Worker 529*cda5da8dSAndroid Build Coastguard Worker def shutdown_workers(self): 530*cda5da8dSAndroid Build Coastguard Worker n_children_to_stop = self.get_n_children_alive() 531*cda5da8dSAndroid Build Coastguard Worker n_sentinels_sent = 0 532*cda5da8dSAndroid Build Coastguard Worker # Send the right number of sentinels, to make sure all children are 533*cda5da8dSAndroid Build Coastguard Worker # properly terminated. 534*cda5da8dSAndroid Build Coastguard Worker while (n_sentinels_sent < n_children_to_stop 535*cda5da8dSAndroid Build Coastguard Worker and self.get_n_children_alive() > 0): 536*cda5da8dSAndroid Build Coastguard Worker for i in range(n_children_to_stop - n_sentinels_sent): 537*cda5da8dSAndroid Build Coastguard Worker try: 538*cda5da8dSAndroid Build Coastguard Worker self.call_queue.put_nowait(None) 539*cda5da8dSAndroid Build Coastguard Worker n_sentinels_sent += 1 540*cda5da8dSAndroid Build Coastguard Worker except queue.Full: 541*cda5da8dSAndroid Build Coastguard Worker break 542*cda5da8dSAndroid Build Coastguard Worker 543*cda5da8dSAndroid Build Coastguard Worker def join_executor_internals(self): 544*cda5da8dSAndroid Build Coastguard Worker self.shutdown_workers() 545*cda5da8dSAndroid Build Coastguard Worker # Release the queue's resources as soon as possible. 546*cda5da8dSAndroid Build Coastguard Worker self.call_queue.close() 547*cda5da8dSAndroid Build Coastguard Worker self.call_queue.join_thread() 548*cda5da8dSAndroid Build Coastguard Worker with self.shutdown_lock: 549*cda5da8dSAndroid Build Coastguard Worker self.thread_wakeup.close() 550*cda5da8dSAndroid Build Coastguard Worker # If .join() is not called on the created processes then 551*cda5da8dSAndroid Build Coastguard Worker # some ctx.Queue methods may deadlock on Mac OS X. 552*cda5da8dSAndroid Build Coastguard Worker for p in self.processes.values(): 553*cda5da8dSAndroid Build Coastguard Worker p.join() 554*cda5da8dSAndroid Build Coastguard Worker 555*cda5da8dSAndroid Build Coastguard Worker def get_n_children_alive(self): 556*cda5da8dSAndroid Build Coastguard Worker # This is an upper bound on the number of children alive. 557*cda5da8dSAndroid Build Coastguard Worker return sum(p.is_alive() for p in self.processes.values()) 558*cda5da8dSAndroid Build Coastguard Worker 559*cda5da8dSAndroid Build Coastguard Worker 560*cda5da8dSAndroid Build Coastguard Worker_system_limits_checked = False 561*cda5da8dSAndroid Build Coastguard Worker_system_limited = None 562*cda5da8dSAndroid Build Coastguard Worker 563*cda5da8dSAndroid Build Coastguard Worker 564*cda5da8dSAndroid Build Coastguard Workerdef _check_system_limits(): 565*cda5da8dSAndroid Build Coastguard Worker global _system_limits_checked, _system_limited 566*cda5da8dSAndroid Build Coastguard Worker if _system_limits_checked: 567*cda5da8dSAndroid Build Coastguard Worker if _system_limited: 568*cda5da8dSAndroid Build Coastguard Worker raise NotImplementedError(_system_limited) 569*cda5da8dSAndroid Build Coastguard Worker _system_limits_checked = True 570*cda5da8dSAndroid Build Coastguard Worker try: 571*cda5da8dSAndroid Build Coastguard Worker import multiprocessing.synchronize 572*cda5da8dSAndroid Build Coastguard Worker except ImportError: 573*cda5da8dSAndroid Build Coastguard Worker _system_limited = ( 574*cda5da8dSAndroid Build Coastguard Worker "This Python build lacks multiprocessing.synchronize, usually due " 575*cda5da8dSAndroid Build Coastguard Worker "to named semaphores being unavailable on this platform." 576*cda5da8dSAndroid Build Coastguard Worker ) 577*cda5da8dSAndroid Build Coastguard Worker raise NotImplementedError(_system_limited) 578*cda5da8dSAndroid Build Coastguard Worker try: 579*cda5da8dSAndroid Build Coastguard Worker nsems_max = os.sysconf("SC_SEM_NSEMS_MAX") 580*cda5da8dSAndroid Build Coastguard Worker except (AttributeError, ValueError): 581*cda5da8dSAndroid Build Coastguard Worker # sysconf not available or setting not available 582*cda5da8dSAndroid Build Coastguard Worker return 583*cda5da8dSAndroid Build Coastguard Worker if nsems_max == -1: 584*cda5da8dSAndroid Build Coastguard Worker # indetermined limit, assume that limit is determined 585*cda5da8dSAndroid Build Coastguard Worker # by available memory only 586*cda5da8dSAndroid Build Coastguard Worker return 587*cda5da8dSAndroid Build Coastguard Worker if nsems_max >= 256: 588*cda5da8dSAndroid Build Coastguard Worker # minimum number of semaphores available 589*cda5da8dSAndroid Build Coastguard Worker # according to POSIX 590*cda5da8dSAndroid Build Coastguard Worker return 591*cda5da8dSAndroid Build Coastguard Worker _system_limited = ("system provides too few semaphores (%d" 592*cda5da8dSAndroid Build Coastguard Worker " available, 256 necessary)" % nsems_max) 593*cda5da8dSAndroid Build Coastguard Worker raise NotImplementedError(_system_limited) 594*cda5da8dSAndroid Build Coastguard Worker 595*cda5da8dSAndroid Build Coastguard Worker 596*cda5da8dSAndroid Build Coastguard Workerdef _chain_from_iterable_of_lists(iterable): 597*cda5da8dSAndroid Build Coastguard Worker """ 598*cda5da8dSAndroid Build Coastguard Worker Specialized implementation of itertools.chain.from_iterable. 599*cda5da8dSAndroid Build Coastguard Worker Each item in *iterable* should be a list. This function is 600*cda5da8dSAndroid Build Coastguard Worker careful not to keep references to yielded objects. 601*cda5da8dSAndroid Build Coastguard Worker """ 602*cda5da8dSAndroid Build Coastguard Worker for element in iterable: 603*cda5da8dSAndroid Build Coastguard Worker element.reverse() 604*cda5da8dSAndroid Build Coastguard Worker while element: 605*cda5da8dSAndroid Build Coastguard Worker yield element.pop() 606*cda5da8dSAndroid Build Coastguard Worker 607*cda5da8dSAndroid Build Coastguard Worker 608*cda5da8dSAndroid Build Coastguard Workerclass BrokenProcessPool(_base.BrokenExecutor): 609*cda5da8dSAndroid Build Coastguard Worker """ 610*cda5da8dSAndroid Build Coastguard Worker Raised when a process in a ProcessPoolExecutor terminated abruptly 611*cda5da8dSAndroid Build Coastguard Worker while a future was in the running state. 612*cda5da8dSAndroid Build Coastguard Worker """ 613*cda5da8dSAndroid Build Coastguard Worker 614*cda5da8dSAndroid Build Coastguard Worker 615*cda5da8dSAndroid Build Coastguard Workerclass ProcessPoolExecutor(_base.Executor): 616*cda5da8dSAndroid Build Coastguard Worker def __init__(self, max_workers=None, mp_context=None, 617*cda5da8dSAndroid Build Coastguard Worker initializer=None, initargs=(), *, max_tasks_per_child=None): 618*cda5da8dSAndroid Build Coastguard Worker """Initializes a new ProcessPoolExecutor instance. 619*cda5da8dSAndroid Build Coastguard Worker 620*cda5da8dSAndroid Build Coastguard Worker Args: 621*cda5da8dSAndroid Build Coastguard Worker max_workers: The maximum number of processes that can be used to 622*cda5da8dSAndroid Build Coastguard Worker execute the given calls. If None or not given then as many 623*cda5da8dSAndroid Build Coastguard Worker worker processes will be created as the machine has processors. 624*cda5da8dSAndroid Build Coastguard Worker mp_context: A multiprocessing context to launch the workers. This 625*cda5da8dSAndroid Build Coastguard Worker object should provide SimpleQueue, Queue and Process. Useful 626*cda5da8dSAndroid Build Coastguard Worker to allow specific multiprocessing start methods. 627*cda5da8dSAndroid Build Coastguard Worker initializer: A callable used to initialize worker processes. 628*cda5da8dSAndroid Build Coastguard Worker initargs: A tuple of arguments to pass to the initializer. 629*cda5da8dSAndroid Build Coastguard Worker max_tasks_per_child: The maximum number of tasks a worker process 630*cda5da8dSAndroid Build Coastguard Worker can complete before it will exit and be replaced with a fresh 631*cda5da8dSAndroid Build Coastguard Worker worker process. The default of None means worker process will 632*cda5da8dSAndroid Build Coastguard Worker live as long as the executor. Requires a non-'fork' mp_context 633*cda5da8dSAndroid Build Coastguard Worker start method. When given, we default to using 'spawn' if no 634*cda5da8dSAndroid Build Coastguard Worker mp_context is supplied. 635*cda5da8dSAndroid Build Coastguard Worker """ 636*cda5da8dSAndroid Build Coastguard Worker _check_system_limits() 637*cda5da8dSAndroid Build Coastguard Worker 638*cda5da8dSAndroid Build Coastguard Worker if max_workers is None: 639*cda5da8dSAndroid Build Coastguard Worker self._max_workers = os.cpu_count() or 1 640*cda5da8dSAndroid Build Coastguard Worker if sys.platform == 'win32': 641*cda5da8dSAndroid Build Coastguard Worker self._max_workers = min(_MAX_WINDOWS_WORKERS, 642*cda5da8dSAndroid Build Coastguard Worker self._max_workers) 643*cda5da8dSAndroid Build Coastguard Worker else: 644*cda5da8dSAndroid Build Coastguard Worker if max_workers <= 0: 645*cda5da8dSAndroid Build Coastguard Worker raise ValueError("max_workers must be greater than 0") 646*cda5da8dSAndroid Build Coastguard Worker elif (sys.platform == 'win32' and 647*cda5da8dSAndroid Build Coastguard Worker max_workers > _MAX_WINDOWS_WORKERS): 648*cda5da8dSAndroid Build Coastguard Worker raise ValueError( 649*cda5da8dSAndroid Build Coastguard Worker f"max_workers must be <= {_MAX_WINDOWS_WORKERS}") 650*cda5da8dSAndroid Build Coastguard Worker 651*cda5da8dSAndroid Build Coastguard Worker self._max_workers = max_workers 652*cda5da8dSAndroid Build Coastguard Worker 653*cda5da8dSAndroid Build Coastguard Worker if mp_context is None: 654*cda5da8dSAndroid Build Coastguard Worker if max_tasks_per_child is not None: 655*cda5da8dSAndroid Build Coastguard Worker mp_context = mp.get_context("spawn") 656*cda5da8dSAndroid Build Coastguard Worker else: 657*cda5da8dSAndroid Build Coastguard Worker mp_context = mp.get_context() 658*cda5da8dSAndroid Build Coastguard Worker self._mp_context = mp_context 659*cda5da8dSAndroid Build Coastguard Worker 660*cda5da8dSAndroid Build Coastguard Worker # https://github.com/python/cpython/issues/90622 661*cda5da8dSAndroid Build Coastguard Worker self._safe_to_dynamically_spawn_children = ( 662*cda5da8dSAndroid Build Coastguard Worker self._mp_context.get_start_method(allow_none=False) != "fork") 663*cda5da8dSAndroid Build Coastguard Worker 664*cda5da8dSAndroid Build Coastguard Worker if initializer is not None and not callable(initializer): 665*cda5da8dSAndroid Build Coastguard Worker raise TypeError("initializer must be a callable") 666*cda5da8dSAndroid Build Coastguard Worker self._initializer = initializer 667*cda5da8dSAndroid Build Coastguard Worker self._initargs = initargs 668*cda5da8dSAndroid Build Coastguard Worker 669*cda5da8dSAndroid Build Coastguard Worker if max_tasks_per_child is not None: 670*cda5da8dSAndroid Build Coastguard Worker if not isinstance(max_tasks_per_child, int): 671*cda5da8dSAndroid Build Coastguard Worker raise TypeError("max_tasks_per_child must be an integer") 672*cda5da8dSAndroid Build Coastguard Worker elif max_tasks_per_child <= 0: 673*cda5da8dSAndroid Build Coastguard Worker raise ValueError("max_tasks_per_child must be >= 1") 674*cda5da8dSAndroid Build Coastguard Worker if self._mp_context.get_start_method(allow_none=False) == "fork": 675*cda5da8dSAndroid Build Coastguard Worker # https://github.com/python/cpython/issues/90622 676*cda5da8dSAndroid Build Coastguard Worker raise ValueError("max_tasks_per_child is incompatible with" 677*cda5da8dSAndroid Build Coastguard Worker " the 'fork' multiprocessing start method;" 678*cda5da8dSAndroid Build Coastguard Worker " supply a different mp_context.") 679*cda5da8dSAndroid Build Coastguard Worker self._max_tasks_per_child = max_tasks_per_child 680*cda5da8dSAndroid Build Coastguard Worker 681*cda5da8dSAndroid Build Coastguard Worker # Management thread 682*cda5da8dSAndroid Build Coastguard Worker self._executor_manager_thread = None 683*cda5da8dSAndroid Build Coastguard Worker 684*cda5da8dSAndroid Build Coastguard Worker # Map of pids to processes 685*cda5da8dSAndroid Build Coastguard Worker self._processes = {} 686*cda5da8dSAndroid Build Coastguard Worker 687*cda5da8dSAndroid Build Coastguard Worker # Shutdown is a two-step process. 688*cda5da8dSAndroid Build Coastguard Worker self._shutdown_thread = False 689*cda5da8dSAndroid Build Coastguard Worker self._shutdown_lock = threading.Lock() 690*cda5da8dSAndroid Build Coastguard Worker self._idle_worker_semaphore = threading.Semaphore(0) 691*cda5da8dSAndroid Build Coastguard Worker self._broken = False 692*cda5da8dSAndroid Build Coastguard Worker self._queue_count = 0 693*cda5da8dSAndroid Build Coastguard Worker self._pending_work_items = {} 694*cda5da8dSAndroid Build Coastguard Worker self._cancel_pending_futures = False 695*cda5da8dSAndroid Build Coastguard Worker 696*cda5da8dSAndroid Build Coastguard Worker # _ThreadWakeup is a communication channel used to interrupt the wait 697*cda5da8dSAndroid Build Coastguard Worker # of the main loop of executor_manager_thread from another thread (e.g. 698*cda5da8dSAndroid Build Coastguard Worker # when calling executor.submit or executor.shutdown). We do not use the 699*cda5da8dSAndroid Build Coastguard Worker # _result_queue to send wakeup signals to the executor_manager_thread 700*cda5da8dSAndroid Build Coastguard Worker # as it could result in a deadlock if a worker process dies with the 701*cda5da8dSAndroid Build Coastguard Worker # _result_queue write lock still acquired. 702*cda5da8dSAndroid Build Coastguard Worker # 703*cda5da8dSAndroid Build Coastguard Worker # _shutdown_lock must be locked to access _ThreadWakeup. 704*cda5da8dSAndroid Build Coastguard Worker self._executor_manager_thread_wakeup = _ThreadWakeup() 705*cda5da8dSAndroid Build Coastguard Worker 706*cda5da8dSAndroid Build Coastguard Worker # Create communication channels for the executor 707*cda5da8dSAndroid Build Coastguard Worker # Make the call queue slightly larger than the number of processes to 708*cda5da8dSAndroid Build Coastguard Worker # prevent the worker processes from idling. But don't make it too big 709*cda5da8dSAndroid Build Coastguard Worker # because futures in the call queue cannot be cancelled. 710*cda5da8dSAndroid Build Coastguard Worker queue_size = self._max_workers + EXTRA_QUEUED_CALLS 711*cda5da8dSAndroid Build Coastguard Worker self._call_queue = _SafeQueue( 712*cda5da8dSAndroid Build Coastguard Worker max_size=queue_size, ctx=self._mp_context, 713*cda5da8dSAndroid Build Coastguard Worker pending_work_items=self._pending_work_items, 714*cda5da8dSAndroid Build Coastguard Worker shutdown_lock=self._shutdown_lock, 715*cda5da8dSAndroid Build Coastguard Worker thread_wakeup=self._executor_manager_thread_wakeup) 716*cda5da8dSAndroid Build Coastguard Worker # Killed worker processes can produce spurious "broken pipe" 717*cda5da8dSAndroid Build Coastguard Worker # tracebacks in the queue's own worker thread. But we detect killed 718*cda5da8dSAndroid Build Coastguard Worker # processes anyway, so silence the tracebacks. 719*cda5da8dSAndroid Build Coastguard Worker self._call_queue._ignore_epipe = True 720*cda5da8dSAndroid Build Coastguard Worker self._result_queue = mp_context.SimpleQueue() 721*cda5da8dSAndroid Build Coastguard Worker self._work_ids = queue.Queue() 722*cda5da8dSAndroid Build Coastguard Worker 723*cda5da8dSAndroid Build Coastguard Worker def _start_executor_manager_thread(self): 724*cda5da8dSAndroid Build Coastguard Worker if self._executor_manager_thread is None: 725*cda5da8dSAndroid Build Coastguard Worker # Start the processes so that their sentinels are known. 726*cda5da8dSAndroid Build Coastguard Worker if not self._safe_to_dynamically_spawn_children: # ie, using fork. 727*cda5da8dSAndroid Build Coastguard Worker self._launch_processes() 728*cda5da8dSAndroid Build Coastguard Worker self._executor_manager_thread = _ExecutorManagerThread(self) 729*cda5da8dSAndroid Build Coastguard Worker self._executor_manager_thread.start() 730*cda5da8dSAndroid Build Coastguard Worker _threads_wakeups[self._executor_manager_thread] = \ 731*cda5da8dSAndroid Build Coastguard Worker self._executor_manager_thread_wakeup 732*cda5da8dSAndroid Build Coastguard Worker 733*cda5da8dSAndroid Build Coastguard Worker def _adjust_process_count(self): 734*cda5da8dSAndroid Build Coastguard Worker # if there's an idle process, we don't need to spawn a new one. 735*cda5da8dSAndroid Build Coastguard Worker if self._idle_worker_semaphore.acquire(blocking=False): 736*cda5da8dSAndroid Build Coastguard Worker return 737*cda5da8dSAndroid Build Coastguard Worker 738*cda5da8dSAndroid Build Coastguard Worker process_count = len(self._processes) 739*cda5da8dSAndroid Build Coastguard Worker if process_count < self._max_workers: 740*cda5da8dSAndroid Build Coastguard Worker # Assertion disabled as this codepath is also used to replace a 741*cda5da8dSAndroid Build Coastguard Worker # worker that unexpectedly dies, even when using the 'fork' start 742*cda5da8dSAndroid Build Coastguard Worker # method. That means there is still a potential deadlock bug. If a 743*cda5da8dSAndroid Build Coastguard Worker # 'fork' mp_context worker dies, we'll be forking a new one when 744*cda5da8dSAndroid Build Coastguard Worker # we know a thread is running (self._executor_manager_thread). 745*cda5da8dSAndroid Build Coastguard Worker #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622' 746*cda5da8dSAndroid Build Coastguard Worker self._spawn_process() 747*cda5da8dSAndroid Build Coastguard Worker 748*cda5da8dSAndroid Build Coastguard Worker def _launch_processes(self): 749*cda5da8dSAndroid Build Coastguard Worker # https://github.com/python/cpython/issues/90622 750*cda5da8dSAndroid Build Coastguard Worker assert not self._executor_manager_thread, ( 751*cda5da8dSAndroid Build Coastguard Worker 'Processes cannot be fork()ed after the thread has started, ' 752*cda5da8dSAndroid Build Coastguard Worker 'deadlock in the child processes could result.') 753*cda5da8dSAndroid Build Coastguard Worker for _ in range(len(self._processes), self._max_workers): 754*cda5da8dSAndroid Build Coastguard Worker self._spawn_process() 755*cda5da8dSAndroid Build Coastguard Worker 756*cda5da8dSAndroid Build Coastguard Worker def _spawn_process(self): 757*cda5da8dSAndroid Build Coastguard Worker p = self._mp_context.Process( 758*cda5da8dSAndroid Build Coastguard Worker target=_process_worker, 759*cda5da8dSAndroid Build Coastguard Worker args=(self._call_queue, 760*cda5da8dSAndroid Build Coastguard Worker self._result_queue, 761*cda5da8dSAndroid Build Coastguard Worker self._initializer, 762*cda5da8dSAndroid Build Coastguard Worker self._initargs, 763*cda5da8dSAndroid Build Coastguard Worker self._max_tasks_per_child)) 764*cda5da8dSAndroid Build Coastguard Worker p.start() 765*cda5da8dSAndroid Build Coastguard Worker self._processes[p.pid] = p 766*cda5da8dSAndroid Build Coastguard Worker 767*cda5da8dSAndroid Build Coastguard Worker def submit(self, fn, /, *args, **kwargs): 768*cda5da8dSAndroid Build Coastguard Worker with self._shutdown_lock: 769*cda5da8dSAndroid Build Coastguard Worker if self._broken: 770*cda5da8dSAndroid Build Coastguard Worker raise BrokenProcessPool(self._broken) 771*cda5da8dSAndroid Build Coastguard Worker if self._shutdown_thread: 772*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError('cannot schedule new futures after shutdown') 773*cda5da8dSAndroid Build Coastguard Worker if _global_shutdown: 774*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError('cannot schedule new futures after ' 775*cda5da8dSAndroid Build Coastguard Worker 'interpreter shutdown') 776*cda5da8dSAndroid Build Coastguard Worker 777*cda5da8dSAndroid Build Coastguard Worker f = _base.Future() 778*cda5da8dSAndroid Build Coastguard Worker w = _WorkItem(f, fn, args, kwargs) 779*cda5da8dSAndroid Build Coastguard Worker 780*cda5da8dSAndroid Build Coastguard Worker self._pending_work_items[self._queue_count] = w 781*cda5da8dSAndroid Build Coastguard Worker self._work_ids.put(self._queue_count) 782*cda5da8dSAndroid Build Coastguard Worker self._queue_count += 1 783*cda5da8dSAndroid Build Coastguard Worker # Wake up queue management thread 784*cda5da8dSAndroid Build Coastguard Worker self._executor_manager_thread_wakeup.wakeup() 785*cda5da8dSAndroid Build Coastguard Worker 786*cda5da8dSAndroid Build Coastguard Worker if self._safe_to_dynamically_spawn_children: 787*cda5da8dSAndroid Build Coastguard Worker self._adjust_process_count() 788*cda5da8dSAndroid Build Coastguard Worker self._start_executor_manager_thread() 789*cda5da8dSAndroid Build Coastguard Worker return f 790*cda5da8dSAndroid Build Coastguard Worker submit.__doc__ = _base.Executor.submit.__doc__ 791*cda5da8dSAndroid Build Coastguard Worker 792*cda5da8dSAndroid Build Coastguard Worker def map(self, fn, *iterables, timeout=None, chunksize=1): 793*cda5da8dSAndroid Build Coastguard Worker """Returns an iterator equivalent to map(fn, iter). 794*cda5da8dSAndroid Build Coastguard Worker 795*cda5da8dSAndroid Build Coastguard Worker Args: 796*cda5da8dSAndroid Build Coastguard Worker fn: A callable that will take as many arguments as there are 797*cda5da8dSAndroid Build Coastguard Worker passed iterables. 798*cda5da8dSAndroid Build Coastguard Worker timeout: The maximum number of seconds to wait. If None, then there 799*cda5da8dSAndroid Build Coastguard Worker is no limit on the wait time. 800*cda5da8dSAndroid Build Coastguard Worker chunksize: If greater than one, the iterables will be chopped into 801*cda5da8dSAndroid Build Coastguard Worker chunks of size chunksize and submitted to the process pool. 802*cda5da8dSAndroid Build Coastguard Worker If set to one, the items in the list will be sent one at a time. 803*cda5da8dSAndroid Build Coastguard Worker 804*cda5da8dSAndroid Build Coastguard Worker Returns: 805*cda5da8dSAndroid Build Coastguard Worker An iterator equivalent to: map(func, *iterables) but the calls may 806*cda5da8dSAndroid Build Coastguard Worker be evaluated out-of-order. 807*cda5da8dSAndroid Build Coastguard Worker 808*cda5da8dSAndroid Build Coastguard Worker Raises: 809*cda5da8dSAndroid Build Coastguard Worker TimeoutError: If the entire result iterator could not be generated 810*cda5da8dSAndroid Build Coastguard Worker before the given timeout. 811*cda5da8dSAndroid Build Coastguard Worker Exception: If fn(*args) raises for any values. 812*cda5da8dSAndroid Build Coastguard Worker """ 813*cda5da8dSAndroid Build Coastguard Worker if chunksize < 1: 814*cda5da8dSAndroid Build Coastguard Worker raise ValueError("chunksize must be >= 1.") 815*cda5da8dSAndroid Build Coastguard Worker 816*cda5da8dSAndroid Build Coastguard Worker results = super().map(partial(_process_chunk, fn), 817*cda5da8dSAndroid Build Coastguard Worker _get_chunks(*iterables, chunksize=chunksize), 818*cda5da8dSAndroid Build Coastguard Worker timeout=timeout) 819*cda5da8dSAndroid Build Coastguard Worker return _chain_from_iterable_of_lists(results) 820*cda5da8dSAndroid Build Coastguard Worker 821*cda5da8dSAndroid Build Coastguard Worker def shutdown(self, wait=True, *, cancel_futures=False): 822*cda5da8dSAndroid Build Coastguard Worker with self._shutdown_lock: 823*cda5da8dSAndroid Build Coastguard Worker self._cancel_pending_futures = cancel_futures 824*cda5da8dSAndroid Build Coastguard Worker self._shutdown_thread = True 825*cda5da8dSAndroid Build Coastguard Worker if self._executor_manager_thread_wakeup is not None: 826*cda5da8dSAndroid Build Coastguard Worker # Wake up queue management thread 827*cda5da8dSAndroid Build Coastguard Worker self._executor_manager_thread_wakeup.wakeup() 828*cda5da8dSAndroid Build Coastguard Worker 829*cda5da8dSAndroid Build Coastguard Worker if self._executor_manager_thread is not None and wait: 830*cda5da8dSAndroid Build Coastguard Worker self._executor_manager_thread.join() 831*cda5da8dSAndroid Build Coastguard Worker # To reduce the risk of opening too many files, remove references to 832*cda5da8dSAndroid Build Coastguard Worker # objects that use file descriptors. 833*cda5da8dSAndroid Build Coastguard Worker self._executor_manager_thread = None 834*cda5da8dSAndroid Build Coastguard Worker self._call_queue = None 835*cda5da8dSAndroid Build Coastguard Worker if self._result_queue is not None and wait: 836*cda5da8dSAndroid Build Coastguard Worker self._result_queue.close() 837*cda5da8dSAndroid Build Coastguard Worker self._result_queue = None 838*cda5da8dSAndroid Build Coastguard Worker self._processes = None 839*cda5da8dSAndroid Build Coastguard Worker self._executor_manager_thread_wakeup = None 840*cda5da8dSAndroid Build Coastguard Worker 841*cda5da8dSAndroid Build Coastguard Worker shutdown.__doc__ = _base.Executor.shutdown.__doc__ 842