1"""A Future class similar to the one in PEP 3148."""
2
3__all__ = (
4    'Future', 'wrap_future', 'isfuture',
5)
6
7import concurrent.futures
8import contextvars
9import logging
10import sys
11from types import GenericAlias
12
13from . import base_futures
14from . import events
15from . import exceptions
16from . import format_helpers
17
18
19isfuture = base_futures.isfuture
20
21
22_PENDING = base_futures._PENDING
23_CANCELLED = base_futures._CANCELLED
24_FINISHED = base_futures._FINISHED
25
26
27STACK_DEBUG = logging.DEBUG - 1  # heavy-duty debugging
28
29
30class Future:
31    """This class is *almost* compatible with concurrent.futures.Future.
32
33    Differences:
34
35    - This class is not thread-safe.
36
37    - result() and exception() do not take a timeout argument and
38      raise an exception when the future isn't done yet.
39
40    - Callbacks registered with add_done_callback() are always called
41      via the event loop's call_soon().
42
43    - This class is not compatible with the wait() and as_completed()
44      methods in the concurrent.futures package.
45
46    (In Python 3.4 or later we may be able to unify the implementations.)
47    """
48
49    # Class variables serving as defaults for instance variables.
50    _state = _PENDING
51    _result = None
52    _exception = None
53    _loop = None
54    _source_traceback = None
55    _cancel_message = None
56    # A saved CancelledError for later chaining as an exception context.
57    _cancelled_exc = None
58
59    # This field is used for a dual purpose:
60    # - Its presence is a marker to declare that a class implements
61    #   the Future protocol (i.e. is intended to be duck-type compatible).
62    #   The value must also be not-None, to enable a subclass to declare
63    #   that it is not compatible by setting this to None.
64    # - It is set by __iter__() below so that Task._step() can tell
65    #   the difference between
66    #   `await Future()` or`yield from Future()` (correct) vs.
67    #   `yield Future()` (incorrect).
68    _asyncio_future_blocking = False
69
70    __log_traceback = False
71
72    def __init__(self, *, loop=None):
73        """Initialize the future.
74
75        The optional event_loop argument allows explicitly setting the event
76        loop object used by the future. If it's not provided, the future uses
77        the default event loop.
78        """
79        if loop is None:
80            self._loop = events._get_event_loop()
81        else:
82            self._loop = loop
83        self._callbacks = []
84        if self._loop.get_debug():
85            self._source_traceback = format_helpers.extract_stack(
86                sys._getframe(1))
87
88    def __repr__(self):
89        return base_futures._future_repr(self)
90
91    def __del__(self):
92        if not self.__log_traceback:
93            # set_exception() was not called, or result() or exception()
94            # has consumed the exception
95            return
96        exc = self._exception
97        context = {
98            'message':
99                f'{self.__class__.__name__} exception was never retrieved',
100            'exception': exc,
101            'future': self,
102        }
103        if self._source_traceback:
104            context['source_traceback'] = self._source_traceback
105        self._loop.call_exception_handler(context)
106
107    __class_getitem__ = classmethod(GenericAlias)
108
109    @property
110    def _log_traceback(self):
111        return self.__log_traceback
112
113    @_log_traceback.setter
114    def _log_traceback(self, val):
115        if val:
116            raise ValueError('_log_traceback can only be set to False')
117        self.__log_traceback = False
118
119    def get_loop(self):
120        """Return the event loop the Future is bound to."""
121        loop = self._loop
122        if loop is None:
123            raise RuntimeError("Future object is not initialized.")
124        return loop
125
126    def _make_cancelled_error(self):
127        """Create the CancelledError to raise if the Future is cancelled.
128
129        This should only be called once when handling a cancellation since
130        it erases the saved context exception value.
131        """
132        if self._cancelled_exc is not None:
133            exc = self._cancelled_exc
134            self._cancelled_exc = None
135            return exc
136
137        if self._cancel_message is None:
138            exc = exceptions.CancelledError()
139        else:
140            exc = exceptions.CancelledError(self._cancel_message)
141        exc.__context__ = self._cancelled_exc
142        # Remove the reference since we don't need this anymore.
143        self._cancelled_exc = None
144        return exc
145
146    def cancel(self, msg=None):
147        """Cancel the future and schedule callbacks.
148
149        If the future is already done or cancelled, return False.  Otherwise,
150        change the future's state to cancelled, schedule the callbacks and
151        return True.
152        """
153        self.__log_traceback = False
154        if self._state != _PENDING:
155            return False
156        self._state = _CANCELLED
157        self._cancel_message = msg
158        self.__schedule_callbacks()
159        return True
160
161    def __schedule_callbacks(self):
162        """Internal: Ask the event loop to call all callbacks.
163
164        The callbacks are scheduled to be called as soon as possible. Also
165        clears the callback list.
166        """
167        callbacks = self._callbacks[:]
168        if not callbacks:
169            return
170
171        self._callbacks[:] = []
172        for callback, ctx in callbacks:
173            self._loop.call_soon(callback, self, context=ctx)
174
175    def cancelled(self):
176        """Return True if the future was cancelled."""
177        return self._state == _CANCELLED
178
179    # Don't implement running(); see http://bugs.python.org/issue18699
180
181    def done(self):
182        """Return True if the future is done.
183
184        Done means either that a result / exception are available, or that the
185        future was cancelled.
186        """
187        return self._state != _PENDING
188
189    def result(self):
190        """Return the result this future represents.
191
192        If the future has been cancelled, raises CancelledError.  If the
193        future's result isn't yet available, raises InvalidStateError.  If
194        the future is done and has an exception set, this exception is raised.
195        """
196        if self._state == _CANCELLED:
197            exc = self._make_cancelled_error()
198            raise exc
199        if self._state != _FINISHED:
200            raise exceptions.InvalidStateError('Result is not ready.')
201        self.__log_traceback = False
202        if self._exception is not None:
203            raise self._exception.with_traceback(self._exception_tb)
204        return self._result
205
206    def exception(self):
207        """Return the exception that was set on this future.
208
209        The exception (or None if no exception was set) is returned only if
210        the future is done.  If the future has been cancelled, raises
211        CancelledError.  If the future isn't done yet, raises
212        InvalidStateError.
213        """
214        if self._state == _CANCELLED:
215            exc = self._make_cancelled_error()
216            raise exc
217        if self._state != _FINISHED:
218            raise exceptions.InvalidStateError('Exception is not set.')
219        self.__log_traceback = False
220        return self._exception
221
222    def add_done_callback(self, fn, *, context=None):
223        """Add a callback to be run when the future becomes done.
224
225        The callback is called with a single argument - the future object. If
226        the future is already done when this is called, the callback is
227        scheduled with call_soon.
228        """
229        if self._state != _PENDING:
230            self._loop.call_soon(fn, self, context=context)
231        else:
232            if context is None:
233                context = contextvars.copy_context()
234            self._callbacks.append((fn, context))
235
236    # New method not in PEP 3148.
237
238    def remove_done_callback(self, fn):
239        """Remove all instances of a callback from the "call when done" list.
240
241        Returns the number of callbacks removed.
242        """
243        filtered_callbacks = [(f, ctx)
244                              for (f, ctx) in self._callbacks
245                              if f != fn]
246        removed_count = len(self._callbacks) - len(filtered_callbacks)
247        if removed_count:
248            self._callbacks[:] = filtered_callbacks
249        return removed_count
250
251    # So-called internal methods (note: no set_running_or_notify_cancel()).
252
253    def set_result(self, result):
254        """Mark the future done and set its result.
255
256        If the future is already done when this method is called, raises
257        InvalidStateError.
258        """
259        if self._state != _PENDING:
260            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
261        self._result = result
262        self._state = _FINISHED
263        self.__schedule_callbacks()
264
265    def set_exception(self, exception):
266        """Mark the future done and set an exception.
267
268        If the future is already done when this method is called, raises
269        InvalidStateError.
270        """
271        if self._state != _PENDING:
272            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
273        if isinstance(exception, type):
274            exception = exception()
275        if type(exception) is StopIteration:
276            raise TypeError("StopIteration interacts badly with generators "
277                            "and cannot be raised into a Future")
278        self._exception = exception
279        self._exception_tb = exception.__traceback__
280        self._state = _FINISHED
281        self.__schedule_callbacks()
282        self.__log_traceback = True
283
284    def __await__(self):
285        if not self.done():
286            self._asyncio_future_blocking = True
287            yield self  # This tells Task to wait for completion.
288        if not self.done():
289            raise RuntimeError("await wasn't used with future")
290        return self.result()  # May raise too.
291
292    __iter__ = __await__  # make compatible with 'yield from'.
293
294
295# Needed for testing purposes.
296_PyFuture = Future
297
298
299def _get_loop(fut):
300    # Tries to call Future.get_loop() if it's available.
301    # Otherwise fallbacks to using the old '_loop' property.
302    try:
303        get_loop = fut.get_loop
304    except AttributeError:
305        pass
306    else:
307        return get_loop()
308    return fut._loop
309
310
311def _set_result_unless_cancelled(fut, result):
312    """Helper setting the result only if the future was not cancelled."""
313    if fut.cancelled():
314        return
315    fut.set_result(result)
316
317
318def _convert_future_exc(exc):
319    exc_class = type(exc)
320    if exc_class is concurrent.futures.CancelledError:
321        return exceptions.CancelledError(*exc.args)
322    elif exc_class is concurrent.futures.TimeoutError:
323        return exceptions.TimeoutError(*exc.args)
324    elif exc_class is concurrent.futures.InvalidStateError:
325        return exceptions.InvalidStateError(*exc.args)
326    else:
327        return exc
328
329
330def _set_concurrent_future_state(concurrent, source):
331    """Copy state from a future to a concurrent.futures.Future."""
332    assert source.done()
333    if source.cancelled():
334        concurrent.cancel()
335    if not concurrent.set_running_or_notify_cancel():
336        return
337    exception = source.exception()
338    if exception is not None:
339        concurrent.set_exception(_convert_future_exc(exception))
340    else:
341        result = source.result()
342        concurrent.set_result(result)
343
344
345def _copy_future_state(source, dest):
346    """Internal helper to copy state from another Future.
347
348    The other Future may be a concurrent.futures.Future.
349    """
350    assert source.done()
351    if dest.cancelled():
352        return
353    assert not dest.done()
354    if source.cancelled():
355        dest.cancel()
356    else:
357        exception = source.exception()
358        if exception is not None:
359            dest.set_exception(_convert_future_exc(exception))
360        else:
361            result = source.result()
362            dest.set_result(result)
363
364
365def _chain_future(source, destination):
366    """Chain two futures so that when one completes, so does the other.
367
368    The result (or exception) of source will be copied to destination.
369    If destination is cancelled, source gets cancelled too.
370    Compatible with both asyncio.Future and concurrent.futures.Future.
371    """
372    if not isfuture(source) and not isinstance(source,
373                                               concurrent.futures.Future):
374        raise TypeError('A future is required for source argument')
375    if not isfuture(destination) and not isinstance(destination,
376                                                    concurrent.futures.Future):
377        raise TypeError('A future is required for destination argument')
378    source_loop = _get_loop(source) if isfuture(source) else None
379    dest_loop = _get_loop(destination) if isfuture(destination) else None
380
381    def _set_state(future, other):
382        if isfuture(future):
383            _copy_future_state(other, future)
384        else:
385            _set_concurrent_future_state(future, other)
386
387    def _call_check_cancel(destination):
388        if destination.cancelled():
389            if source_loop is None or source_loop is dest_loop:
390                source.cancel()
391            else:
392                source_loop.call_soon_threadsafe(source.cancel)
393
394    def _call_set_state(source):
395        if (destination.cancelled() and
396                dest_loop is not None and dest_loop.is_closed()):
397            return
398        if dest_loop is None or dest_loop is source_loop:
399            _set_state(destination, source)
400        else:
401            if dest_loop.is_closed():
402                return
403            dest_loop.call_soon_threadsafe(_set_state, destination, source)
404
405    destination.add_done_callback(_call_check_cancel)
406    source.add_done_callback(_call_set_state)
407
408
409def wrap_future(future, *, loop=None):
410    """Wrap concurrent.futures.Future object."""
411    if isfuture(future):
412        return future
413    assert isinstance(future, concurrent.futures.Future), \
414        f'concurrent.futures.Future is expected, got {future!r}'
415    if loop is None:
416        loop = events._get_event_loop()
417    new_future = loop.create_future()
418    _chain_future(future, new_future)
419    return new_future
420
421
422try:
423    import _asyncio
424except ImportError:
425    pass
426else:
427    # _CFuture is needed for tests.
428    Future = _CFuture = _asyncio.Future
429