xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/multiprocessing/pool.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1*cda5da8dSAndroid Build Coastguard Worker#
2*cda5da8dSAndroid Build Coastguard Worker# Module providing the `Pool` class for managing a process pool
3*cda5da8dSAndroid Build Coastguard Worker#
4*cda5da8dSAndroid Build Coastguard Worker# multiprocessing/pool.py
5*cda5da8dSAndroid Build Coastguard Worker#
6*cda5da8dSAndroid Build Coastguard Worker# Copyright (c) 2006-2008, R Oudkerk
7*cda5da8dSAndroid Build Coastguard Worker# Licensed to PSF under a Contributor Agreement.
8*cda5da8dSAndroid Build Coastguard Worker#
9*cda5da8dSAndroid Build Coastguard Worker
10*cda5da8dSAndroid Build Coastguard Worker__all__ = ['Pool', 'ThreadPool']
11*cda5da8dSAndroid Build Coastguard Worker
12*cda5da8dSAndroid Build Coastguard Worker#
13*cda5da8dSAndroid Build Coastguard Worker# Imports
14*cda5da8dSAndroid Build Coastguard Worker#
15*cda5da8dSAndroid Build Coastguard Worker
16*cda5da8dSAndroid Build Coastguard Workerimport collections
17*cda5da8dSAndroid Build Coastguard Workerimport itertools
18*cda5da8dSAndroid Build Coastguard Workerimport os
19*cda5da8dSAndroid Build Coastguard Workerimport queue
20*cda5da8dSAndroid Build Coastguard Workerimport threading
21*cda5da8dSAndroid Build Coastguard Workerimport time
22*cda5da8dSAndroid Build Coastguard Workerimport traceback
23*cda5da8dSAndroid Build Coastguard Workerimport types
24*cda5da8dSAndroid Build Coastguard Workerimport warnings
25*cda5da8dSAndroid Build Coastguard Worker
26*cda5da8dSAndroid Build Coastguard Worker# If threading is available then ThreadPool should be provided.  Therefore
27*cda5da8dSAndroid Build Coastguard Worker# we avoid top-level imports which are liable to fail on some systems.
28*cda5da8dSAndroid Build Coastguard Workerfrom . import util
29*cda5da8dSAndroid Build Coastguard Workerfrom . import get_context, TimeoutError
30*cda5da8dSAndroid Build Coastguard Workerfrom .connection import wait
31*cda5da8dSAndroid Build Coastguard Worker
32*cda5da8dSAndroid Build Coastguard Worker#
33*cda5da8dSAndroid Build Coastguard Worker# Constants representing the state of a pool
34*cda5da8dSAndroid Build Coastguard Worker#
35*cda5da8dSAndroid Build Coastguard Worker
# Value of Pool._state from the start of Pool.__init__() until setup is done.
INIT = "INIT"
# Assigned to Pool._state at the end of Pool.__init__() and to the handler
# threads' _state before they start; the only state _check_running() accepts.
RUN = "RUN"
# NOTE(review): presumably the state after the pool has been closed -- the
# code that assigns it is outside this excerpt; confirm.
CLOSE = "CLOSE"
# NOTE(review): presumably the state after the pool has been terminated --
# the code that assigns it is outside this excerpt; confirm.
TERMINATE = "TERMINATE"
40*cda5da8dSAndroid Build Coastguard Worker
41*cda5da8dSAndroid Build Coastguard Worker#
42*cda5da8dSAndroid Build Coastguard Worker# Miscellaneous
43*cda5da8dSAndroid Build Coastguard Worker#
44*cda5da8dSAndroid Build Coastguard Worker
# Module-wide counter producing 0, 1, 2, ...
# NOTE(review): presumably the source of job ids for result objects -- its
# consumers are outside this excerpt; confirm.
job_counter = itertools.count()
46*cda5da8dSAndroid Build Coastguard Worker
def mapstar(args):
    """Unpack ``args`` into :func:`map` and return the results as a list.

    The first element of ``args`` is the callable; the remaining elements
    are the iterables whose items are fed to it.
    """
    mapped = map(*args)
    return list(mapped)
49*cda5da8dSAndroid Build Coastguard Worker
def starmapstar(args):
    """Call ``args[0]`` once per item of ``args[1]``, unpacking each item
    as the positional arguments, and return the results as a list.
    """
    func = args[0]
    argument_tuples = args[1]
    return [func(*arguments) for arguments in argument_tuples]
52*cda5da8dSAndroid Build Coastguard Worker
53*cda5da8dSAndroid Build Coastguard Worker#
54*cda5da8dSAndroid Build Coastguard Worker# Hack to embed stringification of remote traceback in local traceback
55*cda5da8dSAndroid Build Coastguard Worker#
56*cda5da8dSAndroid Build Coastguard Worker
class RemoteTraceback(Exception):
    """Exception whose string form is the traceback text it was built with."""

    def __init__(self, tb):
        # Stored as given; __str__ hands it back unchanged.
        self.tb = tb

    def __str__(self):
        text = self.tb
        return text
62*cda5da8dSAndroid Build Coastguard Worker
class ExceptionWithTraceback:
    """Pair an exception with the formatted text of its traceback.

    ``__reduce__`` names ``rebuild_exc`` as the reconstructor and passes it
    the exception and the traceback text.
    """

    def __init__(self, exc, tb):
        formatted_lines = traceback.format_exception(type(exc), exc, tb)
        text = ''.join(formatted_lines)
        self.exc = exc
        # The traceback text is enclosed in triple quotes.
        self.tb = '\n"""\n%s"""' % text

    def __reduce__(self):
        rebuild_args = (self.exc, self.tb)
        return rebuild_exc, rebuild_args
71*cda5da8dSAndroid Build Coastguard Worker
def rebuild_exc(exc, tb):
    """Set ``exc.__cause__`` to a ``RemoteTraceback`` built from ``tb`` and
    return ``exc``.
    """
    remote_cause = RemoteTraceback(tb)
    exc.__cause__ = remote_cause
    return exc
75*cda5da8dSAndroid Build Coastguard Worker
76*cda5da8dSAndroid Build Coastguard Worker#
77*cda5da8dSAndroid Build Coastguard Worker# Code run by worker processes
78*cda5da8dSAndroid Build Coastguard Worker#
79*cda5da8dSAndroid Build Coastguard Worker
class MaybeEncodingError(Exception):
    """Carries the ``repr()`` of an error and of the value being sent, so
    that a possibly unpickleable pair can be safely sent through the socket.
    """

    def __init__(self, exc, value):
        # Only the repr strings are kept, never the original objects.
        self.exc = repr(exc)
        self.value = repr(value)
        super().__init__(self.exc, self.value)

    def __str__(self):
        details = (self.value, self.exc)
        return "Error sending result: '%s'. Reason: '%s'" % details

    def __repr__(self):
        class_name = self.__class__.__name__
        return "<%s: %s>" % (class_name, self)
