# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ThreadPoolExecutor."""

__author__ = 'Brian Quinlan ([email protected])'

from concurrent.futures import _base
import itertools
import queue
import threading
import types
import weakref
import os


# Maps every started worker thread to the work queue it consumes. Keys are
# held weakly, so an entry disappears once its thread object is collected.
_threads_queues = weakref.WeakKeyDictionary()
# Set to True by _python_exit(); checked by workers and by submit().
_shutdown = False
# Lock that ensures that new workers are not created while the interpreter is
# shutting down. Must be held while mutating _threads_queues and _shutdown.
_global_shutdown_lock = threading.Lock()


def _python_exit():
    """Flag interpreter shutdown, wake every known worker and join it."""
    global _shutdown
    with _global_shutdown_lock:
        _shutdown = True
    # Snapshot the mapping: it has weak keys and may change while iterating.
    items = list(_threads_queues.items())
    # Wake all workers first (None is the wake-up sentinel) ...
    for t, q in items:
        q.put(None)
    # ... and only then wait for them, so they can wind down concurrently.
    for t, q in items:
        t.join()


# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)

# At fork, reinitialize the `_global_shutdown_lock` lock in the child process.
# The lock is acquired before forking and released in the parent afterwards.
if hasattr(os, 'register_at_fork'):
    os.register_at_fork(before=_global_shutdown_lock.acquire,
                        after_in_child=_global_shutdown_lock._at_fork_reinit,
                        after_in_parent=_global_shutdown_lock.release)


class _WorkItem(object):
    """A submitted call bundled with the future that receives its outcome."""

    def __init__(self, future, fn, args, kwargs):
        """Store the future, the callable and its positional/keyword args."""
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        """Invoke the callable and record its result or exception.

        Does nothing when set_running_or_notify_cancel() returns a false
        value for the future.
        """
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            self = None
        else:
            self.future.set_result(result)

    __class_getitem__ = classmethod(types.GenericAlias)


def _worker(executor_reference, work_queue, initializer, initargs):
    """Main loop of a worker thread.

    Args:
        executor_reference: A weak reference to the owning executor; calling
            it yields the executor, or None once it has been collected.
        work_queue: The queue delivering _WorkItem objects, or None as a
            wake-up sentinel.
        initializer: Optional callable run once before any work item.
        initargs: Arguments passed to the initializer.
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            executor = executor_reference()
            if executor is not None:
                executor._initializer_failed()
            # A worker whose initializer failed never processes work items.
            return
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item

                # attempt to increment idle count
                executor = executor_reference()
                if executor is not None:
                    executor._idle_semaphore.release()
                del executor
                continue

            # Reaching this point means a None wake-up sentinel was received.
            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Flag the executor as shutting down as early as possible if it
                # is not gc-ed yet.
                if executor is not None:
                    executor._shutdown = True
                # Notice other workers
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)


class BrokenThreadPool(_base.BrokenExecutor):
    """
    Raised when a worker thread in a ThreadPoolExecutor failed initializing.
    """


class ThreadPoolExecutor(_base.Executor):
    """Executor that runs submitted calls on lazily started worker threads."""

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def __init__(self, max_workers=None, thread_name_prefix='',
                 initializer=None, initargs=()):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
            initializer: A callable used to initialize worker threads.
            initargs: A tuple of arguments to pass to the initializer.

        Raises:
            ValueError: If max_workers is less than or equal to 0.
            TypeError: If initializer is neither None nor callable.
        """
        if max_workers is None:
            # ThreadPoolExecutor is often used to:
            # * CPU bound task which releases GIL
            # * I/O bound task (which releases GIL, of course)
            #
            # We use cpu_count + 4 for both types of tasks.
            # But we limit it to 32 to avoid consuming surprisingly large resource
            # on many core machine.
            max_workers = min(32, (os.cpu_count() or 1) + 4)
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")

        self._max_workers = max_workers
        self._work_queue = queue.SimpleQueue()
        # Released by a worker each time it finishes a work item; consumed by
        # _adjust_thread_count() to decide whether a new thread is needed.
        self._idle_semaphore = threading.Semaphore(0)
        self._threads = set()
        # False, or the error message string once an initializer has failed.
        self._broken = False
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))
        self._initializer = initializer
        self._initargs = initargs

    # No docstring here: submit.__doc__ is taken from _base.Executor below.
    def submit(self, fn, /, *args, **kwargs):
        # Both locks are held so that neither this executor nor the
        # interpreter can begin shutting down while the item is enqueued.
        with self._shutdown_lock, _global_shutdown_lock:
            if self._broken:
                raise BrokenThreadPool(self._broken)

            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        """Start one more worker thread unless the idle count is non-zero.

        No thread is started once len(self._threads) reaches _max_workers.
        """
        # if idle threads are available, don't spin new threads
        if self._idle_semaphore.acquire(timeout=0):
            return

        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)

        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            # The worker only gets a weak reference to the executor.
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue,
                                       self._initializer,
                                       self._initargs))
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self._work_queue

    def _initializer_failed(self):
        """Mark the pool as broken and fail every future still queued."""
        with self._shutdown_lock:
            self._broken = ('A thread initializer failed, the thread pool '
                            'is not usable anymore')
            # Drain work queue and mark pending futures failed
            while True:
                try:
                    work_item = self._work_queue.get_nowait()
                except queue.Empty:
                    break
                if work_item is not None:
                    work_item.future.set_exception(BrokenThreadPool(self._broken))

    # No docstring here: shutdown.__doc__ is taken from _base.Executor below.
    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._shutdown = True
            if cancel_futures:
                # Drain all work items from the queue, and then cancel their
                # associated futures.
                while True:
                    try:
                        work_item = self._work_queue.get_nowait()
                    except queue.Empty:
                        break
                    if work_item is not None:
                        work_item.future.cancel()

            # Send a wake-up to prevent threads calling
            # _work_queue.get(block=True) from permanently blocking.
            self._work_queue.put(None)
        # Join outside the lock: _initializer_failed() in a worker needs it.
        if wait:
            for t in self._threads:
                t.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__