"""A Future class similar to the one in PEP 3148."""

__all__ = (
    'Future', 'wrap_future', 'isfuture',
)

import concurrent.futures
import contextvars
import logging
import sys
from types import GenericAlias

from . import base_futures
from . import events
from . import exceptions
from . import format_helpers


isfuture = base_futures.isfuture


_PENDING = base_futures._PENDING
_CANCELLED = base_futures._CANCELLED
_FINISHED = base_futures._FINISHED


STACK_DEBUG = logging.DEBUG - 1  # heavy-duty debugging


class Future:
    """This class is *almost* compatible with concurrent.futures.Future.

    Differences:

    - This class is not thread-safe.

    - result() and exception() do not take a timeout argument and
      raise an exception when the future isn't done yet.

    - Callbacks registered with add_done_callback() are always called
      via the event loop's call_soon().

    - This class is not compatible with the wait() and as_completed()
      methods in the concurrent.futures package.

    (In Python 3.4 or later we may be able to unify the implementations.)
    """

    # Class variables serving as defaults for instance variables.
    _state = _PENDING
    _result = None
    _exception = None
    # Traceback captured by set_exception(); re-attached by result().
    _exception_tb = None
    _loop = None
    _source_traceback = None
    _cancel_message = None
    # A saved CancelledError for later chaining as an exception context.
    _cancelled_exc = None

    # This field is used for a dual purpose:
    # - Its presence is a marker to declare that a class implements
    #   the Future protocol (i.e. is intended to be duck-type compatible).
    #   The value must also be not-None, to enable a subclass to declare
    #   that it is not compatible by setting this to None.
    # - It is set by __iter__() below so that Task._step() can tell
    #   the difference between
    #   `await Future()` or `yield from Future()` (correct) vs.
    #   `yield Future()` (incorrect).
    _asyncio_future_blocking = False

    # True while an exception has been set but not yet retrieved; checked
    # by __del__() to decide whether to report it.
    __log_traceback = False

    def __init__(self, *, loop=None):
        """Initialize the future.

        The optional loop argument allows explicitly setting the event
        loop object used by the future. If it's not provided, the future
        uses the loop returned by events._get_event_loop().
        """
        if loop is None:
            self._loop = events._get_event_loop()
        else:
            self._loop = loop
        self._callbacks = []
        if self._loop.get_debug():
            # In debug mode remember where the future was created so that
            # __del__() can include it in the error report.
            self._source_traceback = format_helpers.extract_stack(
                sys._getframe(1))

    def __repr__(self):
        """Return the representation built by base_futures._future_repr()."""
        return base_futures._future_repr(self)

    def __del__(self):
        """Report an exception that was set but never retrieved.

        Does nothing unless the traceback-logging flag is still set;
        otherwise passes a context dict to the loop's exception handler.
        """
        if not self.__log_traceback:
            # set_exception() was not called, or result() or exception()
            # has consumed the exception
            return
        exc = self._exception
        context = {
            'message':
                f'{self.__class__.__name__} exception was never retrieved',
            'exception': exc,
            'future': self,
        }
        if self._source_traceback:
            context['source_traceback'] = self._source_traceback
        self._loop.call_exception_handler(context)

    __class_getitem__ = classmethod(GenericAlias)

    @property
    def _log_traceback(self):
        """Whether an unretrieved exception will be reported by __del__()."""
        return self.__log_traceback

    @_log_traceback.setter
    def _log_traceback(self, val):
        # The flag may only be cleared from outside; only set_exception()
        # turns it on.
        if val:
            raise ValueError('_log_traceback can only be set to False')
        self.__log_traceback = False

    def get_loop(self):
        """Return the event loop the Future is bound to.

        Raises RuntimeError if no loop has been set on the instance.
        """
        loop = self._loop
        if loop is None:
            raise RuntimeError("Future object is not initialized.")
        return loop

    def _make_cancelled_error(self):
        """Create the CancelledError to raise if the Future is cancelled.

        This should only be called once when handling a cancellation since
        it erases the saved context exception value.
        """
        saved = self._cancelled_exc
        if saved is not None:
            # Hand back the saved exception and drop our reference to it.
            self._cancelled_exc = None
            return saved

        # No saved exception: build a fresh one. Its __context__ is
        # already None, so there is nothing to chain or to clear.
        if self._cancel_message is None:
            return exceptions.CancelledError()
        return exceptions.CancelledError(self._cancel_message)

    def cancel(self, msg=None):
        """Cancel the future and schedule callbacks.

        If the future is already done or cancelled, return False.  Otherwise,
        change the future's state to cancelled, schedule the callbacks and
        return True.
        """
        # Cleared unconditionally, even when the future is already done.
        self.__log_traceback = False
        if self._state != _PENDING:
            return False
        self._state = _CANCELLED
        self._cancel_message = msg
        self.__schedule_callbacks()
        return True

    def __schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.

        The callbacks are scheduled to be called as soon as possible. Also
        clears the callback list.
        """
        # Work on a copy so the list can be emptied in place before the
        # callbacks are handed to the loop.
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        for callback, ctx in callbacks:
            self._loop.call_soon(callback, self, context=ctx)

    def cancelled(self):
        """Return True if the future was cancelled."""
        return self._state == _CANCELLED

    # Don't implement running(); see http://bugs.python.org/issue18699

    def done(self):
        """Return True if the future is done.

        Done means either that a result / exception are available, or that the
        future was cancelled.
        """
        return self._state != _PENDING

    def result(self):
        """Return the result this future represents.

        If the future has been cancelled, raises CancelledError.  If the
        future's result isn't yet available, raises InvalidStateError.  If
        the future is done and has an exception set, this exception is raised.
        """
        if self._state == _CANCELLED:
            exc = self._make_cancelled_error()
            raise exc
        if self._state != _FINISHED:
            raise exceptions.InvalidStateError('Result is not ready.')
        # The outcome is being retrieved, so __del__() must not report it.
        self.__log_traceback = False
        if self._exception is not None:
            # Re-attach the traceback captured by set_exception().
            raise self._exception.with_traceback(self._exception_tb)
        return self._result

    def exception(self):
        """Return the exception that was set on this future.

        The exception (or None if no exception was set) is returned only if
        the future is done.  If the future has been cancelled, raises
        CancelledError.  If the future isn't done yet, raises
        InvalidStateError.
        """
        if self._state == _CANCELLED:
            exc = self._make_cancelled_error()
            raise exc
        if self._state != _FINISHED:
            raise exceptions.InvalidStateError('Exception is not set.')
        # The exception is being retrieved, so __del__() must not report it.
        self.__log_traceback = False
        return self._exception

    def add_done_callback(self, fn, *, context=None):
        """Add a callback to be run when the future becomes done.

        The callback is called with a single argument - the future object. If
        the future is already done when this is called, the callback is
        scheduled with call_soon.
        """
        if self._state != _PENDING:
            self._loop.call_soon(fn, self, context=context)
        else:
            if context is None:
                # Snapshot the current context at registration time.
                context = contextvars.copy_context()
            self._callbacks.append((fn, context))

    # New method not in PEP 3148.

    def remove_done_callback(self, fn):
        """Remove all instances of a callback from the "call when done" list.

        Returns the number of callbacks removed.
        """
        filtered_callbacks = [(f, ctx)
                              for (f, ctx) in self._callbacks
                              if f != fn]
        removed_count = len(self._callbacks) - len(filtered_callbacks)
        if removed_count:
            # Replace the contents in place; the list object stays the same.
            self._callbacks[:] = filtered_callbacks
        return removed_count

    # So-called internal methods (note: no set_running_or_notify_cancel()).

    def set_result(self, result):
        """Mark the future done and set its result.

        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        self._result = result
        self._state = _FINISHED
        self.__schedule_callbacks()

    def set_exception(self, exception):
        """Mark the future done and set an exception.

        If the future is already done when this method is called, raises
        InvalidStateError.  An exception class is instantiated with no
        arguments first.  Raises TypeError for an exception whose type is
        exactly StopIteration.
        """
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        if isinstance(exception, type):
            exception = exception()
        if type(exception) is StopIteration:
            raise TypeError("StopIteration interacts badly with generators "
                            "and cannot be raised into a Future")
        self._exception = exception
        # Keep the traceback separately so result() can re-attach it.
        self._exception_tb = exception.__traceback__
        self._state = _FINISHED
        self.__schedule_callbacks()
        # From now on __del__() reports the exception unless result(),
        # exception() or cancel() clears this flag first.
        self.__log_traceback = True

    def __await__(self):
        """Yield the future while it is pending, then return its result.

        Raises RuntimeError if resumed while the future is still not done.
        """
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        if not self.done():
            raise RuntimeError("await wasn't used with future")
        return self.result()  # May raise too.

    __iter__ = __await__  # make compatible with 'yield from'.