95*cda5da8dSAndroid Build Coastguard Worker
96*cda5da8dSAndroid Build Coastguard Worker
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
           wrap_exception=False):
    """Run tasks read from ``inqueue`` and put their results on ``outqueue``.

    Each task is a ``(job, i, func, args, kwds)`` tuple.  For every task a
    ``(job, i, (success, value))`` tuple is put on ``outqueue``, where
    ``value`` is the return value of ``func(*args, **kwds)`` when ``success``
    is true, and the raised exception (possibly wrapped) otherwise.

    The loop ends when ``maxtasks`` tasks have been completed, when a
    ``None`` task is received, or when reading a task raises ``EOFError``
    or ``OSError``.

    Arguments:
        inqueue: object whose ``get()`` returns the next task.
        outqueue: object whose ``put()`` accepts a result tuple.
        initializer: optional callable, invoked once as
            ``initializer(*initargs)`` before the first task is read.
        initargs: positional arguments for ``initializer``.
        maxtasks: ``None`` for no limit, otherwise an int >= 1.
        wrap_exception: if true, an exception raised by a task is wrapped in
            ``ExceptionWithTraceback`` before being sent back.

    Raises:
        AssertionError: if ``maxtasks`` is neither ``None`` nor an int >= 1.
    """
    if (maxtasks is not None) and not (isinstance(maxtasks, int)
                                       and maxtasks >= 1):
        raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))
    put = outqueue.put
    get = inqueue.get
    # This function only reads from inqueue and only writes to outqueue, so
    # when the queues expose their ends, close the two ends it never uses.
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, OSError):
            util.debug('worker got EOFError or OSError -- exiting')
            break

        # A None task is the sentinel telling this worker to stop.
        if task is None:
            util.debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as e:
            # Exceptions re-raised through _helper_reraises_exception are
            # sent back as they are, without the traceback wrapper.
            if wrap_exception and func is not _helper_reraises_exception:
                e = ExceptionWithTraceback(e, e.__traceback__)
            result = (False, e)
        try:
            put((job, i, result))
        except Exception as e:
            # Sending the result failed; report a MaybeEncodingError built
            # from the failure and the unsent value instead.
            wrapped = MaybeEncodingError(e, result[1])
            util.debug("Possible encoding error while sending result: %s" % (
                wrapped))
            put((job, i, (False, wrapped)))

        # Drop references to the finished task's objects before waiting for
        # the next task.
        task = job = result = func = args = kwds = None
        completed += 1
    util.debug('worker exiting after %d tasks' % completed)
141*cda5da8dSAndroid Build Coastguard Worker
142*cda5da8dSAndroid Build Coastguard Workerdef _helper_reraises_exception(ex):
143*cda5da8dSAndroid Build Coastguard Worker    'Pickle-able helper function for use by _guarded_task_generation.'
144*cda5da8dSAndroid Build Coastguard Worker    raise ex
145*cda5da8dSAndroid Build Coastguard Worker
146*cda5da8dSAndroid Build Coastguard Worker#
147*cda5da8dSAndroid Build Coastguard Worker# Class representing a process pool
148*cda5da8dSAndroid Build Coastguard Worker#
149*cda5da8dSAndroid Build Coastguard Worker
150*cda5da8dSAndroid Build Coastguard Workerclass _PoolCache(dict):
151*cda5da8dSAndroid Build Coastguard Worker    """
152*cda5da8dSAndroid Build Coastguard Worker    Class that implements a cache for the Pool class that will notify
153*cda5da8dSAndroid Build Coastguard Worker    the pool management threads every time the cache is emptied. The
154*cda5da8dSAndroid Build Coastguard Worker    notification is done by the use of a queue that is provided when
155*cda5da8dSAndroid Build Coastguard Worker    instantiating the cache.
156*cda5da8dSAndroid Build Coastguard Worker    """
157*cda5da8dSAndroid Build Coastguard Worker    def __init__(self, /, *args, notifier=None, **kwds):
158*cda5da8dSAndroid Build Coastguard Worker        self.notifier = notifier
159*cda5da8dSAndroid Build Coastguard Worker        super().__init__(*args, **kwds)
160*cda5da8dSAndroid Build Coastguard Worker
161*cda5da8dSAndroid Build Coastguard Worker    def __delitem__(self, item):
162*cda5da8dSAndroid Build Coastguard Worker        super().__delitem__(item)
163*cda5da8dSAndroid Build Coastguard Worker
164*cda5da8dSAndroid Build Coastguard Worker        # Notify that the cache is empty. This is important because the
165*cda5da8dSAndroid Build Coastguard Worker        # pool keeps maintaining workers until the cache gets drained. This
166*cda5da8dSAndroid Build Coastguard Worker        # eliminates a race condition in which a task is finished after the
167*cda5da8dSAndroid Build Coastguard Worker        # the pool's _handle_workers method has enter another iteration of the
168*cda5da8dSAndroid Build Coastguard Worker        # loop. In this situation, the only event that can wake up the pool
169*cda5da8dSAndroid Build Coastguard Worker        # is the cache to be emptied (no more tasks available).
170*cda5da8dSAndroid Build Coastguard Worker        if not self:
171*cda5da8dSAndroid Build Coastguard Worker            self.notifier.put(None)
172*cda5da8dSAndroid Build Coastguard Worker
173*cda5da8dSAndroid Build Coastguard Workerclass Pool(object):
174*cda5da8dSAndroid Build Coastguard Worker    '''
175*cda5da8dSAndroid Build Coastguard Worker    Class which supports an async version of applying functions to arguments.
176*cda5da8dSAndroid Build Coastguard Worker    '''
177*cda5da8dSAndroid Build Coastguard Worker    _wrap_exception = True
178*cda5da8dSAndroid Build Coastguard Worker
179*cda5da8dSAndroid Build Coastguard Worker    @staticmethod
180*cda5da8dSAndroid Build Coastguard Worker    def Process(ctx, *args, **kwds):
181*cda5da8dSAndroid Build Coastguard Worker        return ctx.Process(*args, **kwds)
182*cda5da8dSAndroid Build Coastguard Worker
    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None):
        """Create the queues, start the workers and start the handler threads.

        Arguments:
            processes: number of workers to keep in the pool; ``None`` means
                ``os.cpu_count() or 1``.
            initializer: optional callable stored for the workers, together
                with ``initargs``.
            initargs: arguments stored alongside ``initializer``.
            maxtasksperchild: ``None`` or a positive int, stored for the
                workers.
            context: object providing ``SimpleQueue`` and ``Process``;
                ``get_context()`` is used when it is falsy.

        Raises:
            ValueError: if ``processes`` is less than 1, or if
                ``maxtasksperchild`` is neither ``None`` nor a positive int.
            TypeError: if ``initializer`` is neither ``None`` nor callable.
        """
        # Attributes initialized early to make sure that they exist in
        # __del__() if __init__() raises an exception
        self._pool = []
        self._state = INIT

        self._ctx = context or get_context()
        self._setup_queues()
        self._taskqueue = queue.SimpleQueue()
        # The _change_notifier queue exists to wake up self._handle_workers()
        # when the cache (self._cache) is empty or when there is a change in
        # the _state variable of the thread that runs _handle_workers.
        self._change_notifier = self._ctx.SimpleQueue()
        self._cache = _PoolCache(notifier=self._change_notifier)
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs

        if processes is None:
            processes = os.cpu_count() or 1
        if processes < 1:
            raise ValueError("Number of processes must be at least 1")
        if maxtasksperchild is not None:
            if not isinstance(maxtasksperchild, int) or maxtasksperchild <= 0:
                raise ValueError("maxtasksperchild must be a positive int or None")

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        self._processes = processes
        try:
            self._repopulate_pool()
        except Exception:
            # Starting the workers failed part-way: terminate the ones that
            # are still running, wait for all of them, then re-raise.
            for p in self._pool:
                if p.exitcode is None:
                    p.terminate()
            for p in self._pool:
                p.join()
            raise

        sentinels = self._get_sentinels()

        # Daemon thread running Pool._handle_workers.
        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self._cache, self._taskqueue, self._ctx, self.Process,
                  self._processes, self._pool, self._inqueue, self._outqueue,
                  self._initializer, self._initargs, self._maxtasksperchild,
                  self._wrap_exception, sentinels, self._change_notifier)
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN
        self._worker_handler.start()


        # Daemon thread running Pool._handle_tasks.
        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue,
                  self._pool, self._cache)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN
        self._task_handler.start()

        # Daemon thread running Pool._handle_results.
        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

        # Finalizer that calls _terminate_pool with the queues, workers,
        # handler threads and cache created above.
        self._terminate = util.Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._change_notifier, self._worker_handler, self._task_handler,
                  self._result_handler, self._cache),
            exitpriority=15
            )
        # Only now is the pool considered running.
        self._state = RUN
