xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/concurrent/futures/_base.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
# Module author and contact address.
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
5
6import collections
7import logging
8import threading
9import time
10import types
11
# Conditions accepted by the *return_when* argument of wait().
FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED = (
    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED')
# Private condition that selects the waiter used by as_completed().
_AS_COMPLETED = '_AS_COMPLETED'

# Possible future states (for internal use by the futures package).
PENDING, RUNNING, FINISHED = 'PENDING', 'RUNNING', 'FINISHED'
# CANCELLED: the future was cancelled by the user...
# CANCELLED_AND_NOTIFIED: ...and _Waiter.add_cancelled() was called by a
# worker.
CANCELLED, CANCELLED_AND_NOTIFIED = 'CANCELLED', 'CANCELLED_AND_NOTIFIED'

_FUTURE_STATES = [PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

# Human-readable description of each state, in the same order as
# _FUTURE_STATES; both cancelled states share one description.
_STATE_TO_DESCRIPTION_MAP = dict(zip(
    _FUTURE_STATES,
    ("pending", "running", "cancelled", "cancelled", "finished"),
))

# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")
44
class Error(Exception):
    """Common base class for the future-related exceptions of this module."""
48
class CancelledError(Error):
    """Raised when the outcome of a cancelled Future is requested."""
52
# Bind the builtin TimeoutError to a module-level name so that it is also
# reachable as an attribute of this module.
TimeoutError = TimeoutError  # make local alias for the standard exception
54
class InvalidStateError(Error):
    """Raised when an operation is not allowed in the Future's current state."""
58
class _Waiter:
    """Provides the event that wait() and as_completed() block on.

    This base implementation only records the futures it is told about;
    it never sets the event itself.
    """

    def __init__(self):
        # Futures reported through the add_*() hooks, in reporting order.
        self.finished_futures = []
        self.event = threading.Event()

    def add_result(self, future):
        """Record *future*, reported as having produced a result."""
        self.finished_futures.append(future)

    def add_exception(self, future):
        """Record *future*, reported as having raised an exception."""
        self.finished_futures.append(future)

    def add_cancelled(self, future):
        """Record *future*, reported as cancelled."""
        self.finished_futures.append(future)
73
class _AsCompletedWaiter(_Waiter):
    """Used by as_completed().

    Each notification is recorded and the event is set while holding
    ``self.lock``.
    """

    def __init__(self):
        super().__init__()
        # Guards finished_futures and the event against concurrent updates.
        self.lock = threading.Lock()

    def add_result(self, future):
        """Record *future* and set the event, under the lock."""
        with self.lock:
            super().add_result(future)
            self.event.set()

    def add_exception(self, future):
        """Record *future* and set the event, under the lock."""
        with self.lock:
            super().add_exception(future)
            self.event.set()

    def add_cancelled(self, future):
        """Record *future* and set the event, under the lock."""
        with self.lock:
            super().add_cancelled(future)
            self.event.set()
95
class _FirstCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_COMPLETED).

    Any notification, whatever its kind, sets the event.
    """

    def add_result(self, future):
        """Record *future* and set the event."""
        super().add_result(future)
        self.event.set()

    def add_exception(self, future):
        """Record *future* and set the event."""
        super().add_exception(future)
        self.event.set()

    def add_cancelled(self, future):
        """Record *future* and set the event."""
        super().add_cancelled(future)
        self.event.set()
110
class _AllCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).

    The event is set once the pending-call counter reaches zero or, when
    *stop_on_exception* is true, as soon as an exception is reported.
    """

    def __init__(self, num_pending_calls, stop_on_exception):
        super().__init__()
        # Protects the read-modify-write of num_pending_calls.
        self.lock = threading.Lock()
        self.stop_on_exception = stop_on_exception
        self.num_pending_calls = num_pending_calls

    def _decrement_pending_calls(self):
        """Count one call as done; set the event when none are left."""
        with self.lock:
            left = self.num_pending_calls - 1
            self.num_pending_calls = left
            if not left:
                self.event.set()

    def add_result(self, future):
        """Record *future* and count it as done."""
        super().add_result(future)
        self._decrement_pending_calls()

    def add_exception(self, future):
        """Record *future*; set the event at once if stopping on exceptions."""
        super().add_exception(future)
        if not self.stop_on_exception:
            self._decrement_pending_calls()
            return
        self.event.set()

    def add_cancelled(self, future):
        """Record *future* and count it as done."""
        super().add_cancelled(future)
        self._decrement_pending_calls()
140
class _AcquireFutures:
    """A context manager that does an ordered acquire of Future conditions.

    The futures are ordered by id() so that every user of this class
    acquires a given group of conditions in the same order.
    """

    def __init__(self, futures):
        ordered = list(futures)
        ordered.sort(key=id)
        self.futures = ordered

    def __enter__(self):
        for fut in self.futures:
            fut._condition.acquire()

    def __exit__(self, *args):
        for fut in self.futures:
            fut._condition.release()
154
def _create_and_install_waiters(fs, return_when):
    """Build the waiter matching *return_when* and attach it to every future.

    Args:
        fs: The futures whose ``_waiters`` lists receive the new waiter.
        return_when: _AS_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION or
            ALL_COMPLETED.

    Returns:
        The newly created waiter.

    Raises:
        ValueError: If *return_when* is none of the values above.
    """
    if return_when == _AS_COMPLETED:
        new_waiter = _AsCompletedWaiter()
    elif return_when == FIRST_COMPLETED:
        new_waiter = _FirstCompletedWaiter()
    else:
        # Count the futures that have not yet reached a notified final state.
        settled_states = (CANCELLED_AND_NOTIFIED, FINISHED)
        unfinished = sum(
            1 for fut in fs if fut._state not in settled_states)

        if return_when == FIRST_EXCEPTION:
            stop_early = True
        elif return_when == ALL_COMPLETED:
            stop_early = False
        else:
            raise ValueError("Invalid return condition: %r" % return_when)
        new_waiter = _AllCompletedWaiter(
            unfinished, stop_on_exception=stop_early)

    for fut in fs:
        fut._waiters.append(new_waiter)

    return new_waiter
174    return waiter
175
176
def _yield_finished_futures(fs, waiter, ref_collect):
    """
    Iterate on the list *fs*, yielding finished futures one by one in
    reverse order.
    Before yielding a future, *waiter* is removed from its waiters
    and the future is removed from each set in the collection of sets
    *ref_collect*.

    The aim of this function is to avoid keeping stale references after
    the future is yielded and before the iterator resumes.

    Args:
        fs: List of futures; it is consumed from the end and is empty once
            the generator is exhausted.
        waiter: The waiter to detach from each future's ``_waiters`` list.
        ref_collect: Iterable of sets from which each future is removed
            before it is yielded.
    """
    while fs:
        # Peek at the last element; it stays in *fs* until the pop() below.
        f = fs[-1]
        for futures_set in ref_collect:
            futures_set.remove(f)
        # The waiter list is modified while holding the future's condition.
        with f._condition:
            f._waiters.remove(waiter)
        # Drop the local name so this frame does not reference the future
        # while the generator is suspended at the yield.
        del f
        # Careful not to keep a reference to the popped value
        yield fs.pop()
197
198
def as_completed(fs, timeout=None):
    """Yield each of the given futures as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different
            Executors) to iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished
        or cancelled). If any given Futures are duplicated, they will be
        returned once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    if timeout is not None:
        deadline = timeout + time.monotonic()

    fs = set(fs)
    total_futures = len(fs)
    settled_states = (CANCELLED_AND_NOTIFIED, FINISHED)
    with _AcquireFutures(fs):
        ready = {fut for fut in fs if fut._state in settled_states}
        pending = fs - ready
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    # Rebind to a list so the helper below can consume it; the set itself is
    # no longer referenced afterwards.
    ready = list(ready)
    try:
        yield from _yield_finished_futures(ready, waiter,
                                           ref_collect=(fs,))

        while pending:
            if timeout is None:
                remaining = None
            else:
                remaining = deadline - time.monotonic()
                if remaining < 0:
                    raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                            len(pending), total_futures))

            waiter.event.wait(remaining)

            # Take the batch collected so far and re-arm the waiter.
            with waiter.lock:
                ready, waiter.finished_futures = waiter.finished_futures, []
                waiter.event.clear()

            # The helper pops from the end, so reverse to keep finishing order.
            ready.reverse()
            yield from _yield_finished_futures(ready, waiter,
                                               ref_collect=(fs, pending))

    finally:
        # Detach the waiter from the futures that were never yielded.
        for fut in fs:
            with fut._condition:
                fut._waiters.remove(waiter)
260
# Result type of wait(): the futures that completed and those that did not.
DoneAndNotDoneFutures = collections.namedtuple(
    'DoneAndNotDoneFutures', ('done', 'not_done'))
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Block until the given futures satisfy *return_when* or time runs out.

    Args:
        fs: The sequence of Futures (possibly created by different
            Executors) to wait upon.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures. Duplicate futures given to *fs* are removed and will be
        returned only once.
    """
    fs = set(fs)
    settled_states = (CANCELLED_AND_NOTIFIED, FINISHED)
    with _AcquireFutures(fs):
        done = {fut for fut in fs if fut._state in settled_states}
        not_done = fs - done

        # Fast paths: the requested condition may already hold.
        if done:
            if return_when == FIRST_COMPLETED:
                return DoneAndNotDoneFutures(done, not_done)
            if return_when == FIRST_EXCEPTION:
                if any(fut for fut in done
                       if not fut.cancelled() and fut.exception() is not None):
                    return DoneAndNotDoneFutures(done, not_done)

        if not not_done:
            return DoneAndNotDoneFutures(done, not_done)

        waiter = _create_and_install_waiters(fs, return_when)

    waiter.event.wait(timeout)
    for fut in fs:
        with fut._condition:
            fut._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, fs - done)