# Needed for testing purposes.
_PyFuture = Future


def _get_loop(fut):
    # Tries to call Future.get_loop() if it's available.
    # Otherwise falls back to using the old '_loop' property.
    try:
        get_loop = fut.get_loop
    except AttributeError:
        pass
    else:
        return get_loop()
    return fut._loop


def _set_result_unless_cancelled(fut, result):
    """Helper setting the result only if the future was not cancelled."""
    if fut.cancelled():
        return
    fut.set_result(result)


def _convert_future_exc(exc):
    """Map a concurrent.futures exception to its counterpart in exceptions.

    Only exact CancelledError, TimeoutError and InvalidStateError types from
    concurrent.futures are converted (keeping their args); any other
    exception is returned unchanged.
    """
    exc_class = type(exc)
    if exc_class is concurrent.futures.CancelledError:
        return exceptions.CancelledError(*exc.args)
    elif exc_class is concurrent.futures.TimeoutError:
        return exceptions.TimeoutError(*exc.args)
    elif exc_class is concurrent.futures.InvalidStateError:
        return exceptions.InvalidStateError(*exc.args)
    else:
        return exc


def _set_concurrent_future_state(concurrent, source):
    """Copy state from a future to a concurrent.futures.Future."""
    assert source.done()
    if source.cancelled():
        concurrent.cancel()
    # A false return here means there is nothing more to copy.
    if not concurrent.set_running_or_notify_cancel():
        return
    exception = source.exception()
    if exception is not None:
        concurrent.set_exception(_convert_future_exc(exception))
    else:
        result = source.result()
        concurrent.set_result(result)


