1# Copyright 2020 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""Helpers for retrying coroutine functions with exponential back-off. 16 17The :class:`AsyncRetry` decorator shares most functionality and behavior with 18:class:`Retry`, but supports coroutine functions. Please refer to description 19of :class:`Retry` for more details. 20 21By default, this decorator will retry transient 22API errors (see :func:`if_transient_error`). For example: 23 24.. code-block:: python 25 26 @retry_async.AsyncRetry() 27 async def call_flaky_rpc(): 28 return await client.flaky_rpc() 29 30 # Will retry flaky_rpc() if it raises transient API errors. 31 result = await call_flaky_rpc() 32 33You can pass a custom predicate to retry on different exceptions, such as 34waiting for an eventually consistent item to be available: 35 36.. code-block:: python 37 38 @retry_async.AsyncRetry(predicate=retry_async.if_exception_type(exceptions.NotFound)) 39 async def check_if_exists(): 40 return await client.does_thing_exist() 41 42 is_available = await check_if_exists() 43 44Some client library methods apply retry automatically. These methods can accept 45a ``retry`` parameter that allows you to configure the behavior: 46 47.. code-block:: python 48 49 my_retry = retry_async.AsyncRetry(deadline=60) 50 result = await client.some_method(retry=my_retry) 51 52""" 53 54import asyncio 55import datetime 56import functools 57import logging 58 59from google.api_core import datetime_helpers 60from google.api_core import exceptions 61from google.api_core.retry import exponential_sleep_generator 62from google.api_core.retry import if_exception_type # noqa: F401 63from google.api_core.retry import if_transient_error 64 65 66_LOGGER = logging.getLogger(__name__) 67_DEFAULT_INITIAL_DELAY = 1.0 # seconds 68_DEFAULT_MAXIMUM_DELAY = 60.0 # seconds 69_DEFAULT_DELAY_MULTIPLIER = 2.0 70_DEFAULT_DEADLINE = 60.0 * 2.0 # seconds 71 72 73async def retry_target(target, predicate, sleep_generator, deadline, on_error=None): 74 """Call a function and retry if it fails. 75 76 This is the lowest-level retry helper. Generally, you'll use the 77 higher-level retry helper :class:`Retry`. 78 79 Args: 80 target(Callable): The function to call and retry. This must be a 81 nullary function - apply arguments with `functools.partial`. 82 predicate (Callable[Exception]): A callable used to determine if an 83 exception raised by the target should be considered retryable. 84 It should return True to retry or False otherwise. 85 sleep_generator (Iterable[float]): An infinite iterator that determines 86 how long to sleep between retries. 87 deadline (float): How long to keep retrying the target. The last sleep 88 period is shortened as necessary, so that the last retry runs at 89 ``deadline`` (and not considerably beyond it). 90 on_error (Callable[Exception]): A function to call while processing a 91 retryable exception. Any error raised by this function will *not* 92 be caught. 93 94 Returns: 95 Any: the return value of the target function. 96 97 Raises: 98 google.api_core.RetryError: If the deadline is exceeded while retrying. 99 ValueError: If the sleep generator stops yielding values. 100 Exception: If the target raises a method that isn't retryable. 101 """ 102 deadline_dt = ( 103 (datetime_helpers.utcnow() + datetime.timedelta(seconds=deadline)) 104 if deadline 105 else None 106 ) 107 108 last_exc = None 109 110 for sleep in sleep_generator: 111 try: 112 if not deadline_dt: 113 return await target() 114 else: 115 return await asyncio.wait_for( 116 target(), 117 timeout=(deadline_dt - datetime_helpers.utcnow()).total_seconds(), 118 ) 119 # pylint: disable=broad-except 120 # This function explicitly must deal with broad exceptions. 121 except Exception as exc: 122 if not predicate(exc) and not isinstance(exc, asyncio.TimeoutError): 123 raise 124 last_exc = exc 125 if on_error is not None: 126 on_error(exc) 127 128 now = datetime_helpers.utcnow() 129 130 if deadline_dt: 131 if deadline_dt <= now: 132 # Chains the raising RetryError with the root cause error, 133 # which helps observability and debugability. 134 raise exceptions.RetryError( 135 "Deadline of {:.1f}s exceeded while calling {}".format( 136 deadline, target 137 ), 138 last_exc, 139 ) from last_exc 140 else: 141 time_to_deadline = (deadline_dt - now).total_seconds() 142 sleep = min(time_to_deadline, sleep) 143 144 _LOGGER.debug( 145 "Retrying due to {}, sleeping {:.1f}s ...".format(last_exc, sleep) 146 ) 147 await asyncio.sleep(sleep) 148 149 raise ValueError("Sleep generator stopped yielding sleep values.") 150 151 152class AsyncRetry: 153 """Exponential retry decorator for async functions. 154 155 This class is a decorator used to add exponential back-off retry behavior 156 to an RPC call. 157 158 Although the default behavior is to retry transient API errors, a 159 different predicate can be provided to retry other exceptions. 160 161 Args: 162 predicate (Callable[Exception]): A callable that should return ``True`` 163 if the given exception is retryable. 164 initial (float): The minimum a,out of time to delay in seconds. This 165 must be greater than 0. 166 maximum (float): The maximum amout of time to delay in seconds. 167 multiplier (float): The multiplier applied to the delay. 168 deadline (float): How long to keep retrying in seconds. The last sleep 169 period is shortened as necessary, so that the last retry runs at 170 ``deadline`` (and not considerably beyond it). 171 on_error (Callable[Exception]): A function to call while processing 172 a retryable exception. Any error raised by this function will 173 *not* be caught. 174 """ 175 176 def __init__( 177 self, 178 predicate=if_transient_error, 179 initial=_DEFAULT_INITIAL_DELAY, 180 maximum=_DEFAULT_MAXIMUM_DELAY, 181 multiplier=_DEFAULT_DELAY_MULTIPLIER, 182 deadline=_DEFAULT_DEADLINE, 183 on_error=None, 184 ): 185 self._predicate = predicate 186 self._initial = initial 187 self._multiplier = multiplier 188 self._maximum = maximum 189 self._deadline = deadline 190 self._on_error = on_error 191 192 def __call__(self, func, on_error=None): 193 """Wrap a callable with retry behavior. 194 195 Args: 196 func (Callable): The callable to add retry behavior to. 197 on_error (Callable[Exception]): A function to call while processing 198 a retryable exception. Any error raised by this function will 199 *not* be caught. 200 201 Returns: 202 Callable: A callable that will invoke ``func`` with retry 203 behavior. 204 """ 205 if self._on_error is not None: 206 on_error = self._on_error 207 208 @functools.wraps(func) 209 async def retry_wrapped_func(*args, **kwargs): 210 """A wrapper that calls target function with retry.""" 211 target = functools.partial(func, *args, **kwargs) 212 sleep_generator = exponential_sleep_generator( 213 self._initial, self._maximum, multiplier=self._multiplier 214 ) 215 return await retry_target( 216 target, 217 self._predicate, 218 sleep_generator, 219 self._deadline, 220 on_error=on_error, 221 ) 222 223 return retry_wrapped_func 224 225 def _replace( 226 self, 227 predicate=None, 228 initial=None, 229 maximum=None, 230 multiplier=None, 231 deadline=None, 232 on_error=None, 233 ): 234 return AsyncRetry( 235 predicate=predicate or self._predicate, 236 initial=initial or self._initial, 237 maximum=maximum or self._maximum, 238 multiplier=multiplier or self._multiplier, 239 deadline=deadline or self._deadline, 240 on_error=on_error or self._on_error, 241 ) 242 243 def with_deadline(self, deadline): 244 """Return a copy of this retry with the given deadline. 245 246 Args: 247 deadline (float): How long to keep retrying. 248 249 Returns: 250 AsyncRetry: A new retry instance with the given deadline. 251 """ 252 return self._replace(deadline=deadline) 253 254 def with_predicate(self, predicate): 255 """Return a copy of this retry with the given predicate. 256 257 Args: 258 predicate (Callable[Exception]): A callable that should return 259 ``True`` if the given exception is retryable. 260 261 Returns: 262 AsyncRetry: A new retry instance with the given predicate. 263 """ 264 return self._replace(predicate=predicate) 265 266 def with_delay(self, initial=None, maximum=None, multiplier=None): 267 """Return a copy of this retry with the given delay options. 268 269 Args: 270 initial (float): The minimum amout of time to delay. This must 271 be greater than 0. 272 maximum (float): The maximum amout of time to delay. 273 multiplier (float): The multiplier applied to the delay. 274 275 Returns: 276 AsyncRetry: A new retry instance with the given predicate. 277 """ 278 return self._replace(initial=initial, maximum=maximum, multiplier=multiplier) 279 280 def __str__(self): 281 return ( 282 "<AsyncRetry predicate={}, initial={:.1f}, maximum={:.1f}, " 283 "multiplier={:.1f}, deadline={:.1f}, on_error={}>".format( 284 self._predicate, 285 self._initial, 286 self._maximum, 287 self._multiplier, 288 self._deadline, 289 self._on_error, 290 ) 291 ) 292