312
313
def _result_or_cancel(fut, timeout=None):
    """Return ``fut.result(timeout)``, always calling ``fut.cancel()`` after.

    Args:
        fut: The future whose result is requested.
        timeout: Passed through to ``fut.result()``.

    Returns:
        Whatever ``fut.result(timeout)`` returns; any exception it raises
        propagates to the caller.
    """
    try:
        try:
            return fut.result(timeout)
        finally:
            # Runs whether result() returned or raised.
            fut.cancel()
    finally:
        # Break a reference cycle with the exception in self._exception
        del fut
323
324
class Future(object):
    """Represents the result of an asynchronous computation.

    The methods of this class read and change ``_state`` only while holding
    ``_condition``. Done callbacks are invoked after the condition has been
    released.
    """

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        # Waiter objects notified when the future finishes or is cancelled.
        self._waiters = []
        # Callables to run, in registration order, once the future is done.
        self._done_callbacks = []

    def _invoke_callbacks(self):
        """Call every registered done callback, logging any Exception."""
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)

    def __repr__(self):
        with self._condition:
            if self._state == FINISHED:
                # Compare against None rather than testing truthiness: an
                # exception object may evaluate as false.
                if self._exception is not None:
                    return '<%s at %#x state=%s raised %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<%s at %#x state=%s returned %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<%s at %#x state=%s>' % (
                    self.__class__.__name__,
                    id(self),
                    _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        """Cancel the future if possible.

        Returns True if the future was cancelled, False otherwise. A future
        cannot be cancelled if it is running or has already completed.
        """
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            # Already cancelled: report success without re-running callbacks.
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            self._condition.notify_all()

        self._invoke_callbacks()
        return True

    def cancelled(self):
        """Return True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        """Return True if the future is currently executing."""
        with self._condition:
            return self._state == RUNNING

    def done(self):
        """Return True if the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    def __get_result(self):
        """Raise the stored exception if there is one, else return the result.

        The exception is detected with ``is not None`` so that an exception
        object which evaluates as false (for example one defining
        ``__bool__`` or ``__len__``) is still raised instead of being
        silently replaced by the stored result.
        """
        if self._exception is not None:
            try:
                raise self._exception
            finally:
                # Break a reference cycle with the exception in self._exception
                self = None
        else:
            return self._result

    def add_done_callback(self, fn):
        """Attaches a callable that will be called when the future finishes.

        Args:
            fn: A callable that will be called with this future as its only
                argument when the future completes or is cancelled. The callable
                will always be called by a thread in the same process in which
                it was added. If the future has already completed or been
                cancelled then the callable will be called immediately. These
                callables are called in the order that they were added.
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        # Already done: call right away, outside the condition.
        try:
            fn(self)
        except Exception:
            LOGGER.exception('exception calling callback for %r', self)

    def result(self, timeout=None):
        """Return the result of the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the result if the future
                isn't done. If None, then there is no limit on the wait time.

        Returns:
            The result of the call that the future represents.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
            Exception: If the call raised then that exception will be raised.
        """
        try:
            with self._condition:
                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()

                self._condition.wait(timeout)

                # Re-check the state after the wait returned.
                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()
                else:
                    raise TimeoutError()
        finally:
            # Break a reference cycle with the exception in self._exception
            self = None

    def exception(self, timeout=None):
        """Return the exception raised by the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the wait
                time.

        Returns:
            The exception raised by the call that the future represents or None
            if the call completed without raising.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
        """

        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception

            self._condition.wait(timeout)

            # Re-check the state after the wait returned.
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception
            else:
                raise TimeoutError()

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """Mark the future as running or process any cancel notifications.

        Should only be used by Executor implementations and unit tests.

        If the future has been cancelled (cancel() was called and returned
        True) then any threads waiting on the future completing (though calls
        to as_completed() or wait()) are notified and False is returned.

        If the future was not cancelled then it is put in the running state
        (future calls to running() will return True) and True is returned.

        This method should be called by Executor implementations before
        executing the work associated with this future. If this method returns
        False then the work should not be executed.

        Returns:
            False if the Future was cancelled, True otherwise.

        Raises:
            RuntimeError: if this method was already called or if set_result()
                or set_exception() was called.
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Should only be used by Executor implementations and unit tests.

        Raises:
            InvalidStateError: If the future is already cancelled or finished.
        """
        with self._condition:
            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
                raise InvalidStateError('{}: {!r}'.format(self._state, self))
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    def set_exception(self, exception):
        """Sets the result of the future as being the given exception.

        Should only be used by Executor implementations and unit tests.

        Raises:
            InvalidStateError: If the future is already cancelled or finished.
        """
        with self._condition:
            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
                raise InvalidStateError('{}: {!r}'.format(self._state, self))
            self._exception = exception
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    # Allow subscripting the class itself, e.g. Future[int].
    __class_getitem__ = classmethod(types.GenericAlias)
568
class Executor:
    """Abstract base class for concrete asynchronous executors."""

    def submit(self, fn, /, *args, **kwargs):
        """Submit a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and
        returns a Future instance representing the execution of the callable.
        This base implementation always raises NotImplementedError.

        Returns:
            A Future representing the given call.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Return an iterator equivalent to map(fn, *iterables).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into
                before being passed to a child process. This argument is only
                used by ProcessPoolExecutor; it is ignored by
                ThreadPoolExecutor.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if timeout is not None:
            deadline = timeout + time.monotonic()

        submitted = [
            self.submit(fn, *call_args) for call_args in zip(*iterables)
        ]

        # The yield lives in a nested generator so that every call is
        # submitted above before the first value is requested.
        def result_iterator():
            try:
                # pop() takes from the end, so reverse to keep submission
                # order.
                submitted.reverse()
                while submitted:
                    # Pass the popped future straight through so no local
                    # name keeps a reference to it.
                    if timeout is not None:
                        yield _result_or_cancel(
                            submitted.pop(), deadline - time.monotonic())
                    else:
                        yield _result_or_cancel(submitted.pop())
            finally:
                # Cancel whatever was never handed out.
                for leftover in submitted:
                    leftover.cancel()
        return result_iterator()

    def shutdown(self, wait=True, *, cancel_futures=False):
        """Clean up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one. This base implementation does
        nothing.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
            cancel_futures: If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        return None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Wait for shutdown to complete; returning False lets any exception
        # raised inside the with-block propagate.
        self.shutdown(wait=True)
        return False
649
650
class BrokenExecutor(RuntimeError):
    """Raised when an executor has become non-functional after a severe
    failure.
    """
655