263*cda5da8dSAndroid Build Coastguard Worker
264*cda5da8dSAndroid Build Coastguard Worker    # Copy globals as function locals to make sure that they are available
265*cda5da8dSAndroid Build Coastguard Worker    # during Python shutdown when the Pool is destroyed.
266*cda5da8dSAndroid Build Coastguard Worker    def __del__(self, _warn=warnings.warn, RUN=RUN):
267*cda5da8dSAndroid Build Coastguard Worker        if self._state == RUN:
268*cda5da8dSAndroid Build Coastguard Worker            _warn(f"unclosed running multiprocessing pool {self!r}",
269*cda5da8dSAndroid Build Coastguard Worker                  ResourceWarning, source=self)
270*cda5da8dSAndroid Build Coastguard Worker            if getattr(self, '_change_notifier', None) is not None:
271*cda5da8dSAndroid Build Coastguard Worker                self._change_notifier.put(None)
272*cda5da8dSAndroid Build Coastguard Worker
273*cda5da8dSAndroid Build Coastguard Worker    def __repr__(self):
274*cda5da8dSAndroid Build Coastguard Worker        cls = self.__class__
275*cda5da8dSAndroid Build Coastguard Worker        return (f'<{cls.__module__}.{cls.__qualname__} '
276*cda5da8dSAndroid Build Coastguard Worker                f'state={self._state} '
277*cda5da8dSAndroid Build Coastguard Worker                f'pool_size={len(self._pool)}>')
278*cda5da8dSAndroid Build Coastguard Worker
279*cda5da8dSAndroid Build Coastguard Worker    def _get_sentinels(self):
280*cda5da8dSAndroid Build Coastguard Worker        task_queue_sentinels = [self._outqueue._reader]
281*cda5da8dSAndroid Build Coastguard Worker        self_notifier_sentinels = [self._change_notifier._reader]
282*cda5da8dSAndroid Build Coastguard Worker        return [*task_queue_sentinels, *self_notifier_sentinels]
283*cda5da8dSAndroid Build Coastguard Worker
284*cda5da8dSAndroid Build Coastguard Worker    @staticmethod
285*cda5da8dSAndroid Build Coastguard Worker    def _get_worker_sentinels(workers):
286*cda5da8dSAndroid Build Coastguard Worker        return [worker.sentinel for worker in
287*cda5da8dSAndroid Build Coastguard Worker                workers if hasattr(worker, "sentinel")]
288*cda5da8dSAndroid Build Coastguard Worker
289*cda5da8dSAndroid Build Coastguard Worker    @staticmethod
290*cda5da8dSAndroid Build Coastguard Worker    def _join_exited_workers(pool):
291*cda5da8dSAndroid Build Coastguard Worker        """Cleanup after any worker processes which have exited due to reaching
292*cda5da8dSAndroid Build Coastguard Worker        their specified lifetime.  Returns True if any workers were cleaned up.
293*cda5da8dSAndroid Build Coastguard Worker        """
294*cda5da8dSAndroid Build Coastguard Worker        cleaned = False
295*cda5da8dSAndroid Build Coastguard Worker        for i in reversed(range(len(pool))):
296*cda5da8dSAndroid Build Coastguard Worker            worker = pool[i]
297*cda5da8dSAndroid Build Coastguard Worker            if worker.exitcode is not None:
298*cda5da8dSAndroid Build Coastguard Worker                # worker exited
299*cda5da8dSAndroid Build Coastguard Worker                util.debug('cleaning up worker %d' % i)
300*cda5da8dSAndroid Build Coastguard Worker                worker.join()
301*cda5da8dSAndroid Build Coastguard Worker                cleaned = True
302*cda5da8dSAndroid Build Coastguard Worker                del pool[i]
303*cda5da8dSAndroid Build Coastguard Worker        return cleaned
304*cda5da8dSAndroid Build Coastguard Worker
305*cda5da8dSAndroid Build Coastguard Worker    def _repopulate_pool(self):
306*cda5da8dSAndroid Build Coastguard Worker        return self._repopulate_pool_static(self._ctx, self.Process,
307*cda5da8dSAndroid Build Coastguard Worker                                            self._processes,
308*cda5da8dSAndroid Build Coastguard Worker                                            self._pool, self._inqueue,
309*cda5da8dSAndroid Build Coastguard Worker                                            self._outqueue, self._initializer,
310*cda5da8dSAndroid Build Coastguard Worker                                            self._initargs,
311*cda5da8dSAndroid Build Coastguard Worker                                            self._maxtasksperchild,
312*cda5da8dSAndroid Build Coastguard Worker                                            self._wrap_exception)
313*cda5da8dSAndroid Build Coastguard Worker
314*cda5da8dSAndroid Build Coastguard Worker    @staticmethod
315*cda5da8dSAndroid Build Coastguard Worker    def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
316*cda5da8dSAndroid Build Coastguard Worker                                outqueue, initializer, initargs,
317*cda5da8dSAndroid Build Coastguard Worker                                maxtasksperchild, wrap_exception):
318*cda5da8dSAndroid Build Coastguard Worker        """Bring the number of pool processes up to the specified number,
319*cda5da8dSAndroid Build Coastguard Worker        for use after reaping workers which have exited.
320*cda5da8dSAndroid Build Coastguard Worker        """
321*cda5da8dSAndroid Build Coastguard Worker        for i in range(processes - len(pool)):
322*cda5da8dSAndroid Build Coastguard Worker            w = Process(ctx, target=worker,
323*cda5da8dSAndroid Build Coastguard Worker                        args=(inqueue, outqueue,
324*cda5da8dSAndroid Build Coastguard Worker                              initializer,
325*cda5da8dSAndroid Build Coastguard Worker                              initargs, maxtasksperchild,
326*cda5da8dSAndroid Build Coastguard Worker                              wrap_exception))
327*cda5da8dSAndroid Build Coastguard Worker            w.name = w.name.replace('Process', 'PoolWorker')
328*cda5da8dSAndroid Build Coastguard Worker            w.daemon = True
329*cda5da8dSAndroid Build Coastguard Worker            w.start()
330*cda5da8dSAndroid Build Coastguard Worker            pool.append(w)
331*cda5da8dSAndroid Build Coastguard Worker            util.debug('added worker')
332*cda5da8dSAndroid Build Coastguard Worker
333*cda5da8dSAndroid Build Coastguard Worker    @staticmethod
334*cda5da8dSAndroid Build Coastguard Worker    def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
335*cda5da8dSAndroid Build Coastguard Worker                       initializer, initargs, maxtasksperchild,
336*cda5da8dSAndroid Build Coastguard Worker                       wrap_exception):
337*cda5da8dSAndroid Build Coastguard Worker        """Clean up any exited workers and start replacements for them.
338*cda5da8dSAndroid Build Coastguard Worker        """
339*cda5da8dSAndroid Build Coastguard Worker        if Pool._join_exited_workers(pool):
340*cda5da8dSAndroid Build Coastguard Worker            Pool._repopulate_pool_static(ctx, Process, processes, pool,
341*cda5da8dSAndroid Build Coastguard Worker                                         inqueue, outqueue, initializer,
342*cda5da8dSAndroid Build Coastguard Worker                                         initargs, maxtasksperchild,
343*cda5da8dSAndroid Build Coastguard Worker                                         wrap_exception)
344*cda5da8dSAndroid Build Coastguard Worker
345*cda5da8dSAndroid Build Coastguard Worker    def _setup_queues(self):
346*cda5da8dSAndroid Build Coastguard Worker        self._inqueue = self._ctx.SimpleQueue()
347*cda5da8dSAndroid Build Coastguard Worker        self._outqueue = self._ctx.SimpleQueue()
348*cda5da8dSAndroid Build Coastguard Worker        self._quick_put = self._inqueue._writer.send
349*cda5da8dSAndroid Build Coastguard Worker        self._quick_get = self._outqueue._reader.recv
350*cda5da8dSAndroid Build Coastguard Worker
351*cda5da8dSAndroid Build Coastguard Worker    def _check_running(self):
352*cda5da8dSAndroid Build Coastguard Worker        if self._state != RUN:
353*cda5da8dSAndroid Build Coastguard Worker            raise ValueError("Pool not running")
354*cda5da8dSAndroid Build Coastguard Worker
355*cda5da8dSAndroid Build Coastguard Worker    def apply(self, func, args=(), kwds={}):
356*cda5da8dSAndroid Build Coastguard Worker        '''
357*cda5da8dSAndroid Build Coastguard Worker        Equivalent of `func(*args, **kwds)`.
358*cda5da8dSAndroid Build Coastguard Worker        Pool must be running.
359*cda5da8dSAndroid Build Coastguard Worker        '''
360*cda5da8dSAndroid Build Coastguard Worker        return self.apply_async(func, args, kwds).get()
361*cda5da8dSAndroid Build Coastguard Worker
362*cda5da8dSAndroid Build Coastguard Worker    def map(self, func, iterable, chunksize=None):
363*cda5da8dSAndroid Build Coastguard Worker        '''
364*cda5da8dSAndroid Build Coastguard Worker        Apply `func` to each element in `iterable`, collecting the results
365*cda5da8dSAndroid Build Coastguard Worker        in a list that is returned.
366*cda5da8dSAndroid Build Coastguard Worker        '''
367*cda5da8dSAndroid Build Coastguard Worker        return self._map_async(func, iterable, mapstar, chunksize).get()
368*cda5da8dSAndroid Build Coastguard Worker
369*cda5da8dSAndroid Build Coastguard Worker    def starmap(self, func, iterable, chunksize=None):
370*cda5da8dSAndroid Build Coastguard Worker        '''
371*cda5da8dSAndroid Build Coastguard Worker        Like `map()` method but the elements of the `iterable` are expected to
372*cda5da8dSAndroid Build Coastguard Worker        be iterables as well and will be unpacked as arguments. Hence
373*cda5da8dSAndroid Build Coastguard Worker        `func` and (a, b) becomes func(a, b).
374*cda5da8dSAndroid Build Coastguard Worker        '''
375*cda5da8dSAndroid Build Coastguard Worker        return self._map_async(func, iterable, starmapstar, chunksize).get()
376*cda5da8dSAndroid Build Coastguard Worker
377*cda5da8dSAndroid Build Coastguard Worker    def starmap_async(self, func, iterable, chunksize=None, callback=None,
378*cda5da8dSAndroid Build Coastguard Worker            error_callback=None):
379*cda5da8dSAndroid Build Coastguard Worker        '''
380*cda5da8dSAndroid Build Coastguard Worker        Asynchronous version of `starmap()` method.
381*cda5da8dSAndroid Build Coastguard Worker        '''
382*cda5da8dSAndroid Build Coastguard Worker        return self._map_async(func, iterable, starmapstar, chunksize,
383*cda5da8dSAndroid Build Coastguard Worker                               callback, error_callback)
384*cda5da8dSAndroid Build Coastguard Worker
385*cda5da8dSAndroid Build Coastguard Worker    def _guarded_task_generation(self, result_job, func, iterable):
386*cda5da8dSAndroid Build Coastguard Worker        '''Provides a generator of tasks for imap and imap_unordered with
387*cda5da8dSAndroid Build Coastguard Worker        appropriate handling for iterables which throw exceptions during
388*cda5da8dSAndroid Build Coastguard Worker        iteration.'''
389*cda5da8dSAndroid Build Coastguard Worker        try:
390*cda5da8dSAndroid Build Coastguard Worker            i = -1
391*cda5da8dSAndroid Build Coastguard Worker            for i, x in enumerate(iterable):
392*cda5da8dSAndroid Build Coastguard Worker                yield (result_job, i, func, (x,), {})
393*cda5da8dSAndroid Build Coastguard Worker        except Exception as e:
394*cda5da8dSAndroid Build Coastguard Worker            yield (result_job, i+1, _helper_reraises_exception, (e,), {})
395*cda5da8dSAndroid Build Coastguard Worker
396*cda5da8dSAndroid Build Coastguard Worker    def imap(self, func, iterable, chunksize=1):
397*cda5da8dSAndroid Build Coastguard Worker        '''
398*cda5da8dSAndroid Build Coastguard Worker        Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
399*cda5da8dSAndroid Build Coastguard Worker        '''
400*cda5da8dSAndroid Build Coastguard Worker        self._check_running()
401*cda5da8dSAndroid Build Coastguard Worker        if chunksize == 1:
402*cda5da8dSAndroid Build Coastguard Worker            result = IMapIterator(self)
403*cda5da8dSAndroid Build Coastguard Worker            self._taskqueue.put(
404*cda5da8dSAndroid Build Coastguard Worker                (
405*cda5da8dSAndroid Build Coastguard Worker                    self._guarded_task_generation(result._job, func, iterable),
406*cda5da8dSAndroid Build Coastguard Worker                    result._set_length
407*cda5da8dSAndroid Build Coastguard Worker                ))
408*cda5da8dSAndroid Build Coastguard Worker            return result
409*cda5da8dSAndroid Build Coastguard Worker        else:
410*cda5da8dSAndroid Build Coastguard Worker            if chunksize < 1:
411*cda5da8dSAndroid Build Coastguard Worker                raise ValueError(
412*cda5da8dSAndroid Build Coastguard Worker                    "Chunksize must be 1+, not {0:n}".format(
413*cda5da8dSAndroid Build Coastguard Worker                        chunksize))
414*cda5da8dSAndroid Build Coastguard Worker            task_batches = Pool._get_tasks(func, iterable, chunksize)
415*cda5da8dSAndroid Build Coastguard Worker            result = IMapIterator(self)
416*cda5da8dSAndroid Build Coastguard Worker            self._taskqueue.put(
417*cda5da8dSAndroid Build Coastguard Worker                (
418*cda5da8dSAndroid Build Coastguard Worker                    self._guarded_task_generation(result._job,
419*cda5da8dSAndroid Build Coastguard Worker                                                  mapstar,
420*cda5da8dSAndroid Build Coastguard Worker                                                  task_batches),
421*cda5da8dSAndroid Build Coastguard Worker                    result._set_length
422*cda5da8dSAndroid Build Coastguard Worker                ))
423*cda5da8dSAndroid Build Coastguard Worker            return (item for chunk in result for item in chunk)
424*cda5da8dSAndroid Build Coastguard Worker
425*cda5da8dSAndroid Build Coastguard Worker    def imap_unordered(self, func, iterable, chunksize=1):
426*cda5da8dSAndroid Build Coastguard Worker        '''
427*cda5da8dSAndroid Build Coastguard Worker        Like `imap()` method but ordering of results is arbitrary.
428*cda5da8dSAndroid Build Coastguard Worker        '''
429*cda5da8dSAndroid Build Coastguard Worker        self._check_running()
430*cda5da8dSAndroid Build Coastguard Worker        if chunksize == 1:
431*cda5da8dSAndroid Build Coastguard Worker            result = IMapUnorderedIterator(self)
432*cda5da8dSAndroid Build Coastguard Worker            self._taskqueue.put(
433*cda5da8dSAndroid Build Coastguard Worker                (
434*cda5da8dSAndroid Build Coastguard Worker                    self._guarded_task_generation(result._job, func, iterable),
435*cda5da8dSAndroid Build Coastguard Worker                    result._set_length
436*cda5da8dSAndroid Build Coastguard Worker                ))
437*cda5da8dSAndroid Build Coastguard Worker            return result
438*cda5da8dSAndroid Build Coastguard Worker        else:
439*cda5da8dSAndroid Build Coastguard Worker            if chunksize < 1:
440*cda5da8dSAndroid Build Coastguard Worker                raise ValueError(
441*cda5da8dSAndroid Build Coastguard Worker                    "Chunksize must be 1+, not {0!r}".format(chunksize))
442*cda5da8dSAndroid Build Coastguard Worker            task_batches = Pool._get_tasks(func, iterable, chunksize)
443*cda5da8dSAndroid Build Coastguard Worker            result = IMapUnorderedIterator(self)
444*cda5da8dSAndroid Build Coastguard Worker            self._taskqueue.put(
445*cda5da8dSAndroid Build Coastguard Worker                (
446*cda5da8dSAndroid Build Coastguard Worker                    self._guarded_task_generation(result._job,
447*cda5da8dSAndroid Build Coastguard Worker                                                  mapstar,
448*cda5da8dSAndroid Build Coastguard Worker                                                  task_batches),
449*cda5da8dSAndroid Build Coastguard Worker                    result._set_length
450*cda5da8dSAndroid Build Coastguard Worker                ))
451*cda5da8dSAndroid Build Coastguard Worker            return (item for chunk in result for item in chunk)
452*cda5da8dSAndroid Build Coastguard Worker
453*cda5da8dSAndroid Build Coastguard Worker    def apply_async(self, func, args=(), kwds={}, callback=None,
454*cda5da8dSAndroid Build Coastguard Worker            error_callback=None):
455*cda5da8dSAndroid Build Coastguard Worker        '''
456*cda5da8dSAndroid Build Coastguard Worker        Asynchronous version of `apply()` method.
457*cda5da8dSAndroid Build Coastguard Worker        '''
458*cda5da8dSAndroid Build Coastguard Worker        self._check_running()
459*cda5da8dSAndroid Build Coastguard Worker        result = ApplyResult(self, callback, error_callback)
460*cda5da8dSAndroid Build Coastguard Worker        self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
461*cda5da8dSAndroid Build Coastguard Worker        return result
462*cda5da8dSAndroid Build Coastguard Worker
463*cda5da8dSAndroid Build Coastguard Worker    def map_async(self, func, iterable, chunksize=None, callback=None,
464*cda5da8dSAndroid Build Coastguard Worker            error_callback=None):
465*cda5da8dSAndroid Build Coastguard Worker        '''
466*cda5da8dSAndroid Build Coastguard Worker        Asynchronous version of `map()` method.
467*cda5da8dSAndroid Build Coastguard Worker        '''
468*cda5da8dSAndroid Build Coastguard Worker        return self._map_async(func, iterable, mapstar, chunksize, callback,
469*cda5da8dSAndroid Build Coastguard Worker            error_callback)
470*cda5da8dSAndroid Build Coastguard Worker
471*cda5da8dSAndroid Build Coastguard Worker    def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
472*cda5da8dSAndroid Build Coastguard Worker            error_callback=None):
473*cda5da8dSAndroid Build Coastguard Worker        '''
474*cda5da8dSAndroid Build Coastguard Worker        Helper function to implement map, starmap and their async counterparts.
475*cda5da8dSAndroid Build Coastguard Worker        '''
476*cda5da8dSAndroid Build Coastguard Worker        self._check_running()
477*cda5da8dSAndroid Build Coastguard Worker        if not hasattr(iterable, '__len__'):
478*cda5da8dSAndroid Build Coastguard Worker            iterable = list(iterable)
479*cda5da8dSAndroid Build Coastguard Worker
480*cda5da8dSAndroid Build Coastguard Worker        if chunksize is None:
481*cda5da8dSAndroid Build Coastguard Worker            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
482*cda5da8dSAndroid Build Coastguard Worker            if extra:
483*cda5da8dSAndroid Build Coastguard Worker                chunksize += 1
484*cda5da8dSAndroid Build Coastguard Worker        if len(iterable) == 0:
485*cda5da8dSAndroid Build Coastguard Worker            chunksize = 0
486*cda5da8dSAndroid Build Coastguard Worker
487*cda5da8dSAndroid Build Coastguard Worker        task_batches = Pool._get_tasks(func, iterable, chunksize)
488*cda5da8dSAndroid Build Coastguard Worker        result = MapResult(self, chunksize, len(iterable), callback,
489*cda5da8dSAndroid Build Coastguard Worker                           error_callback=error_callback)
490*cda5da8dSAndroid Build Coastguard Worker        self._taskqueue.put(
491*cda5da8dSAndroid Build Coastguard Worker            (
492*cda5da8dSAndroid Build Coastguard Worker                self._guarded_task_generation(result._job,
493*cda5da8dSAndroid Build Coastguard Worker                                              mapper,
494*cda5da8dSAndroid Build Coastguard Worker                                              task_batches),
495*cda5da8dSAndroid Build Coastguard Worker                None
496*cda5da8dSAndroid Build Coastguard Worker            )
497*cda5da8dSAndroid Build Coastguard Worker        )
498*cda5da8dSAndroid Build Coastguard Worker        return result
499*cda5da8dSAndroid Build Coastguard Worker
500*cda5da8dSAndroid Build Coastguard Worker    @staticmethod
501*cda5da8dSAndroid Build Coastguard Worker    def _wait_for_updates(sentinels, change_notifier, timeout=None):
502*cda5da8dSAndroid Build Coastguard Worker        wait(sentinels, timeout=timeout)
503*cda5da8dSAndroid Build Coastguard Worker        while not change_notifier.empty():
504*cda5da8dSAndroid Build Coastguard Worker            change_notifier.get()
505*cda5da8dSAndroid Build Coastguard Worker
    @classmethod
    def _handle_workers(cls, cache, taskqueue, ctx, Process, processes,
                        pool, inqueue, outqueue, initializer, initargs,
                        maxtasksperchild, wrap_exception, sentinels,
                        change_notifier):
        '''Maintain the worker pool until the pool is shut down.

        The loop is driven by the `_state` attribute of the thread this
        function runs on (`close()` sets it to CLOSE on the worker handler,
        `_terminate_pool()` sets it to TERMINATE).  Each pass calls
        `_maintain_pool()` and then blocks in `_wait_for_updates()` on the
        worker sentinels plus the extra `sentinels`.  On exit, None is put
        on `taskqueue`.
        '''
        thread = threading.current_thread()

        # Keep maintaining workers until the cache gets drained, unless the pool
        # is terminated.
        while thread._state == RUN or (cache and thread._state != TERMINATE):
            cls._maintain_pool(ctx, Process, processes, pool, inqueue,
                               outqueue, initializer, initargs,
                               maxtasksperchild, wrap_exception)

            # Sentinels are recomputed on every pass, after _maintain_pool()
            # has run.
            current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels]

            cls._wait_for_updates(current_sentinels, change_notifier)
        # send sentinel to stop workers
        # (None ends the `iter(taskqueue.get, None)` loop in _handle_tasks,
        # which then forwards one None per worker.)
        taskqueue.put(None)
        util.debug('worker handler exiting')
