xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/concurrent/futures/thread.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1*cda5da8dSAndroid Build Coastguard Worker# Copyright 2009 Brian Quinlan. All Rights Reserved.
2*cda5da8dSAndroid Build Coastguard Worker# Licensed to PSF under a Contributor Agreement.
3*cda5da8dSAndroid Build Coastguard Worker
4*cda5da8dSAndroid Build Coastguard Worker"""Implements ThreadPoolExecutor."""
5*cda5da8dSAndroid Build Coastguard Worker
6*cda5da8dSAndroid Build Coastguard Worker__author__ = 'Brian Quinlan ([email protected])'
7*cda5da8dSAndroid Build Coastguard Worker
8*cda5da8dSAndroid Build Coastguard Workerfrom concurrent.futures import _base
9*cda5da8dSAndroid Build Coastguard Workerimport itertools
10*cda5da8dSAndroid Build Coastguard Workerimport queue
11*cda5da8dSAndroid Build Coastguard Workerimport threading
12*cda5da8dSAndroid Build Coastguard Workerimport types
13*cda5da8dSAndroid Build Coastguard Workerimport weakref
14*cda5da8dSAndroid Build Coastguard Workerimport os
15*cda5da8dSAndroid Build Coastguard Worker
16*cda5da8dSAndroid Build Coastguard Worker
17*cda5da8dSAndroid Build Coastguard Worker_threads_queues = weakref.WeakKeyDictionary()
18*cda5da8dSAndroid Build Coastguard Worker_shutdown = False
19*cda5da8dSAndroid Build Coastguard Worker# Lock that ensures that new workers are not created while the interpreter is
20*cda5da8dSAndroid Build Coastguard Worker# shutting down. Must be held while mutating _threads_queues and _shutdown.
21*cda5da8dSAndroid Build Coastguard Worker_global_shutdown_lock = threading.Lock()
22*cda5da8dSAndroid Build Coastguard Worker
23*cda5da8dSAndroid Build Coastguard Workerdef _python_exit():
24*cda5da8dSAndroid Build Coastguard Worker    global _shutdown
25*cda5da8dSAndroid Build Coastguard Worker    with _global_shutdown_lock:
26*cda5da8dSAndroid Build Coastguard Worker        _shutdown = True
27*cda5da8dSAndroid Build Coastguard Worker    items = list(_threads_queues.items())
28*cda5da8dSAndroid Build Coastguard Worker    for t, q in items:
29*cda5da8dSAndroid Build Coastguard Worker        q.put(None)
30*cda5da8dSAndroid Build Coastguard Worker    for t, q in items:
31*cda5da8dSAndroid Build Coastguard Worker        t.join()
32*cda5da8dSAndroid Build Coastguard Worker
33*cda5da8dSAndroid Build Coastguard Worker# Register for `_python_exit()` to be called just before joining all
34*cda5da8dSAndroid Build Coastguard Worker# non-daemon threads. This is used instead of `atexit.register()` for
35*cda5da8dSAndroid Build Coastguard Worker# compatibility with subinterpreters, which no longer support daemon threads.
36*cda5da8dSAndroid Build Coastguard Worker# See bpo-39812 for context.
37*cda5da8dSAndroid Build Coastguard Workerthreading._register_atexit(_python_exit)
38*cda5da8dSAndroid Build Coastguard Worker
39*cda5da8dSAndroid Build Coastguard Worker# At fork, reinitialize the `_global_shutdown_lock` lock in the child process
40*cda5da8dSAndroid Build Coastguard Workerif hasattr(os, 'register_at_fork'):
41*cda5da8dSAndroid Build Coastguard Worker    os.register_at_fork(before=_global_shutdown_lock.acquire,
42*cda5da8dSAndroid Build Coastguard Worker                        after_in_child=_global_shutdown_lock._at_fork_reinit,
43*cda5da8dSAndroid Build Coastguard Worker                        after_in_parent=_global_shutdown_lock.release)
44*cda5da8dSAndroid Build Coastguard Worker
45*cda5da8dSAndroid Build Coastguard Worker
46*cda5da8dSAndroid Build Coastguard Workerclass _WorkItem(object):
47*cda5da8dSAndroid Build Coastguard Worker    def __init__(self, future, fn, args, kwargs):
48*cda5da8dSAndroid Build Coastguard Worker        self.future = future
49*cda5da8dSAndroid Build Coastguard Worker        self.fn = fn
50*cda5da8dSAndroid Build Coastguard Worker        self.args = args
51*cda5da8dSAndroid Build Coastguard Worker        self.kwargs = kwargs
52*cda5da8dSAndroid Build Coastguard Worker
53*cda5da8dSAndroid Build Coastguard Worker    def run(self):
54*cda5da8dSAndroid Build Coastguard Worker        if not self.future.set_running_or_notify_cancel():
55*cda5da8dSAndroid Build Coastguard Worker            return
56*cda5da8dSAndroid Build Coastguard Worker
57*cda5da8dSAndroid Build Coastguard Worker        try:
58*cda5da8dSAndroid Build Coastguard Worker            result = self.fn(*self.args, **self.kwargs)
59*cda5da8dSAndroid Build Coastguard Worker        except BaseException as exc:
60*cda5da8dSAndroid Build Coastguard Worker            self.future.set_exception(exc)
61*cda5da8dSAndroid Build Coastguard Worker            # Break a reference cycle with the exception 'exc'
62*cda5da8dSAndroid Build Coastguard Worker            self = None
63*cda5da8dSAndroid Build Coastguard Worker        else:
64*cda5da8dSAndroid Build Coastguard Worker            self.future.set_result(result)
65*cda5da8dSAndroid Build Coastguard Worker
66*cda5da8dSAndroid Build Coastguard Worker    __class_getitem__ = classmethod(types.GenericAlias)
67*cda5da8dSAndroid Build Coastguard Worker
68*cda5da8dSAndroid Build Coastguard Worker
69*cda5da8dSAndroid Build Coastguard Workerdef _worker(executor_reference, work_queue, initializer, initargs):
70*cda5da8dSAndroid Build Coastguard Worker    if initializer is not None:
71*cda5da8dSAndroid Build Coastguard Worker        try:
72*cda5da8dSAndroid Build Coastguard Worker            initializer(*initargs)
73*cda5da8dSAndroid Build Coastguard Worker        except BaseException:
74*cda5da8dSAndroid Build Coastguard Worker            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
75*cda5da8dSAndroid Build Coastguard Worker            executor = executor_reference()
76*cda5da8dSAndroid Build Coastguard Worker            if executor is not None:
77*cda5da8dSAndroid Build Coastguard Worker                executor._initializer_failed()
78*cda5da8dSAndroid Build Coastguard Worker            return
79*cda5da8dSAndroid Build Coastguard Worker    try:
80*cda5da8dSAndroid Build Coastguard Worker        while True:
81*cda5da8dSAndroid Build Coastguard Worker            work_item = work_queue.get(block=True)
82*cda5da8dSAndroid Build Coastguard Worker            if work_item is not None:
83*cda5da8dSAndroid Build Coastguard Worker                work_item.run()
84*cda5da8dSAndroid Build Coastguard Worker                # Delete references to object. See issue16284
85*cda5da8dSAndroid Build Coastguard Worker                del work_item
86*cda5da8dSAndroid Build Coastguard Worker
87*cda5da8dSAndroid Build Coastguard Worker                # attempt to increment idle count
88*cda5da8dSAndroid Build Coastguard Worker                executor = executor_reference()
89*cda5da8dSAndroid Build Coastguard Worker                if executor is not None:
90*cda5da8dSAndroid Build Coastguard Worker                    executor._idle_semaphore.release()
91*cda5da8dSAndroid Build Coastguard Worker                del executor
92*cda5da8dSAndroid Build Coastguard Worker                continue
93*cda5da8dSAndroid Build Coastguard Worker
94*cda5da8dSAndroid Build Coastguard Worker            executor = executor_reference()
95*cda5da8dSAndroid Build Coastguard Worker            # Exit if:
96*cda5da8dSAndroid Build Coastguard Worker            #   - The interpreter is shutting down OR
97*cda5da8dSAndroid Build Coastguard Worker            #   - The executor that owns the worker has been collected OR
98*cda5da8dSAndroid Build Coastguard Worker            #   - The executor that owns the worker has been shutdown.
99*cda5da8dSAndroid Build Coastguard Worker            if _shutdown or executor is None or executor._shutdown:
100*cda5da8dSAndroid Build Coastguard Worker                # Flag the executor as shutting down as early as possible if it
101*cda5da8dSAndroid Build Coastguard Worker                # is not gc-ed yet.
102*cda5da8dSAndroid Build Coastguard Worker                if executor is not None:
103*cda5da8dSAndroid Build Coastguard Worker                    executor._shutdown = True
104*cda5da8dSAndroid Build Coastguard Worker                # Notice other workers
105*cda5da8dSAndroid Build Coastguard Worker                work_queue.put(None)
106*cda5da8dSAndroid Build Coastguard Worker                return
107*cda5da8dSAndroid Build Coastguard Worker            del executor
108*cda5da8dSAndroid Build Coastguard Worker    except BaseException:
109*cda5da8dSAndroid Build Coastguard Worker        _base.LOGGER.critical('Exception in worker', exc_info=True)
110*cda5da8dSAndroid Build Coastguard Worker
111*cda5da8dSAndroid Build Coastguard Worker
112*cda5da8dSAndroid Build Coastguard Workerclass BrokenThreadPool(_base.BrokenExecutor):
113*cda5da8dSAndroid Build Coastguard Worker    """
114*cda5da8dSAndroid Build Coastguard Worker    Raised when a worker thread in a ThreadPoolExecutor failed initializing.
115*cda5da8dSAndroid Build Coastguard Worker    """
116*cda5da8dSAndroid Build Coastguard Worker
117*cda5da8dSAndroid Build Coastguard Worker
118*cda5da8dSAndroid Build Coastguard Workerclass ThreadPoolExecutor(_base.Executor):
119*cda5da8dSAndroid Build Coastguard Worker
120*cda5da8dSAndroid Build Coastguard Worker    # Used to assign unique thread names when thread_name_prefix is not supplied.
121*cda5da8dSAndroid Build Coastguard Worker    _counter = itertools.count().__next__
122*cda5da8dSAndroid Build Coastguard Worker
123*cda5da8dSAndroid Build Coastguard Worker    def __init__(self, max_workers=None, thread_name_prefix='',
124*cda5da8dSAndroid Build Coastguard Worker                 initializer=None, initargs=()):
125*cda5da8dSAndroid Build Coastguard Worker        """Initializes a new ThreadPoolExecutor instance.
126*cda5da8dSAndroid Build Coastguard Worker
127*cda5da8dSAndroid Build Coastguard Worker        Args:
128*cda5da8dSAndroid Build Coastguard Worker            max_workers: The maximum number of threads that can be used to
129*cda5da8dSAndroid Build Coastguard Worker                execute the given calls.
130*cda5da8dSAndroid Build Coastguard Worker            thread_name_prefix: An optional name prefix to give our threads.
131*cda5da8dSAndroid Build Coastguard Worker            initializer: A callable used to initialize worker threads.
132*cda5da8dSAndroid Build Coastguard Worker            initargs: A tuple of arguments to pass to the initializer.
133*cda5da8dSAndroid Build Coastguard Worker        """
134*cda5da8dSAndroid Build Coastguard Worker        if max_workers is None:
135*cda5da8dSAndroid Build Coastguard Worker            # ThreadPoolExecutor is often used to:
136*cda5da8dSAndroid Build Coastguard Worker            # * CPU bound task which releases GIL
137*cda5da8dSAndroid Build Coastguard Worker            # * I/O bound task (which releases GIL, of course)
138*cda5da8dSAndroid Build Coastguard Worker            #
139*cda5da8dSAndroid Build Coastguard Worker            # We use cpu_count + 4 for both types of tasks.
140*cda5da8dSAndroid Build Coastguard Worker            # But we limit it to 32 to avoid consuming surprisingly large resource
141*cda5da8dSAndroid Build Coastguard Worker            # on many core machine.
142*cda5da8dSAndroid Build Coastguard Worker            max_workers = min(32, (os.cpu_count() or 1) + 4)
143*cda5da8dSAndroid Build Coastguard Worker        if max_workers <= 0:
144*cda5da8dSAndroid Build Coastguard Worker            raise ValueError("max_workers must be greater than 0")
145*cda5da8dSAndroid Build Coastguard Worker
146*cda5da8dSAndroid Build Coastguard Worker        if initializer is not None and not callable(initializer):
147*cda5da8dSAndroid Build Coastguard Worker            raise TypeError("initializer must be a callable")
148*cda5da8dSAndroid Build Coastguard Worker
149*cda5da8dSAndroid Build Coastguard Worker        self._max_workers = max_workers
150*cda5da8dSAndroid Build Coastguard Worker        self._work_queue = queue.SimpleQueue()
151*cda5da8dSAndroid Build Coastguard Worker        self._idle_semaphore = threading.Semaphore(0)
152*cda5da8dSAndroid Build Coastguard Worker        self._threads = set()
153*cda5da8dSAndroid Build Coastguard Worker        self._broken = False
154*cda5da8dSAndroid Build Coastguard Worker        self._shutdown = False
155*cda5da8dSAndroid Build Coastguard Worker        self._shutdown_lock = threading.Lock()
156*cda5da8dSAndroid Build Coastguard Worker        self._thread_name_prefix = (thread_name_prefix or
157*cda5da8dSAndroid Build Coastguard Worker                                    ("ThreadPoolExecutor-%d" % self._counter()))
158*cda5da8dSAndroid Build Coastguard Worker        self._initializer = initializer
159*cda5da8dSAndroid Build Coastguard Worker        self._initargs = initargs
160*cda5da8dSAndroid Build Coastguard Worker
161*cda5da8dSAndroid Build Coastguard Worker    def submit(self, fn, /, *args, **kwargs):
162*cda5da8dSAndroid Build Coastguard Worker        with self._shutdown_lock, _global_shutdown_lock:
163*cda5da8dSAndroid Build Coastguard Worker            if self._broken:
164*cda5da8dSAndroid Build Coastguard Worker                raise BrokenThreadPool(self._broken)
165*cda5da8dSAndroid Build Coastguard Worker
166*cda5da8dSAndroid Build Coastguard Worker            if self._shutdown:
167*cda5da8dSAndroid Build Coastguard Worker                raise RuntimeError('cannot schedule new futures after shutdown')
168*cda5da8dSAndroid Build Coastguard Worker            if _shutdown:
169*cda5da8dSAndroid Build Coastguard Worker                raise RuntimeError('cannot schedule new futures after '
170*cda5da8dSAndroid Build Coastguard Worker                                   'interpreter shutdown')
171*cda5da8dSAndroid Build Coastguard Worker
172*cda5da8dSAndroid Build Coastguard Worker            f = _base.Future()
173*cda5da8dSAndroid Build Coastguard Worker            w = _WorkItem(f, fn, args, kwargs)
174*cda5da8dSAndroid Build Coastguard Worker
175*cda5da8dSAndroid Build Coastguard Worker            self._work_queue.put(w)
176*cda5da8dSAndroid Build Coastguard Worker            self._adjust_thread_count()
177*cda5da8dSAndroid Build Coastguard Worker            return f
178*cda5da8dSAndroid Build Coastguard Worker    submit.__doc__ = _base.Executor.submit.__doc__
179*cda5da8dSAndroid Build Coastguard Worker
180*cda5da8dSAndroid Build Coastguard Worker    def _adjust_thread_count(self):
181*cda5da8dSAndroid Build Coastguard Worker        # if idle threads are available, don't spin new threads
182*cda5da8dSAndroid Build Coastguard Worker        if self._idle_semaphore.acquire(timeout=0):
183*cda5da8dSAndroid Build Coastguard Worker            return
184*cda5da8dSAndroid Build Coastguard Worker
185*cda5da8dSAndroid Build Coastguard Worker        # When the executor gets lost, the weakref callback will wake up
186*cda5da8dSAndroid Build Coastguard Worker        # the worker threads.
187*cda5da8dSAndroid Build Coastguard Worker        def weakref_cb(_, q=self._work_queue):
188*cda5da8dSAndroid Build Coastguard Worker            q.put(None)
189*cda5da8dSAndroid Build Coastguard Worker
190*cda5da8dSAndroid Build Coastguard Worker        num_threads = len(self._threads)
191*cda5da8dSAndroid Build Coastguard Worker        if num_threads < self._max_workers:
192*cda5da8dSAndroid Build Coastguard Worker            thread_name = '%s_%d' % (self._thread_name_prefix or self,
193*cda5da8dSAndroid Build Coastguard Worker                                     num_threads)
194*cda5da8dSAndroid Build Coastguard Worker            t = threading.Thread(name=thread_name, target=_worker,
195*cda5da8dSAndroid Build Coastguard Worker                                 args=(weakref.ref(self, weakref_cb),
196*cda5da8dSAndroid Build Coastguard Worker                                       self._work_queue,
197*cda5da8dSAndroid Build Coastguard Worker                                       self._initializer,
198*cda5da8dSAndroid Build Coastguard Worker                                       self._initargs))
199*cda5da8dSAndroid Build Coastguard Worker            t.start()
200*cda5da8dSAndroid Build Coastguard Worker            self._threads.add(t)
201*cda5da8dSAndroid Build Coastguard Worker            _threads_queues[t] = self._work_queue
202*cda5da8dSAndroid Build Coastguard Worker
203*cda5da8dSAndroid Build Coastguard Worker    def _initializer_failed(self):
204*cda5da8dSAndroid Build Coastguard Worker        with self._shutdown_lock:
205*cda5da8dSAndroid Build Coastguard Worker            self._broken = ('A thread initializer failed, the thread pool '
206*cda5da8dSAndroid Build Coastguard Worker                            'is not usable anymore')
207*cda5da8dSAndroid Build Coastguard Worker            # Drain work queue and mark pending futures failed
208*cda5da8dSAndroid Build Coastguard Worker            while True:
209*cda5da8dSAndroid Build Coastguard Worker                try:
210*cda5da8dSAndroid Build Coastguard Worker                    work_item = self._work_queue.get_nowait()
211*cda5da8dSAndroid Build Coastguard Worker                except queue.Empty:
212*cda5da8dSAndroid Build Coastguard Worker                    break
213*cda5da8dSAndroid Build Coastguard Worker                if work_item is not None:
214*cda5da8dSAndroid Build Coastguard Worker                    work_item.future.set_exception(BrokenThreadPool(self._broken))
215*cda5da8dSAndroid Build Coastguard Worker
216*cda5da8dSAndroid Build Coastguard Worker    def shutdown(self, wait=True, *, cancel_futures=False):
217*cda5da8dSAndroid Build Coastguard Worker        with self._shutdown_lock:
218*cda5da8dSAndroid Build Coastguard Worker            self._shutdown = True
219*cda5da8dSAndroid Build Coastguard Worker            if cancel_futures:
220*cda5da8dSAndroid Build Coastguard Worker                # Drain all work items from the queue, and then cancel their
221*cda5da8dSAndroid Build Coastguard Worker                # associated futures.
222*cda5da8dSAndroid Build Coastguard Worker                while True:
223*cda5da8dSAndroid Build Coastguard Worker                    try:
224*cda5da8dSAndroid Build Coastguard Worker                        work_item = self._work_queue.get_nowait()
225*cda5da8dSAndroid Build Coastguard Worker                    except queue.Empty:
226*cda5da8dSAndroid Build Coastguard Worker                        break
227*cda5da8dSAndroid Build Coastguard Worker                    if work_item is not None:
228*cda5da8dSAndroid Build Coastguard Worker                        work_item.future.cancel()
229*cda5da8dSAndroid Build Coastguard Worker
230*cda5da8dSAndroid Build Coastguard Worker            # Send a wake-up to prevent threads calling
231*cda5da8dSAndroid Build Coastguard Worker            # _work_queue.get(block=True) from permanently blocking.
232*cda5da8dSAndroid Build Coastguard Worker            self._work_queue.put(None)
233*cda5da8dSAndroid Build Coastguard Worker        if wait:
234*cda5da8dSAndroid Build Coastguard Worker            for t in self._threads:
235*cda5da8dSAndroid Build Coastguard Worker                t.join()
236*cda5da8dSAndroid Build Coastguard Worker    shutdown.__doc__ = _base.Executor.shutdown.__doc__
237