1# -*- coding: utf-8 -*-
2# Copyright 2020 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17from typing import Callable, Dict, Optional, Sequence, Tuple, Union
18
19from requests import __version__ as requests_version
20
21from google.api_core import exceptions as core_exceptions  # type: ignore
22from google.api_core import gapic_v1  # type: ignore
23from google.api_core import path_template  # type: ignore
24from google.api_core import rest_helpers  # type: ignore
25from google.api_core import retry as retries  # type: ignore
26from google.auth import credentials as ga_credentials  # type: ignore
27from google.auth.transport.requests import AuthorizedSession  # type: ignore
28from google.longrunning import operations_pb2  # type: ignore
29from google.protobuf import empty_pb2  # type: ignore
30from google.protobuf import json_format  # type: ignore
31from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO, OperationsTransport
32
33OptionalRetry = Union[retries.Retry, object]
34
35DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
36    gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version,
37    grpc_version=None,
38    rest_version=requests_version,
39)
40
41
42class OperationsRestTransport(OperationsTransport):
43    """REST backend transport for Operations.
44
45    Manages long-running operations with an API service.
46
47    When an API method normally takes long time to complete, it can be
48    designed to return [Operation][google.api_core.operations_v1.Operation] to the
49    client, and the client can use this interface to receive the real
50    response asynchronously by polling the operation resource, or pass
51    the operation resource to another API (such as Google Cloud Pub/Sub
52    API) to receive the response. Any API service that returns
53    long-running operations should implement the ``Operations``
54    interface so developers can have a consistent client experience.
55
56    This class defines the same methods as the primary client, so the
57    primary client can load the underlying transport implementation
58    and call it.
59
60    It sends JSON representations of protocol buffers over HTTP/1.1
61    """
62
63    def __init__(
64        self,
65        *,
66        host: str = "longrunning.googleapis.com",
67        credentials: ga_credentials.Credentials = None,
68        credentials_file: Optional[str] = None,
69        scopes: Optional[Sequence[str]] = None,
70        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
71        quota_project_id: Optional[str] = None,
72        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
73        always_use_jwt_access: Optional[bool] = False,
74        url_scheme: str = "https",
75        http_options: Optional[Dict] = None,
76    ) -> None:
77        """Instantiate the transport.
78
79        Args:
80            host (Optional[str]):
81                 The hostname to connect to.
82            credentials (Optional[google.auth.credentials.Credentials]): The
83                authorization credentials to attach to requests. These
84                credentials identify the application to the service; if none
85                are specified, the client will attempt to ascertain the
86                credentials from the environment.
87
88            credentials_file (Optional[str]): A file with credentials that can
89                be loaded with :func:`google.auth.load_credentials_from_file`.
90                This argument is ignored if ``channel`` is provided.
91            scopes (Optional(Sequence[str])): A list of scopes. This argument is
92                ignored if ``channel`` is provided.
93            client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client
94                certificate to configure mutual TLS HTTP channel. It is ignored
95                if ``channel`` is provided.
96            quota_project_id (Optional[str]): An optional project to use for billing
97                and quota.
98            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
99                The client info used to send a user-agent string along with
100                API requests. If ``None``, then default info will be used.
101                Generally, you only need to set this if you're developing
102                your own client library.
103            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
104                be used for service account credentials.
105            url_scheme: the protocol scheme for the API endpoint.  Normally
106                "https", but for testing or local servers,
107                "http" can be specified.
108            http_options: a dictionary of http_options for transcoding, to override
109                the defaults from operatons.proto.  Each method has an entry
110                with the corresponding http rules as value.
111
112        """
113        # Run the base constructor
114        # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc.
115        # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the
116        # credentials object
117        super().__init__(
118            host=host,
119            credentials=credentials,
120            client_info=client_info,
121            always_use_jwt_access=always_use_jwt_access,
122        )
123        self._session = AuthorizedSession(
124            self._credentials, default_host=self.DEFAULT_HOST
125        )
126        if client_cert_source_for_mtls:
127            self._session.configure_mtls_channel(client_cert_source_for_mtls)
128        self._prep_wrapped_messages(client_info)
129        self._http_options = http_options or {}
130
131    def _list_operations(
132        self,
133        request: operations_pb2.ListOperationsRequest,
134        *,
135        retry: OptionalRetry = gapic_v1.method.DEFAULT,
136        timeout: Optional[float] = None,
137        metadata: Sequence[Tuple[str, str]] = (),
138    ) -> operations_pb2.ListOperationsResponse:
139        r"""Call the list operations method over HTTP.
140
141        Args:
142            request (~.operations_pb2.ListOperationsRequest):
143                The request object. The request message for
144                [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations].
145
146            retry (google.api_core.retry.Retry): Designation of what errors, if any,
147                should be retried.
148            timeout (float): The timeout for this request.
149            metadata (Sequence[Tuple[str, str]]): Strings which should be
150                sent along with the request as metadata.
151
152        Returns:
153            ~.operations_pb2.ListOperationsResponse:
154                The response message for
155                [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations].
156
157        """
158
159        http_options = [
160            {"method": "get", "uri": "/v1/{name=operations}"},
161        ]
162        if "google.longrunning.Operations.ListOperations" in self._http_options:
163            http_options = self._http_options[
164                "google.longrunning.Operations.ListOperations"
165            ]
166
167        request_kwargs = json_format.MessageToDict(
168            request,
169            preserving_proto_field_name=True,
170            including_default_value_fields=True,
171        )
172        transcoded_request = path_template.transcode(http_options, **request_kwargs)
173
174        uri = transcoded_request["uri"]
175        method = transcoded_request["method"]
176
177        # Jsonify the query params
178        query_params_request = operations_pb2.ListOperationsRequest()
179        json_format.ParseDict(transcoded_request["query_params"], query_params_request)
180        query_params = json_format.MessageToDict(
181            query_params_request,
182            including_default_value_fields=False,
183            preserving_proto_field_name=False,
184            use_integers_for_enums=False,
185        )
186
187        # Send the request
188        headers = dict(metadata)
189        headers["Content-Type"] = "application/json"
190        response = getattr(self._session, method)(
191            "https://{host}{uri}".format(host=self._host, uri=uri),
192            timeout=timeout,
193            headers=headers,
194            params=rest_helpers.flatten_query_params(query_params),
195        )
196
197        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
198        # subclass.
199        if response.status_code >= 400:
200            raise core_exceptions.from_http_response(response)
201
202        # Return the response
203        api_response = operations_pb2.ListOperationsResponse()
204        json_format.Parse(response.content, api_response, ignore_unknown_fields=False)
205        return api_response
206
207    def _get_operation(
208        self,
209        request: operations_pb2.GetOperationRequest,
210        *,
211        retry: OptionalRetry = gapic_v1.method.DEFAULT,
212        timeout: Optional[float] = None,
213        metadata: Sequence[Tuple[str, str]] = (),
214    ) -> operations_pb2.Operation:
215        r"""Call the get operation method over HTTP.
216
217        Args:
218            request (~.operations_pb2.GetOperationRequest):
219                The request object. The request message for
220                [Operations.GetOperation][google.api_core.operations_v1.Operations.GetOperation].
221
222            retry (google.api_core.retry.Retry): Designation of what errors, if any,
223                should be retried.
224            timeout (float): The timeout for this request.
225            metadata (Sequence[Tuple[str, str]]): Strings which should be
226                sent along with the request as metadata.
227
228        Returns:
229            ~.operations_pb2.Operation:
230                This resource represents a long-
231                unning operation that is the result of a
232                network API call.
233
234        """
235
236        http_options = [
237            {"method": "get", "uri": "/v1/{name=operations/**}"},
238        ]
239        if "google.longrunning.Operations.GetOperation" in self._http_options:
240            http_options = self._http_options[
241                "google.longrunning.Operations.GetOperation"
242            ]
243
244        request_kwargs = json_format.MessageToDict(
245            request,
246            preserving_proto_field_name=True,
247            including_default_value_fields=True,
248        )
249        transcoded_request = path_template.transcode(http_options, **request_kwargs)
250
251        uri = transcoded_request["uri"]
252        method = transcoded_request["method"]
253
254        # Jsonify the query params
255        query_params_request = operations_pb2.GetOperationRequest()
256        json_format.ParseDict(transcoded_request["query_params"], query_params_request)
257        query_params = json_format.MessageToDict(
258            query_params_request,
259            including_default_value_fields=False,
260            preserving_proto_field_name=False,
261            use_integers_for_enums=False,
262        )
263
264        # Send the request
265        headers = dict(metadata)
266        headers["Content-Type"] = "application/json"
267        response = getattr(self._session, method)(
268            "https://{host}{uri}".format(host=self._host, uri=uri),
269            timeout=timeout,
270            headers=headers,
271            params=rest_helpers.flatten_query_params(query_params),
272        )
273
274        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
275        # subclass.
276        if response.status_code >= 400:
277            raise core_exceptions.from_http_response(response)
278
279        # Return the response
280        api_response = operations_pb2.Operation()
281        json_format.Parse(response.content, api_response, ignore_unknown_fields=False)
282        return api_response
283
284    def _delete_operation(
285        self,
286        request: operations_pb2.DeleteOperationRequest,
287        *,
288        retry: OptionalRetry = gapic_v1.method.DEFAULT,
289        timeout: Optional[float] = None,
290        metadata: Sequence[Tuple[str, str]] = (),
291    ) -> empty_pb2.Empty:
292        r"""Call the delete operation method over HTTP.
293
294        Args:
295            request (~.operations_pb2.DeleteOperationRequest):
296                The request object. The request message for
297                [Operations.DeleteOperation][google.api_core.operations_v1.Operations.DeleteOperation].
298
299            retry (google.api_core.retry.Retry): Designation of what errors, if any,
300                should be retried.
301            timeout (float): The timeout for this request.
302            metadata (Sequence[Tuple[str, str]]): Strings which should be
303                sent along with the request as metadata.
304        """
305
306        http_options = [
307            {"method": "delete", "uri": "/v1/{name=operations/**}"},
308        ]
309        if "google.longrunning.Operations.DeleteOperation" in self._http_options:
310            http_options = self._http_options[
311                "google.longrunning.Operations.DeleteOperation"
312            ]
313
314        request_kwargs = json_format.MessageToDict(
315            request,
316            preserving_proto_field_name=True,
317            including_default_value_fields=True,
318        )
319        transcoded_request = path_template.transcode(http_options, **request_kwargs)
320
321        uri = transcoded_request["uri"]
322        method = transcoded_request["method"]
323
324        # Jsonify the query params
325        query_params_request = operations_pb2.DeleteOperationRequest()
326        json_format.ParseDict(transcoded_request["query_params"], query_params_request)
327        query_params = json_format.MessageToDict(
328            query_params_request,
329            including_default_value_fields=False,
330            preserving_proto_field_name=False,
331            use_integers_for_enums=False,
332        )
333
334        # Send the request
335        headers = dict(metadata)
336        headers["Content-Type"] = "application/json"
337        response = getattr(self._session, method)(
338            "https://{host}{uri}".format(host=self._host, uri=uri),
339            timeout=timeout,
340            headers=headers,
341            params=rest_helpers.flatten_query_params(query_params),
342        )
343
344        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
345        # subclass.
346        if response.status_code >= 400:
347            raise core_exceptions.from_http_response(response)
348
349        return empty_pb2.Empty()
350
351    def _cancel_operation(
352        self,
353        request: operations_pb2.CancelOperationRequest,
354        *,
355        retry: OptionalRetry = gapic_v1.method.DEFAULT,
356        timeout: Optional[float] = None,
357        metadata: Sequence[Tuple[str, str]] = (),
358    ) -> empty_pb2.Empty:
359        r"""Call the cancel operation method over HTTP.
360
361        Args:
362            request (~.operations_pb2.CancelOperationRequest):
363                The request object. The request message for
364                [Operations.CancelOperation][google.api_core.operations_v1.Operations.CancelOperation].
365
366            retry (google.api_core.retry.Retry): Designation of what errors, if any,
367                should be retried.
368            timeout (float): The timeout for this request.
369            metadata (Sequence[Tuple[str, str]]): Strings which should be
370                sent along with the request as metadata.
371        """
372
373        http_options = [
374            {"method": "post", "uri": "/v1/{name=operations/**}:cancel", "body": "*"},
375        ]
376        if "google.longrunning.Operations.CancelOperation" in self._http_options:
377            http_options = self._http_options[
378                "google.longrunning.Operations.CancelOperation"
379            ]
380
381        request_kwargs = json_format.MessageToDict(
382            request,
383            preserving_proto_field_name=True,
384            including_default_value_fields=True,
385        )
386        transcoded_request = path_template.transcode(http_options, **request_kwargs)
387
388        # Jsonify the request body
389        body_request = operations_pb2.CancelOperationRequest()
390        json_format.ParseDict(transcoded_request["body"], body_request)
391        body = json_format.MessageToDict(
392            body_request,
393            including_default_value_fields=False,
394            preserving_proto_field_name=False,
395            use_integers_for_enums=False,
396        )
397        uri = transcoded_request["uri"]
398        method = transcoded_request["method"]
399
400        # Jsonify the query params
401        query_params_request = operations_pb2.CancelOperationRequest()
402        json_format.ParseDict(transcoded_request["query_params"], query_params_request)
403        query_params = json_format.MessageToDict(
404            query_params_request,
405            including_default_value_fields=False,
406            preserving_proto_field_name=False,
407            use_integers_for_enums=False,
408        )
409
410        # Send the request
411        headers = dict(metadata)
412        headers["Content-Type"] = "application/json"
413        response = getattr(self._session, method)(
414            "https://{host}{uri}".format(host=self._host, uri=uri),
415            timeout=timeout,
416            headers=headers,
417            params=rest_helpers.flatten_query_params(query_params),
418            data=body,
419        )
420
421        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
422        # subclass.
423        if response.status_code >= 400:
424            raise core_exceptions.from_http_response(response)
425
426        return empty_pb2.Empty()
427
428    @property
429    def list_operations(
430        self,
431    ) -> Callable[
432        [operations_pb2.ListOperationsRequest], operations_pb2.ListOperationsResponse
433    ]:
434        return self._list_operations
435
436    @property
437    def get_operation(
438        self,
439    ) -> Callable[[operations_pb2.GetOperationRequest], operations_pb2.Operation]:
440        return self._get_operation
441
442    @property
443    def delete_operation(
444        self,
445    ) -> Callable[[operations_pb2.DeleteOperationRequest], empty_pb2.Empty]:
446        return self._delete_operation
447
448    @property
449    def cancel_operation(
450        self,
451    ) -> Callable[[operations_pb2.CancelOperationRequest], empty_pb2.Empty]:
452        return self._cancel_operation
453
454
455__all__ = ("OperationsRestTransport",)
456