526*cda5da8dSAndroid Build Coastguard Worker
    @staticmethod
    def _handle_tasks(taskqueue, put, outqueue, pool, cache):
        '''Forward queued task sequences to the workers via `put`.

        `taskqueue` yields ``(taskseq, set_length)`` pairs until a None
        sentinel arrives.  Every task in `taskseq` is passed to `put`; a
        task whose `put` raises is recorded as a failure on its cached
        result object.  When the loop ends (sentinel, or the current
        thread's `_state` is no longer RUN), None is put on `outqueue` and
        one None per entry of `pool` is sent through `put`.
        '''
        thread = threading.current_thread()

        for taskseq, set_length in iter(taskqueue.get, None):
            task = None
            try:
                # iterating taskseq cannot fail
                for task in taskseq:
                    if thread._state != RUN:
                        util.debug('task handler found thread._state != RUN')
                        break
                    try:
                        put(task)
                    except Exception as e:
                        # Sending failed: report the exception as this
                        # task's result.  The job may already be gone from
                        # the cache, in which case there is nobody to tell.
                        job, idx = task[:2]
                        try:
                            cache[job]._set(idx, (False, e))
                        except KeyError:
                            pass
                else:
                    # Inner loop finished without `break`: the whole
                    # sequence was sent.  Report the number of tasks (last
                    # index + 1, or 0 when the sequence was empty) and
                    # carry on with the next sequence.
                    if set_length:
                        util.debug('doing set_length()')
                        idx = task[1] if task else -1
                        set_length(idx + 1)
                    continue
                # Only reached when the inner loop hit `break` (state is no
                # longer RUN): stop reading the task queue as well.
                break
            finally:
                # Drop references so finished tasks are not kept alive
                # while blocked in taskqueue.get().
                task = taskseq = job = None
        else:
            # The outer loop ended because taskqueue.get() returned None.
            util.debug('task handler got sentinel')

        try:
            # tell result handler to finish when cache is empty
            util.debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            util.debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except OSError:
            util.debug('task handler got OSError when sending sentinels')

        util.debug('task handler exiting')