def _copy_future_state(source, dest):
    """Internal helper to copy state from another Future.

    The other Future may be a concurrent.futures.Future.
    """
    assert source.done()
    if dest.cancelled():
        return
    assert not dest.done()
    if source.cancelled():
        dest.cancel()
    else:
        exception = source.exception()
        if exception is not None:
            dest.set_exception(_convert_future_exc(exception))
        else:
            result = source.result()
            dest.set_result(result)


def _chain_future(source, destination):
    """Chain two futures so that when one completes, so does the other.

    The result (or exception) of source will be copied to destination.
    If destination is cancelled, source gets cancelled too.
    Compatible with both asyncio.Future and concurrent.futures.Future.

    Raises TypeError if either argument is neither kind of future.
    """
    if not isfuture(source) and not isinstance(source,
                                               concurrent.futures.Future):
        raise TypeError('A future is required for source argument')
    if not isfuture(destination) and not isinstance(destination,
                                                    concurrent.futures.Future):
        raise TypeError('A future is required for destination argument')
    # Futures that do not pass isfuture() are treated as having no loop.
    source_loop = _get_loop(source) if isfuture(source) else None
    dest_loop = _get_loop(destination) if isfuture(destination) else None

    def _set_state(future, other):
        # Pick the copy helper that matches the kind of the target future.
        if isfuture(future):
            _copy_future_state(other, future)
        else:
            _set_concurrent_future_state(future, other)

    def _call_check_cancel(destination):
        # Done-callback of destination: propagate a cancellation to source.
        if destination.cancelled():
            if source_loop is None or source_loop is dest_loop:
                source.cancel()
            else:
                # source is bound to a different loop; hand the call over
                # to that loop.
                source_loop.call_soon_threadsafe(source.cancel)

    def _call_set_state(source):
        # Done-callback of source: copy its outcome to destination.
        if (destination.cancelled() and
                dest_loop is not None and dest_loop.is_closed()):
            return
        if dest_loop is None or dest_loop is source_loop:
            _set_state(destination, source)
        else:
            if dest_loop.is_closed():
                return
            dest_loop.call_soon_threadsafe(_set_state, destination, source)

    destination.add_done_callback(_call_check_cancel)
    source.add_done_callback(_call_set_state)


def wrap_future(future, *, loop=None):
    """Wrap concurrent.futures.Future object.

    A future that already passes isfuture() is returned unchanged.
    Otherwise a new future is created on loop (or on the loop returned by
    events._get_event_loop() when loop is None) and chained to future.
    """
    if isfuture(future):
        return future
    assert isinstance(future, concurrent.futures.Future), \
        f'concurrent.futures.Future is expected, got {future!r}'
    if loop is None:
        loop = events._get_event_loop()
    new_future = loop.create_future()
    _chain_future(future, new_future)
    return new_future


try:
    import _asyncio
except ImportError:
    pass
else:
    # _CFuture is needed for tests.
    Future = _CFuture = _asyncio.Future