1# -*- coding: utf-8 -*- 2# Copyright 2020 Google LLC 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15# 16 17from typing import Callable, Dict, Optional, Sequence, Tuple, Union 18 19from requests import __version__ as requests_version 20 21from google.api_core import exceptions as core_exceptions # type: ignore 22from google.api_core import gapic_v1 # type: ignore 23from google.api_core import path_template # type: ignore 24from google.api_core import rest_helpers # type: ignore 25from google.api_core import retry as retries # type: ignore 26from google.auth import credentials as ga_credentials # type: ignore 27from google.auth.transport.requests import AuthorizedSession # type: ignore 28from google.longrunning import operations_pb2 # type: ignore 29from google.protobuf import empty_pb2 # type: ignore 30from google.protobuf import json_format # type: ignore 31from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO, OperationsTransport 32 33OptionalRetry = Union[retries.Retry, object] 34 35DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( 36 gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version, 37 grpc_version=None, 38 rest_version=requests_version, 39) 40 41 42class OperationsRestTransport(OperationsTransport): 43 """REST backend transport for Operations. 44 45 Manages long-running operations with an API service. 46 47 When an API method normally takes long time to complete, it can be 48 designed to return [Operation][google.api_core.operations_v1.Operation] to the 49 client, and the client can use this interface to receive the real 50 response asynchronously by polling the operation resource, or pass 51 the operation resource to another API (such as Google Cloud Pub/Sub 52 API) to receive the response. Any API service that returns 53 long-running operations should implement the ``Operations`` 54 interface so developers can have a consistent client experience. 55 56 This class defines the same methods as the primary client, so the 57 primary client can load the underlying transport implementation 58 and call it. 59 60 It sends JSON representations of protocol buffers over HTTP/1.1 61 """ 62 63 def __init__( 64 self, 65 *, 66 host: str = "longrunning.googleapis.com", 67 credentials: ga_credentials.Credentials = None, 68 credentials_file: Optional[str] = None, 69 scopes: Optional[Sequence[str]] = None, 70 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None, 71 quota_project_id: Optional[str] = None, 72 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, 73 always_use_jwt_access: Optional[bool] = False, 74 url_scheme: str = "https", 75 http_options: Optional[Dict] = None, 76 ) -> None: 77 """Instantiate the transport. 78 79 Args: 80 host (Optional[str]): 81 The hostname to connect to. 82 credentials (Optional[google.auth.credentials.Credentials]): The 83 authorization credentials to attach to requests. These 84 credentials identify the application to the service; if none 85 are specified, the client will attempt to ascertain the 86 credentials from the environment. 87 88 credentials_file (Optional[str]): A file with credentials that can 89 be loaded with :func:`google.auth.load_credentials_from_file`. 90 This argument is ignored if ``channel`` is provided. 91 scopes (Optional(Sequence[str])): A list of scopes. This argument is 92 ignored if ``channel`` is provided. 93 client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client 94 certificate to configure mutual TLS HTTP channel. It is ignored 95 if ``channel`` is provided. 96 quota_project_id (Optional[str]): An optional project to use for billing 97 and quota. 98 client_info (google.api_core.gapic_v1.client_info.ClientInfo): 99 The client info used to send a user-agent string along with 100 API requests. If ``None``, then default info will be used. 101 Generally, you only need to set this if you're developing 102 your own client library. 103 always_use_jwt_access (Optional[bool]): Whether self signed JWT should 104 be used for service account credentials. 105 url_scheme: the protocol scheme for the API endpoint. Normally 106 "https", but for testing or local servers, 107 "http" can be specified. 108 http_options: a dictionary of http_options for transcoding, to override 109 the defaults from operatons.proto. Each method has an entry 110 with the corresponding http rules as value. 111 112 """ 113 # Run the base constructor 114 # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc. 115 # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the 116 # credentials object 117 super().__init__( 118 host=host, 119 credentials=credentials, 120 client_info=client_info, 121 always_use_jwt_access=always_use_jwt_access, 122 ) 123 self._session = AuthorizedSession( 124 self._credentials, default_host=self.DEFAULT_HOST 125 ) 126 if client_cert_source_for_mtls: 127 self._session.configure_mtls_channel(client_cert_source_for_mtls) 128 self._prep_wrapped_messages(client_info) 129 self._http_options = http_options or {} 130 131 def _list_operations( 132 self, 133 request: operations_pb2.ListOperationsRequest, 134 *, 135 retry: OptionalRetry = gapic_v1.method.DEFAULT, 136 timeout: Optional[float] = None, 137 metadata: Sequence[Tuple[str, str]] = (), 138 ) -> operations_pb2.ListOperationsResponse: 139 r"""Call the list operations method over HTTP. 140 141 Args: 142 request (~.operations_pb2.ListOperationsRequest): 143 The request object. The request message for 144 [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations]. 145 146 retry (google.api_core.retry.Retry): Designation of what errors, if any, 147 should be retried. 148 timeout (float): The timeout for this request. 149 metadata (Sequence[Tuple[str, str]]): Strings which should be 150 sent along with the request as metadata. 151 152 Returns: 153 ~.operations_pb2.ListOperationsResponse: 154 The response message for 155 [Operations.ListOperations][google.api_core.operations_v1.Operations.ListOperations]. 156 157 """ 158 159 http_options = [ 160 {"method": "get", "uri": "/v1/{name=operations}"}, 161 ] 162 if "google.longrunning.Operations.ListOperations" in self._http_options: 163 http_options = self._http_options[ 164 "google.longrunning.Operations.ListOperations" 165 ] 166 167 request_kwargs = json_format.MessageToDict( 168 request, 169 preserving_proto_field_name=True, 170 including_default_value_fields=True, 171 ) 172 transcoded_request = path_template.transcode(http_options, **request_kwargs) 173 174 uri = transcoded_request["uri"] 175 method = transcoded_request["method"] 176 177 # Jsonify the query params 178 query_params_request = operations_pb2.ListOperationsRequest() 179 json_format.ParseDict(transcoded_request["query_params"], query_params_request) 180 query_params = json_format.MessageToDict( 181 query_params_request, 182 including_default_value_fields=False, 183 preserving_proto_field_name=False, 184 use_integers_for_enums=False, 185 ) 186 187 # Send the request 188 headers = dict(metadata) 189 headers["Content-Type"] = "application/json" 190 response = getattr(self._session, method)( 191 "https://{host}{uri}".format(host=self._host, uri=uri), 192 timeout=timeout, 193 headers=headers, 194 params=rest_helpers.flatten_query_params(query_params), 195 ) 196 197 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception 198 # subclass. 199 if response.status_code >= 400: 200 raise core_exceptions.from_http_response(response) 201 202 # Return the response 203 api_response = operations_pb2.ListOperationsResponse() 204 json_format.Parse(response.content, api_response, ignore_unknown_fields=False) 205 return api_response 206 207 def _get_operation( 208 self, 209 request: operations_pb2.GetOperationRequest, 210 *, 211 retry: OptionalRetry = gapic_v1.method.DEFAULT, 212 timeout: Optional[float] = None, 213 metadata: Sequence[Tuple[str, str]] = (), 214 ) -> operations_pb2.Operation: 215 r"""Call the get operation method over HTTP. 216 217 Args: 218 request (~.operations_pb2.GetOperationRequest): 219 The request object. The request message for 220 [Operations.GetOperation][google.api_core.operations_v1.Operations.GetOperation]. 221 222 retry (google.api_core.retry.Retry): Designation of what errors, if any, 223 should be retried. 224 timeout (float): The timeout for this request. 225 metadata (Sequence[Tuple[str, str]]): Strings which should be 226 sent along with the request as metadata. 227 228 Returns: 229 ~.operations_pb2.Operation: 230 This resource represents a long- 231 unning operation that is the result of a 232 network API call. 233 234 """ 235 236 http_options = [ 237 {"method": "get", "uri": "/v1/{name=operations/**}"}, 238 ] 239 if "google.longrunning.Operations.GetOperation" in self._http_options: 240 http_options = self._http_options[ 241 "google.longrunning.Operations.GetOperation" 242 ] 243 244 request_kwargs = json_format.MessageToDict( 245 request, 246 preserving_proto_field_name=True, 247 including_default_value_fields=True, 248 ) 249 transcoded_request = path_template.transcode(http_options, **request_kwargs) 250 251 uri = transcoded_request["uri"] 252 method = transcoded_request["method"] 253 254 # Jsonify the query params 255 query_params_request = operations_pb2.GetOperationRequest() 256 json_format.ParseDict(transcoded_request["query_params"], query_params_request) 257 query_params = json_format.MessageToDict( 258 query_params_request, 259 including_default_value_fields=False, 260 preserving_proto_field_name=False, 261 use_integers_for_enums=False, 262 ) 263 264 # Send the request 265 headers = dict(metadata) 266 headers["Content-Type"] = "application/json" 267 response = getattr(self._session, method)( 268 "https://{host}{uri}".format(host=self._host, uri=uri), 269 timeout=timeout, 270 headers=headers, 271 params=rest_helpers.flatten_query_params(query_params), 272 ) 273 274 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception 275 # subclass. 276 if response.status_code >= 400: 277 raise core_exceptions.from_http_response(response) 278 279 # Return the response 280 api_response = operations_pb2.Operation() 281 json_format.Parse(response.content, api_response, ignore_unknown_fields=False) 282 return api_response 283 284 def _delete_operation( 285 self, 286 request: operations_pb2.DeleteOperationRequest, 287 *, 288 retry: OptionalRetry = gapic_v1.method.DEFAULT, 289 timeout: Optional[float] = None, 290 metadata: Sequence[Tuple[str, str]] = (), 291 ) -> empty_pb2.Empty: 292 r"""Call the delete operation method over HTTP. 293 294 Args: 295 request (~.operations_pb2.DeleteOperationRequest): 296 The request object. The request message for 297 [Operations.DeleteOperation][google.api_core.operations_v1.Operations.DeleteOperation]. 298 299 retry (google.api_core.retry.Retry): Designation of what errors, if any, 300 should be retried. 301 timeout (float): The timeout for this request. 302 metadata (Sequence[Tuple[str, str]]): Strings which should be 303 sent along with the request as metadata. 304 """ 305 306 http_options = [ 307 {"method": "delete", "uri": "/v1/{name=operations/**}"}, 308 ] 309 if "google.longrunning.Operations.DeleteOperation" in self._http_options: 310 http_options = self._http_options[ 311 "google.longrunning.Operations.DeleteOperation" 312 ] 313 314 request_kwargs = json_format.MessageToDict( 315 request, 316 preserving_proto_field_name=True, 317 including_default_value_fields=True, 318 ) 319 transcoded_request = path_template.transcode(http_options, **request_kwargs) 320 321 uri = transcoded_request["uri"] 322 method = transcoded_request["method"] 323 324 # Jsonify the query params 325 query_params_request = operations_pb2.DeleteOperationRequest() 326 json_format.ParseDict(transcoded_request["query_params"], query_params_request) 327 query_params = json_format.MessageToDict( 328 query_params_request, 329 including_default_value_fields=False, 330 preserving_proto_field_name=False, 331 use_integers_for_enums=False, 332 ) 333 334 # Send the request 335 headers = dict(metadata) 336 headers["Content-Type"] = "application/json" 337 response = getattr(self._session, method)( 338 "https://{host}{uri}".format(host=self._host, uri=uri), 339 timeout=timeout, 340 headers=headers, 341 params=rest_helpers.flatten_query_params(query_params), 342 ) 343 344 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception 345 # subclass. 346 if response.status_code >= 400: 347 raise core_exceptions.from_http_response(response) 348 349 return empty_pb2.Empty() 350 351 def _cancel_operation( 352 self, 353 request: operations_pb2.CancelOperationRequest, 354 *, 355 retry: OptionalRetry = gapic_v1.method.DEFAULT, 356 timeout: Optional[float] = None, 357 metadata: Sequence[Tuple[str, str]] = (), 358 ) -> empty_pb2.Empty: 359 r"""Call the cancel operation method over HTTP. 360 361 Args: 362 request (~.operations_pb2.CancelOperationRequest): 363 The request object. The request message for 364 [Operations.CancelOperation][google.api_core.operations_v1.Operations.CancelOperation]. 365 366 retry (google.api_core.retry.Retry): Designation of what errors, if any, 367 should be retried. 368 timeout (float): The timeout for this request. 369 metadata (Sequence[Tuple[str, str]]): Strings which should be 370 sent along with the request as metadata. 371 """ 372 373 http_options = [ 374 {"method": "post", "uri": "/v1/{name=operations/**}:cancel", "body": "*"}, 375 ] 376 if "google.longrunning.Operations.CancelOperation" in self._http_options: 377 http_options = self._http_options[ 378 "google.longrunning.Operations.CancelOperation" 379 ] 380 381 request_kwargs = json_format.MessageToDict( 382 request, 383 preserving_proto_field_name=True, 384 including_default_value_fields=True, 385 ) 386 transcoded_request = path_template.transcode(http_options, **request_kwargs) 387 388 # Jsonify the request body 389 body_request = operations_pb2.CancelOperationRequest() 390 json_format.ParseDict(transcoded_request["body"], body_request) 391 body = json_format.MessageToDict( 392 body_request, 393 including_default_value_fields=False, 394 preserving_proto_field_name=False, 395 use_integers_for_enums=False, 396 ) 397 uri = transcoded_request["uri"] 398 method = transcoded_request["method"] 399 400 # Jsonify the query params 401 query_params_request = operations_pb2.CancelOperationRequest() 402 json_format.ParseDict(transcoded_request["query_params"], query_params_request) 403 query_params = json_format.MessageToDict( 404 query_params_request, 405 including_default_value_fields=False, 406 preserving_proto_field_name=False, 407 use_integers_for_enums=False, 408 ) 409 410 # Send the request 411 headers = dict(metadata) 412 headers["Content-Type"] = "application/json" 413 response = getattr(self._session, method)( 414 "https://{host}{uri}".format(host=self._host, uri=uri), 415 timeout=timeout, 416 headers=headers, 417 params=rest_helpers.flatten_query_params(query_params), 418 data=body, 419 ) 420 421 # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception 422 # subclass. 423 if response.status_code >= 400: 424 raise core_exceptions.from_http_response(response) 425 426 return empty_pb2.Empty() 427 428 @property 429 def list_operations( 430 self, 431 ) -> Callable[ 432 [operations_pb2.ListOperationsRequest], operations_pb2.ListOperationsResponse 433 ]: 434 return self._list_operations 435 436 @property 437 def get_operation( 438 self, 439 ) -> Callable[[operations_pb2.GetOperationRequest], operations_pb2.Operation]: 440 return self._get_operation 441 442 @property 443 def delete_operation( 444 self, 445 ) -> Callable[[operations_pb2.DeleteOperationRequest], empty_pb2.Empty]: 446 return self._delete_operation 447 448 @property 449 def cancel_operation( 450 self, 451 ) -> Callable[[operations_pb2.CancelOperationRequest], empty_pb2.Empty]: 452 return self._cancel_operation 453 454 455__all__ = ("OperationsRestTransport",) 456