572*cda5da8dSAndroid Build Coastguard Worker
    @staticmethod
    def _handle_results(outqueue, get, cache):
        '''Deliver results read through `get` to the result objects in `cache`.

        Phase one reads ``(job, i, obj)`` tuples until a None sentinel is
        read or the current thread's `_state` is no longer RUN.  Phase two
        keeps reading while `cache` is non-empty and the state is not
        TERMINATE, ignoring further sentinels.  Finally, if `outqueue` has
        a `_reader`, up to ten pending items are drained from it.  An
        `OSError` or `EOFError` from `get` in either phase makes the
        function return immediately.
        '''
        thread = threading.current_thread()

        while 1:
            try:
                task = get()
            except (OSError, EOFError):
                util.debug('result handler got EOFError/OSError -- exiting')
                return

            if thread._state != RUN:
                assert thread._state == TERMINATE, "Thread not in TERMINATE"
                util.debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                util.debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                # No result object is registered for this job any more.
                pass
            # Release references before blocking in get() again.
            task = job = obj = None

        # Phase two: a sentinel was seen (or the state changed), but results
        # may still be outstanding -- keep delivering until the cache is
        # empty or the state becomes TERMINATE.
        while cache and thread._state != TERMINATE:
            try:
                task = get()
            except (OSError, EOFError):
                util.debug('result handler got EOFError/OSError -- exiting')
                return

            if task is None:
                util.debug('result handler ignoring extra sentinel')
                continue
            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass
            task = job = obj = None

        if hasattr(outqueue, '_reader'):
            util.debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block.  There is guaranteed to be no more than 2 sentinels.
            try:
                # Bounded drain: stop as soon as nothing is pending.
                for i in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (OSError, EOFError):
                pass

        util.debug('result handler exiting: len(cache)=%s, thread._state=%s',
              len(cache), thread._state)
