1# Copyright 2017, Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Abstract and helper bases for Future implementations."""
16
17import abc
18import concurrent.futures
19
20from google.api_core import exceptions
21from google.api_core import retry
22from google.api_core.future import _helpers
23from google.api_core.future import base
24
25
26class _OperationNotComplete(Exception):
27    """Private exception used for polling via retry."""
28
29    pass
30
31
32RETRY_PREDICATE = retry.if_exception_type(
33    _OperationNotComplete,
34    exceptions.TooManyRequests,
35    exceptions.InternalServerError,
36    exceptions.BadGateway,
37    exceptions.ServiceUnavailable,
38)
39DEFAULT_RETRY = retry.Retry(predicate=RETRY_PREDICATE)
40
41
42class PollingFuture(base.Future):
43    """A Future that needs to poll some service to check its status.
44
45    The :meth:`done` method should be implemented by subclasses. The polling
46    behavior will repeatedly call ``done`` until it returns True.
47
48    .. note::
49
50        Privacy here is intended to prevent the final class from
51        overexposing, not to prevent subclasses from accessing methods.
52
53    Args:
54        retry (google.api_core.retry.Retry): The retry configuration used
55            when polling. This can be used to control how often :meth:`done`
56            is polled. Regardless of the retry's ``deadline``, it will be
57            overridden by the ``timeout`` argument to :meth:`result`.
58    """
59
60    def __init__(self, retry=DEFAULT_RETRY):
61        super(PollingFuture, self).__init__()
62        self._retry = retry
63        self._result = None
64        self._exception = None
65        self._result_set = False
66        """bool: Set to True when the result has been set via set_result or
67        set_exception."""
68        self._polling_thread = None
69        self._done_callbacks = []
70
71    @abc.abstractmethod
72    def done(self, retry=DEFAULT_RETRY):
73        """Checks to see if the operation is complete.
74
75        Args:
76            retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
77
78        Returns:
79            bool: True if the operation is complete, False otherwise.
80        """
81        # pylint: disable=redundant-returns-doc, missing-raises-doc
82        raise NotImplementedError()
83
84    def _done_or_raise(self, retry=DEFAULT_RETRY):
85        """Check if the future is done and raise if it's not."""
86        kwargs = {} if retry is DEFAULT_RETRY else {"retry": retry}
87
88        if not self.done(**kwargs):
89            raise _OperationNotComplete()
90
91    def running(self):
92        """True if the operation is currently running."""
93        return not self.done()
94
95    def _blocking_poll(self, timeout=None, retry=DEFAULT_RETRY):
96        """Poll and wait for the Future to be resolved.
97
98        Args:
99            timeout (int):
100                How long (in seconds) to wait for the operation to complete.
101                If None, wait indefinitely.
102        """
103        if self._result_set:
104            return
105
106        retry_ = self._retry.with_deadline(timeout)
107
108        try:
109            kwargs = {} if retry is DEFAULT_RETRY else {"retry": retry}
110            retry_(self._done_or_raise)(**kwargs)
111        except exceptions.RetryError:
112            raise concurrent.futures.TimeoutError(
113                "Operation did not complete within the designated " "timeout."
114            )
115
116    def result(self, timeout=None, retry=DEFAULT_RETRY):
117        """Get the result of the operation, blocking if necessary.
118
119        Args:
120            timeout (int):
121                How long (in seconds) to wait for the operation to complete.
122                If None, wait indefinitely.
123
124        Returns:
125            google.protobuf.Message: The Operation's result.
126
127        Raises:
128            google.api_core.GoogleAPICallError: If the operation errors or if
129                the timeout is reached before the operation completes.
130        """
131        kwargs = {} if retry is DEFAULT_RETRY else {"retry": retry}
132        self._blocking_poll(timeout=timeout, **kwargs)
133
134        if self._exception is not None:
135            # pylint: disable=raising-bad-type
136            # Pylint doesn't recognize that this is valid in this case.
137            raise self._exception
138
139        return self._result
140
141    def exception(self, timeout=None):
142        """Get the exception from the operation, blocking if necessary.
143
144        Args:
145            timeout (int): How long to wait for the operation to complete.
146                If None, wait indefinitely.
147
148        Returns:
149            Optional[google.api_core.GoogleAPICallError]: The operation's
150                error.
151        """
152        self._blocking_poll(timeout=timeout)
153        return self._exception
154
155    def add_done_callback(self, fn):
156        """Add a callback to be executed when the operation is complete.
157
158        If the operation is not already complete, this will start a helper
159        thread to poll for the status of the operation in the background.
160
161        Args:
162            fn (Callable[Future]): The callback to execute when the operation
163                is complete.
164        """
165        if self._result_set:
166            _helpers.safe_invoke_callback(fn, self)
167            return
168
169        self._done_callbacks.append(fn)
170
171        if self._polling_thread is None:
172            # The polling thread will exit on its own as soon as the operation
173            # is done.
174            self._polling_thread = _helpers.start_daemon_thread(
175                target=self._blocking_poll
176            )
177
178    def _invoke_callbacks(self, *args, **kwargs):
179        """Invoke all done callbacks."""
180        for callback in self._done_callbacks:
181            _helpers.safe_invoke_callback(callback, *args, **kwargs)
182
183    def set_result(self, result):
184        """Set the Future's result."""
185        self._result = result
186        self._result_set = True
187        self._invoke_callbacks(self)
188
189    def set_exception(self, exception):
190        """Set the Future's exception."""
191        self._exception = exception
192        self._result_set = True
193        self._invoke_callbacks(self)
194