1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Helpers for retrying coroutine functions with exponential back-off.
16
17The :class:`AsyncRetry` decorator shares most functionality and behavior with
18:class:`Retry`, but supports coroutine functions. Please refer to description
19of :class:`Retry` for more details.
20
21By default, this decorator will retry transient
22API errors (see :func:`if_transient_error`). For example:
23
24.. code-block:: python
25
26    @retry_async.AsyncRetry()
27    async def call_flaky_rpc():
28        return await client.flaky_rpc()
29
30    # Will retry flaky_rpc() if it raises transient API errors.
31    result = await call_flaky_rpc()
32
33You can pass a custom predicate to retry on different exceptions, such as
34waiting for an eventually consistent item to be available:
35
36.. code-block:: python
37
38    @retry_async.AsyncRetry(predicate=retry_async.if_exception_type(exceptions.NotFound))
39    async def check_if_exists():
40        return await client.does_thing_exist()
41
42    is_available = await check_if_exists()
43
44Some client library methods apply retry automatically. These methods can accept
45a ``retry`` parameter that allows you to configure the behavior:
46
47.. code-block:: python
48
49    my_retry = retry_async.AsyncRetry(deadline=60)
50    result = await client.some_method(retry=my_retry)
51
52"""
53
54import asyncio
55import datetime
56import functools
57import logging
58
59from google.api_core import datetime_helpers
60from google.api_core import exceptions
61from google.api_core.retry import exponential_sleep_generator
62from google.api_core.retry import if_exception_type  # noqa: F401
63from google.api_core.retry import if_transient_error
64
65
66_LOGGER = logging.getLogger(__name__)
67_DEFAULT_INITIAL_DELAY = 1.0  # seconds
68_DEFAULT_MAXIMUM_DELAY = 60.0  # seconds
69_DEFAULT_DELAY_MULTIPLIER = 2.0
70_DEFAULT_DEADLINE = 60.0 * 2.0  # seconds
71
72
73async def retry_target(target, predicate, sleep_generator, deadline, on_error=None):
74    """Call a function and retry if it fails.
75
76    This is the lowest-level retry helper. Generally, you'll use the
77    higher-level retry helper :class:`Retry`.
78
79    Args:
80        target(Callable): The function to call and retry. This must be a
81            nullary function - apply arguments with `functools.partial`.
82        predicate (Callable[Exception]): A callable used to determine if an
83            exception raised by the target should be considered retryable.
84            It should return True to retry or False otherwise.
85        sleep_generator (Iterable[float]): An infinite iterator that determines
86            how long to sleep between retries.
87        deadline (float): How long to keep retrying the target. The last sleep
88            period is shortened as necessary, so that the last retry runs at
89            ``deadline`` (and not considerably beyond it).
90        on_error (Callable[Exception]): A function to call while processing a
91            retryable exception.  Any error raised by this function will *not*
92            be caught.
93
94    Returns:
95        Any: the return value of the target function.
96
97    Raises:
98        google.api_core.RetryError: If the deadline is exceeded while retrying.
99        ValueError: If the sleep generator stops yielding values.
100        Exception: If the target raises a method that isn't retryable.
101    """
102    deadline_dt = (
103        (datetime_helpers.utcnow() + datetime.timedelta(seconds=deadline))
104        if deadline
105        else None
106    )
107
108    last_exc = None
109
110    for sleep in sleep_generator:
111        try:
112            if not deadline_dt:
113                return await target()
114            else:
115                return await asyncio.wait_for(
116                    target(),
117                    timeout=(deadline_dt - datetime_helpers.utcnow()).total_seconds(),
118                )
119        # pylint: disable=broad-except
120        # This function explicitly must deal with broad exceptions.
121        except Exception as exc:
122            if not predicate(exc) and not isinstance(exc, asyncio.TimeoutError):
123                raise
124            last_exc = exc
125            if on_error is not None:
126                on_error(exc)
127
128        now = datetime_helpers.utcnow()
129
130        if deadline_dt:
131            if deadline_dt <= now:
132                # Chains the raising RetryError with the root cause error,
133                # which helps observability and debugability.
134                raise exceptions.RetryError(
135                    "Deadline of {:.1f}s exceeded while calling {}".format(
136                        deadline, target
137                    ),
138                    last_exc,
139                ) from last_exc
140            else:
141                time_to_deadline = (deadline_dt - now).total_seconds()
142                sleep = min(time_to_deadline, sleep)
143
144        _LOGGER.debug(
145            "Retrying due to {}, sleeping {:.1f}s ...".format(last_exc, sleep)
146        )
147        await asyncio.sleep(sleep)
148
149    raise ValueError("Sleep generator stopped yielding sleep values.")
150
151
152class AsyncRetry:
153    """Exponential retry decorator for async functions.
154
155    This class is a decorator used to add exponential back-off retry behavior
156    to an RPC call.
157
158    Although the default behavior is to retry transient API errors, a
159    different predicate can be provided to retry other exceptions.
160
161    Args:
162        predicate (Callable[Exception]): A callable that should return ``True``
163            if the given exception is retryable.
164        initial (float): The minimum a,out of time to delay in seconds. This
165            must be greater than 0.
166        maximum (float): The maximum amout of time to delay in seconds.
167        multiplier (float): The multiplier applied to the delay.
168        deadline (float): How long to keep retrying in seconds. The last sleep
169            period is shortened as necessary, so that the last retry runs at
170            ``deadline`` (and not considerably beyond it).
171        on_error (Callable[Exception]): A function to call while processing
172            a retryable exception. Any error raised by this function will
173            *not* be caught.
174    """
175
176    def __init__(
177        self,
178        predicate=if_transient_error,
179        initial=_DEFAULT_INITIAL_DELAY,
180        maximum=_DEFAULT_MAXIMUM_DELAY,
181        multiplier=_DEFAULT_DELAY_MULTIPLIER,
182        deadline=_DEFAULT_DEADLINE,
183        on_error=None,
184    ):
185        self._predicate = predicate
186        self._initial = initial
187        self._multiplier = multiplier
188        self._maximum = maximum
189        self._deadline = deadline
190        self._on_error = on_error
191
192    def __call__(self, func, on_error=None):
193        """Wrap a callable with retry behavior.
194
195        Args:
196            func (Callable): The callable to add retry behavior to.
197            on_error (Callable[Exception]): A function to call while processing
198                a retryable exception. Any error raised by this function will
199                *not* be caught.
200
201        Returns:
202            Callable: A callable that will invoke ``func`` with retry
203                behavior.
204        """
205        if self._on_error is not None:
206            on_error = self._on_error
207
208        @functools.wraps(func)
209        async def retry_wrapped_func(*args, **kwargs):
210            """A wrapper that calls target function with retry."""
211            target = functools.partial(func, *args, **kwargs)
212            sleep_generator = exponential_sleep_generator(
213                self._initial, self._maximum, multiplier=self._multiplier
214            )
215            return await retry_target(
216                target,
217                self._predicate,
218                sleep_generator,
219                self._deadline,
220                on_error=on_error,
221            )
222
223        return retry_wrapped_func
224
225    def _replace(
226        self,
227        predicate=None,
228        initial=None,
229        maximum=None,
230        multiplier=None,
231        deadline=None,
232        on_error=None,
233    ):
234        return AsyncRetry(
235            predicate=predicate or self._predicate,
236            initial=initial or self._initial,
237            maximum=maximum or self._maximum,
238            multiplier=multiplier or self._multiplier,
239            deadline=deadline or self._deadline,
240            on_error=on_error or self._on_error,
241        )
242
243    def with_deadline(self, deadline):
244        """Return a copy of this retry with the given deadline.
245
246        Args:
247            deadline (float): How long to keep retrying.
248
249        Returns:
250            AsyncRetry: A new retry instance with the given deadline.
251        """
252        return self._replace(deadline=deadline)
253
254    def with_predicate(self, predicate):
255        """Return a copy of this retry with the given predicate.
256
257        Args:
258            predicate (Callable[Exception]): A callable that should return
259                ``True`` if the given exception is retryable.
260
261        Returns:
262            AsyncRetry: A new retry instance with the given predicate.
263        """
264        return self._replace(predicate=predicate)
265
266    def with_delay(self, initial=None, maximum=None, multiplier=None):
267        """Return a copy of this retry with the given delay options.
268
269        Args:
270            initial (float): The minimum amout of time to delay. This must
271                be greater than 0.
272            maximum (float): The maximum amout of time to delay.
273            multiplier (float): The multiplier applied to the delay.
274
275        Returns:
276            AsyncRetry: A new retry instance with the given predicate.
277        """
278        return self._replace(initial=initial, maximum=maximum, multiplier=multiplier)
279
280    def __str__(self):
281        return (
282            "<AsyncRetry predicate={}, initial={:.1f}, maximum={:.1f}, "
283            "multiplier={:.1f}, deadline={:.1f}, on_error={}>".format(
284                self._predicate,
285                self._initial,
286                self._maximum,
287                self._multiplier,
288                self._deadline,
289                self._on_error,
290            )
291        )
292