632*cda5da8dSAndroid Build Coastguard Worker
633*cda5da8dSAndroid Build Coastguard Worker    @staticmethod
634*cda5da8dSAndroid Build Coastguard Worker    def _get_tasks(func, it, size):
635*cda5da8dSAndroid Build Coastguard Worker        it = iter(it)
636*cda5da8dSAndroid Build Coastguard Worker        while 1:
637*cda5da8dSAndroid Build Coastguard Worker            x = tuple(itertools.islice(it, size))
638*cda5da8dSAndroid Build Coastguard Worker            if not x:
639*cda5da8dSAndroid Build Coastguard Worker                return
640*cda5da8dSAndroid Build Coastguard Worker            yield (func, x)
641*cda5da8dSAndroid Build Coastguard Worker
642*cda5da8dSAndroid Build Coastguard Worker    def __reduce__(self):
643*cda5da8dSAndroid Build Coastguard Worker        raise NotImplementedError(
644*cda5da8dSAndroid Build Coastguard Worker              'pool objects cannot be passed between processes or pickled'
645*cda5da8dSAndroid Build Coastguard Worker              )
646*cda5da8dSAndroid Build Coastguard Worker
647*cda5da8dSAndroid Build Coastguard Worker    def close(self):
648*cda5da8dSAndroid Build Coastguard Worker        util.debug('closing pool')
649*cda5da8dSAndroid Build Coastguard Worker        if self._state == RUN:
650*cda5da8dSAndroid Build Coastguard Worker            self._state = CLOSE
651*cda5da8dSAndroid Build Coastguard Worker            self._worker_handler._state = CLOSE
652*cda5da8dSAndroid Build Coastguard Worker            self._change_notifier.put(None)
653*cda5da8dSAndroid Build Coastguard Worker
654*cda5da8dSAndroid Build Coastguard Worker    def terminate(self):
655*cda5da8dSAndroid Build Coastguard Worker        util.debug('terminating pool')
656*cda5da8dSAndroid Build Coastguard Worker        self._state = TERMINATE
657*cda5da8dSAndroid Build Coastguard Worker        self._terminate()
658*cda5da8dSAndroid Build Coastguard Worker
659*cda5da8dSAndroid Build Coastguard Worker    def join(self):
660*cda5da8dSAndroid Build Coastguard Worker        util.debug('joining pool')
661*cda5da8dSAndroid Build Coastguard Worker        if self._state == RUN:
662*cda5da8dSAndroid Build Coastguard Worker            raise ValueError("Pool is still running")
663*cda5da8dSAndroid Build Coastguard Worker        elif self._state not in (CLOSE, TERMINATE):
664*cda5da8dSAndroid Build Coastguard Worker            raise ValueError("In unknown state")
665*cda5da8dSAndroid Build Coastguard Worker        self._worker_handler.join()
666*cda5da8dSAndroid Build Coastguard Worker        self._task_handler.join()
667*cda5da8dSAndroid Build Coastguard Worker        self._result_handler.join()
668*cda5da8dSAndroid Build Coastguard Worker        for p in self._pool:
669*cda5da8dSAndroid Build Coastguard Worker            p.join()
670*cda5da8dSAndroid Build Coastguard Worker
671*cda5da8dSAndroid Build Coastguard Worker    @staticmethod
672*cda5da8dSAndroid Build Coastguard Worker    def _help_stuff_finish(inqueue, task_handler, size):
673*cda5da8dSAndroid Build Coastguard Worker        # task_handler may be blocked trying to put items on inqueue
674*cda5da8dSAndroid Build Coastguard Worker        util.debug('removing tasks from inqueue until task handler finished')
675*cda5da8dSAndroid Build Coastguard Worker        inqueue._rlock.acquire()
676*cda5da8dSAndroid Build Coastguard Worker        while task_handler.is_alive() and inqueue._reader.poll():
677*cda5da8dSAndroid Build Coastguard Worker            inqueue._reader.recv()
678*cda5da8dSAndroid Build Coastguard Worker            time.sleep(0)
679*cda5da8dSAndroid Build Coastguard Worker
    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
                        worker_handler, task_handler, result_handler, cache):
        '''Shut the pool down: stop the handler threads, then the workers.

        The three handlers are switched to TERMINATE, pending input is
        drained so the task handler can finish, a sentinel is put on
        `outqueue`, and the handlers are joined (never from within the
        handler thread itself).  Workers still running after the worker
        handler has exited are terminated and finally joined.

        Raises `AssertionError` if the result handler is dead while `cache`
        still holds entries.
        '''
        # this is guaranteed to only be called once
        util.debug('finalizing pool')

        # Notify that the worker_handler state has been changed so the
        # _handle_workers loop can be unblocked (and exited) in order to
        # send the finalization sentinel all the workers.
        worker_handler._state = TERMINATE
        change_notifier.put(None)

        task_handler._state = TERMINATE

        util.debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        if (not result_handler.is_alive()) and (len(cache) != 0):
            raise AssertionError(
                "Cannot have cache with result_hander not alive")

        result_handler._state = TERMINATE
        change_notifier.put(None)
        outqueue.put(None)                  # sentinel

        # We must wait for the worker handler to exit before terminating
        # workers because we don't want workers to be restarted behind our back.
        util.debug('joining worker handler')
        if threading.current_thread() is not worker_handler:
            worker_handler.join()

        # Terminate workers which haven't already finished.
        # (Pool members without a `terminate` attribute are left alone.)
        if pool and hasattr(pool[0], 'terminate'):
            util.debug('terminating workers')
            for p in pool:
                if p.exitcode is None:
                    p.terminate()

        util.debug('joining task handler')
        if threading.current_thread() is not task_handler:
            task_handler.join()

        util.debug('joining result handler')
        if threading.current_thread() is not result_handler:
            result_handler.join()

        if pool and hasattr(pool[0], 'terminate'):
            util.debug('joining pool workers')
            for p in pool:
                if p.is_alive():
                    # worker has not yet exited
                    util.debug('cleaning up worker %d' % p.pid)
                    p.join()
733*cda5da8dSAndroid Build Coastguard Worker
734*cda5da8dSAndroid Build Coastguard Worker    def __enter__(self):
735*cda5da8dSAndroid Build Coastguard Worker        self._check_running()
736*cda5da8dSAndroid Build Coastguard Worker        return self
737*cda5da8dSAndroid Build Coastguard Worker
738*cda5da8dSAndroid Build Coastguard Worker    def __exit__(self, exc_type, exc_val, exc_tb):
739*cda5da8dSAndroid Build Coastguard Worker        self.terminate()
740*cda5da8dSAndroid Build Coastguard Worker
741*cda5da8dSAndroid Build Coastguard Worker#
742*cda5da8dSAndroid Build Coastguard Worker# Class whose instances are returned by `Pool.apply_async()`
743*cda5da8dSAndroid Build Coastguard Worker#
744*cda5da8dSAndroid Build Coastguard Worker
class ApplyResult(object):
    '''Handle for the eventual outcome of one job submitted to a pool.

    On creation the instance takes the next job number from `job_counter`
    and registers itself in the pool's `_cache` under that number.
    `_set()` stores the outcome, runs the matching callback, marks the
    result as ready and removes it from the cache.
    '''

    def __init__(self, pool, callback, error_callback):
        self._pool = pool
        self._event = threading.Event()
        self._job = next(job_counter)
        self._cache = pool._cache
        self._callback = callback
        self._error_callback = error_callback
        self._cache[self._job] = self

    def ready(self):
        '''Return whether the outcome has been stored.'''
        return self._event.is_set()

    def successful(self):
        '''Return whether the job completed without raising.

        Raises `ValueError` if the result is not ready yet.
        '''
        if not self.ready():
            raise ValueError("{0!r} not ready".format(self))
        return self._success

    def wait(self, timeout=None):
        '''Block until the result is ready or `timeout` seconds elapse.'''
        self._event.wait(timeout)

    def get(self, timeout=None):
        '''Return the job's value, or re-raise the exception it produced.

        Raises `TimeoutError` if the result is still not ready after
        waiting `timeout` seconds.
        '''
        self.wait(timeout)
        if not self.ready():
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        '''Store the ``(success, value)`` pair `obj` and publish it.

        `i` is unused here.  The callbacks are user code and may raise;
        the bookkeeping therefore runs in a ``finally`` clause so that a
        failing callback cannot leave `wait()`/`get()` blocked forever or
        leave the job in the cache.  The callback's exception still
        propagates to the caller.
        '''
        self._success, self._value = obj
        try:
            if self._callback and self._success:
                self._callback(self._value)
            if self._error_callback and not self._success:
                self._error_callback(self._value)
        finally:
            self._event.set()
            del self._cache[self._job]
            self._pool = None

    __class_getitem__ = classmethod(types.GenericAlias)

AsyncResult = ApplyResult       # create alias -- see #17805
789*cda5da8dSAndroid Build Coastguard Worker
790*cda5da8dSAndroid Build Coastguard Worker#
791*cda5da8dSAndroid Build Coastguard Worker# Class whose instances are returned by `Pool.map_async()`
792*cda5da8dSAndroid Build Coastguard Worker#
793*cda5da8dSAndroid Build Coastguard Worker
class MapResult(ApplyResult):
    '''Result of a chunked map job: gathers per-chunk results into a list.

    The job covers `length` items split into chunks of `chunksize`; the
    result becomes ready once every chunk has reported.  A `chunksize`
    of 0 or less means there is nothing to wait for, so the result is
    ready immediately with a value of ``[None] * length``.
    '''

    def __init__(self, pool, chunksize, length, callback, error_callback):
        ApplyResult.__init__(self, pool, callback,
                             error_callback=error_callback)
        self._success = True
        self._value = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            self._number_left = 0
            self._event.set()
            del self._cache[self._job]
        else:
            # Number of chunks, rounding up for a final partial chunk.
            self._number_left = length//chunksize + bool(length % chunksize)

    def _set(self, i, success_result):
        '''Record the ``(success, result)`` outcome of chunk number `i`.

        Successful chunks are written into their slice of the value list.
        The first failure replaces the value with its exception; later
        outcomes are then ignored.  When the last chunk reports, the
        matching callback runs and the result becomes ready.  That final
        bookkeeping sits in a ``finally`` clause so a raising callback
        cannot leave waiters blocked or the job stuck in the cache; the
        callback's exception still propagates.
        '''
        self._number_left -= 1
        success, result = success_result
        if success and self._success:
            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
        elif not success and self._success:
            # only store first exception
            self._success = False
            self._value = result

        if self._number_left == 0:
            # only consider the result ready once all jobs are done
            try:
                if self._success:
                    if self._callback:
                        self._callback(self._value)
                elif self._error_callback:
                    self._error_callback(self._value)
            finally:
                del self._cache[self._job]
                self._event.set()
                self._pool = None
832*cda5da8dSAndroid Build Coastguard Worker
833*cda5da8dSAndroid Build Coastguard Worker#
834*cda5da8dSAndroid Build Coastguard Worker# Class whose instances are returned by `Pool.imap()`
835*cda5da8dSAndroid Build Coastguard Worker#
836*cda5da8dSAndroid Build Coastguard Worker
class IMapIterator(object):
    """Iterator over the results of a job, handed out in task-index order.

    Results arrive through `_set()` as ``(success, value)`` pairs and the
    total number of results arrives through `_set_length()`.  `next()`
    blocks on a condition variable until a result can be handed out or
    every result has been consumed.
    """

    def __init__(self, pool):
        self._pool = pool
        self._cond = threading.Condition(threading.Lock())
        self._job = next(job_counter)
        self._cache = pool._cache
        self._items = collections.deque()   # results ready for next()
        self._index = 0                     # count of results moved to _items
        self._length = None                 # total result count, once known
        self._unsorted = {}                 # early results, keyed by index
        self._cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        """Return the next result, waiting at most `timeout` seconds.

        With ``timeout=None`` the call waits until a result is available
        or the job is finished.

        Raises StopIteration once all results have been handed out,
        TimeoutError (the name imported at the top of this file) when
        nothing became available within `timeout`, and re-raises the
        stored value of any result whose success flag is false.
        """
        def available():
            return self._items or self._index == self._length

        with self._cond:
            # wait_for() re-tests the predicate after every wake-up, so a
            # wake-up whose item was already taken by another caller goes
            # back to waiting instead of being reported as a timeout.
            self._cond.wait_for(available, timeout)
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    self._pool = None
                    raise StopIteration from None
                raise TimeoutError from None

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                    # XXX

    def _set(self, i, obj):
        """Accept result `obj` for task index `i`.

        A result that arrives ahead of its turn is parked in `_unsorted`;
        a result that arrives in turn is queued together with any parked
        results that directly follow it.
        """
        with self._cond:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                # Several items may have been queued at once (and this may
                # have been the final one), so wake every waiter.
                self._cond.notify_all()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                del self._cache[self._job]
                self._pool = None

    def _set_length(self, length):
        """Record the total number of results this iterator will yield."""
        with self._cond:
            self._length = length
            if self._index == self._length:
                # Everything has already been queued: wake all waiters so
                # each of them can observe the end of the iteration.
                self._cond.notify_all()
                del self._cache[self._job]
                self._pool = None
901*cda5da8dSAndroid Build Coastguard Worker
902*cda5da8dSAndroid Build Coastguard Worker#
903*cda5da8dSAndroid Build Coastguard Worker# Class whose instances are returned by `Pool.imap_unordered()`
904*cda5da8dSAndroid Build Coastguard Worker#
905*cda5da8dSAndroid Build Coastguard Worker
class IMapUnorderedIterator(IMapIterator):
    """Variant of IMapIterator that hands results out in arrival order."""

    def _set(self, i, obj):
        """Queue `obj` immediately; the task index `i` is not used."""
        cond = self._cond
        with cond:
            self._items.append(obj)
            self._index += 1
            cond.notify()
            if self._index != self._length:
                return
            # Last expected result has been queued: forget the job.
            del self._cache[self._job]
            self._pool = None
916*cda5da8dSAndroid Build Coastguard Worker
917*cda5da8dSAndroid Build Coastguard Worker#
918*cda5da8dSAndroid Build Coastguard Worker#
919*cda5da8dSAndroid Build Coastguard Worker#
920*cda5da8dSAndroid Build Coastguard Worker
class ThreadPool(Pool):
    """Pool subclass whose workers are `.dummy.Process` objects.

    Tasks and results travel through `queue.SimpleQueue` objects.
    """

    _wrap_exception = False

    @staticmethod
    def Process(ctx, *args, **kwds):
        """Create a worker via `.dummy.Process`; `ctx` is ignored."""
        # Imported here rather than at module level.
        from .dummy import Process
        return Process(*args, **kwds)

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        """Create the task/result queues and their fast put/get shortcuts."""
        tasks = queue.SimpleQueue()
        results = queue.SimpleQueue()
        self._inqueue = tasks
        self._outqueue = results
        self._quick_put = tasks.put
        self._quick_get = results.get

    def _get_sentinels(self):
        """Return the sentinels to watch: only the change notifier's reader."""
        notifier = self._change_notifier
        return [notifier._reader]

    @staticmethod
    def _get_worker_sentinels(workers):
        """Return an empty list; no per-worker sentinels are provided."""
        return []

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        """Empty `inqueue`, then enqueue `size` None sentinels.

        `task_handler` is accepted but not used here.
        """
        # drain inqueue, and put sentinels at its head to make workers finish
        while True:
            try:
                inqueue.get(block=False)
            except queue.Empty:
                break
        for _ in range(size):
            inqueue.put(None)

    def _wait_for_updates(self, sentinels, change_notifier, timeout):
        """Sleep for `timeout` seconds; the other arguments are ignored."""
        time.sleep(timeout)
958