# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""AWS Credentials and AWS Signature V4 Request Signer.

This module provides credentials to access Google Cloud resources from Amazon
Web Services (AWS) workloads. These credentials are recommended over the
use of service account credentials in AWS as they do not involve the management
of long-lived service account private keys.

AWS Credentials are initialized using external_account arguments which are
typically loaded from the external credentials JSON file.
Unlike other Credentials that can be initialized with a list of explicit
arguments, secrets or credentials, external account clients use the
environment and hints/guidelines provided by the external_account JSON
file to retrieve credentials and exchange them for Google access tokens.

This module also provides a basic implementation of the
`AWS Signature Version 4`_ request signing algorithm.

AWS Credentials use serialized signed requests to the
`AWS STS GetCallerIdentity`_ API that can be exchanged for Google access tokens
via the GCP STS endpoint.

.. _AWS Signature Version 4: https://docs.aws.amazon.com/general/latest/gr/signature-version-4.html
.. _AWS STS GetCallerIdentity: https://docs.aws.amazon.com/STS/latest/APIReference/API_GetCallerIdentity.html
"""

import hashlib
import hmac
import io
import json
import os
import posixpath
import re

try:
    from urllib.parse import urljoin
# Python 2.7 compatibility
except ImportError:  # pragma: NO COVER
    from urlparse import urljoin

from six.moves import http_client
from six.moves import urllib

from google.auth import _helpers
from google.auth import environment_vars
from google.auth import exceptions
from google.auth import external_account

# AWS Signature Version 4 signing algorithm identifier.
_AWS_ALGORITHM = "AWS4-HMAC-SHA256"
# The termination string for the AWS credential scope value as defined in
# https://docs.aws.amazon.com/general/latest/gr/sigv4-create-string-to-sign.html
_AWS_REQUEST_TYPE = "aws4_request"
# The AWS authorization header name for the security session token if available.
_AWS_SECURITY_TOKEN_HEADER = "x-amz-security-token"
# The AWS authorization header name for the auto-generated date.
_AWS_DATE_HEADER = "x-amz-date"


class RequestSigner(object):
    """Implements an AWS request signer based on the AWS Signature Version 4 signing
    process.
    https://docs.aws.amazon.com/general/latest/gr/signature-version-4.html
    """

    def __init__(self, region_name):
        """Instantiates an AWS request signer used to compute authenticated signed
        requests to AWS APIs based on the AWS Signature Version 4 signing process.

        Args:
            region_name (str): The AWS region to use.
        """

        self._region_name = region_name

    def get_request_options(
        self,
        aws_security_credentials,
        url,
        method,
        request_payload="",
        additional_headers=None,
    ):
        """Generates the signed request for the provided HTTP request for calling
        an AWS API. This follows the steps described at:
        https://docs.aws.amazon.com/general/latest/gr/sigv4_signing.html

        Args:
            aws_security_credentials (Mapping[str, str]): A dictionary containing
                the AWS security credentials.
            url (str): The AWS service URL containing the canonical URI and
                query string.
            method (str): The HTTP method used to call this API.
            request_payload (Optional[str]): The optional request payload if
                available.
            additional_headers (Optional[Mapping[str, str]]): The optional
                additional headers needed for the requested AWS API.

        Returns:
            Mapping[str, str]: The AWS signed request dictionary object. It
                holds the ``url``, ``method`` and ``headers`` keys, plus
                ``data`` when a non-empty request payload was provided.

        Raises:
            ValueError: If the URL has no hostname or is not an HTTPS URL.
        """
        # Get AWS credentials.
        access_key = aws_security_credentials.get("access_key_id")
        secret_key = aws_security_credentials.get("secret_access_key")
        security_token = aws_security_credentials.get("security_token")

        # A None default avoids sharing one mutable dict between calls.
        additional_headers = additional_headers or {}

        uri = urllib.parse.urlparse(url)
        # Validate provided URL before doing any further work on it.
        if not uri.hostname or uri.scheme != "https":
            raise ValueError("Invalid AWS service URL")

        # Normalize the URL path. This is needed for the canonical_uri.
        # os.path.normpath can't be used since it normalizes "/" paths
        # to "\\" in Windows OS.
        normalized_uri = urllib.parse.urlparse(
            urljoin(url, posixpath.normpath(uri.path))
        )

        header_map = _generate_authentication_header_map(
            host=uri.hostname,
            canonical_uri=normalized_uri.path or "/",
            canonical_querystring=_get_canonical_querystring(uri.query),
            method=method,
            region=self._region_name,
            access_key=access_key,
            secret_key=secret_key,
            security_token=security_token,
            request_payload=request_payload,
            additional_headers=additional_headers,
        )
        headers = {
            "Authorization": header_map.get("authorization_header"),
            "host": uri.hostname,
        }
        # Add x-amz-date if available.
        if "amz_date" in header_map:
            headers[_AWS_DATE_HEADER] = header_map.get("amz_date")
        # Append additional optional headers, eg. X-Amz-Target, Content-Type, etc.
        # These are applied after the generated headers, so a caller-supplied
        # key with the same spelling takes precedence.
        headers.update(additional_headers)

        # Add session token if available.
        if security_token is not None:
            headers[_AWS_SECURITY_TOKEN_HEADER] = security_token

        signed_request = {"url": url, "method": method, "headers": headers}
        if request_payload:
            signed_request["data"] = request_payload
        return signed_request


def _get_canonical_querystring(query):
    """Generates the canonical query string given a raw query string.
    Logic is based on
    https://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html

    Args:
        query (str): The raw query string.

    Returns:
        str: The canonical query string. Keys and values are URI encoded and
            the pairs are sorted by key, then by value. Parameters with an
            empty value are preserved as ``key=``.
    """
    # Parse raw query string. Blank values must be kept: Signature Version 4
    # requires parameters without a value to appear as "key=" in the
    # canonical request, otherwise the signature would not match the URL sent.
    querystring = urllib.parse.parse_qs(query, keep_blank_values=True)

    # URI encode every key and each of its values.
    encoded_pairs = []
    for key, values in querystring.items():
        quoted_key = urllib.parse.quote(key, safe="-_.~")
        for value in values:
            encoded_pairs.append((quoted_key, urllib.parse.quote(value, safe="-_.~")))

    # Tuple ordering sorts by encoded key first and encoded value second,
    # which preserves keys with multiple values.
    encoded_pairs.sort()
    return "&".join("{}={}".format(key, value) for key, value in encoded_pairs)


def _sign(key, msg):
    """Creates the HMAC-SHA256 hash of the provided message using the provided
    key.

    Args:
        key (bytes): The HMAC-SHA256 key to use.
        msg (str): The message to hash. It is UTF-8 encoded before hashing.

    Returns:
        bytes: The computed hash bytes.
    """
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def _get_signing_key(key, date_stamp, region_name, service_name):
    """Calculates the signing key used to calculate the signature for
    AWS Signature Version 4 based on:
    https://docs.aws.amazon.com/general/latest/gr/sigv4-calculate-signature.html

    Args:
        key (str): The AWS secret access key.
        date_stamp (str): The '%Y%m%d' date format.
        region_name (str): The AWS region.
        service_name (str): The AWS service name, eg. sts.

    Returns:
        bytes: The signing key bytes.
    """
    # Each step keys the next HMAC with the previous digest.
    k_date = _sign(("AWS4" + key).encode("utf-8"), date_stamp)
    k_region = _sign(k_date, region_name)
    k_service = _sign(k_region, service_name)
    return _sign(k_service, _AWS_REQUEST_TYPE)


def _generate_authentication_header_map(
    host,
    canonical_uri,
    canonical_querystring,
    method,
    region,
    access_key,
    secret_key,
    security_token,
    request_payload="",
    additional_headers=None,
):
    """Generates the authentication header map needed for generating the AWS
    Signature Version 4 signed request.

    Args:
        host (str): The AWS service URL hostname.
        canonical_uri (str): The AWS service URL path name.
        canonical_querystring (str): The AWS service URL query string.
        method (str): The HTTP method used to call this API.
        region (str): The AWS region.
        access_key (str): The AWS access key ID.
        secret_key (str): The AWS secret access key.
        security_token (Optional[str]): The AWS security session token. This is
            available for temporary sessions.
        request_payload (Optional[str]): The optional request payload if
            available.
        additional_headers (Optional[Mapping[str, str]]): The optional
            additional headers needed for the requested AWS API.

    Returns:
        Mapping[str, str]: The AWS authentication header dictionary object.
            This contains the x-amz-date and authorization header information.
            The ``amz_date`` key is omitted when a ``date`` header was
            supplied in ``additional_headers``.
    """
    # A None default avoids sharing one mutable dict between calls.
    additional_headers = additional_headers or {}

    # iam.amazonaws.com host => iam service.
    # sts.us-east-2.amazonaws.com host => sts service.
    service_name = host.split(".")[0]

    current_time = _helpers.utcnow()
    amz_date = current_time.strftime("%Y%m%dT%H%M%SZ")
    date_stamp = current_time.strftime("%Y%m%d")

    # Change all additional headers to be lower case.
    full_headers = {}
    for key in additional_headers:
        full_headers[key.lower()] = additional_headers[key]
    # Add AWS session token if available.
    if security_token is not None:
        full_headers[_AWS_SECURITY_TOKEN_HEADER] = security_token

    # Required headers
    full_headers["host"] = host
    # Do not use generated x-amz-date if the date header is provided.
    # Previously the date was not fixed with x-amz- and could be provided
    # manually.
    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-header-value-trim.req
    date_header_provided = "date" in full_headers
    if not date_header_provided:
        full_headers[_AWS_DATE_HEADER] = amz_date

    # Header keys need to be sorted alphabetically.
    header_keys = sorted(full_headers)
    canonical_headers = "".join(
        "{}:{}\n".format(key, full_headers[key]) for key in header_keys
    )
    signed_headers = ";".join(header_keys)

    payload_hash = hashlib.sha256((request_payload or "").encode("utf-8")).hexdigest()

    # https://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
    canonical_request = "{}\n{}\n{}\n{}\n{}\n{}".format(
        method,
        canonical_uri,
        canonical_querystring,
        canonical_headers,
        signed_headers,
        payload_hash,
    )

    credential_scope = "{}/{}/{}/{}".format(
        date_stamp, region, service_name, _AWS_REQUEST_TYPE
    )

    # https://docs.aws.amazon.com/general/latest/gr/sigv4-create-string-to-sign.html
    string_to_sign = "{}\n{}\n{}\n{}".format(
        _AWS_ALGORITHM,
        amz_date,
        credential_scope,
        hashlib.sha256(canonical_request.encode("utf-8")).hexdigest(),
    )

    # https://docs.aws.amazon.com/general/latest/gr/sigv4-calculate-signature.html
    signing_key = _get_signing_key(secret_key, date_stamp, region, service_name)
    signature = hmac.new(
        signing_key, string_to_sign.encode("utf-8"), hashlib.sha256
    ).hexdigest()

    # https://docs.aws.amazon.com/general/latest/gr/sigv4-add-signature-to-request.html
    authorization_header = "{} Credential={}/{}, SignedHeaders={}, Signature={}".format(
        _AWS_ALGORITHM, access_key, credential_scope, signed_headers, signature
    )

    authentication_header = {"authorization_header": authorization_header}
    # Do not use generated x-amz-date if the date header is provided.
    if not date_header_provided:
        authentication_header["amz_date"] = amz_date
    return authentication_header


def _decode_response_body(response):
    """Returns the payload of an HTTP response as text.

    Args:
        response (google.auth.transport.Response): The HTTP response whose
            ``data`` attribute is read.

    Returns:
        str: ``response.data`` decoded as UTF-8 when it exposes a ``decode``
            method, otherwise ``response.data`` unchanged. This supports both
            string and bytes type response.data.
    """
    data = response.data
    return data.decode("utf-8") if hasattr(data, "decode") else data


class Credentials(external_account.Credentials):
    """AWS external account credentials.
    This is used to exchange serialized AWS signature v4 signed requests to
    AWS STS GetCallerIdentity service for Google access tokens.
    """

    def __init__(
        self,
        audience,
        subject_token_type,
        token_url,
        credential_source=None,
        service_account_impersonation_url=None,
        client_id=None,
        client_secret=None,
        quota_project_id=None,
        scopes=None,
        default_scopes=None,
    ):
        """Instantiates an AWS workload external account credentials object.

        Args:
            audience (str): The STS audience field.
            subject_token_type (str): The subject token type.
            token_url (str): The STS endpoint URL.
            credential_source (Mapping): The credential source dictionary used
                to provide instructions on how to retrieve external credential
                to be exchanged for Google access tokens.
            service_account_impersonation_url (Optional[str]): The optional
                service account impersonation getAccessToken URL.
            client_id (Optional[str]): The optional client ID.
            client_secret (Optional[str]): The optional client secret.
            quota_project_id (Optional[str]): The optional quota project ID.
            scopes (Optional[Sequence[str]]): Optional scopes to request during
                the authorization grant.
            default_scopes (Optional[Sequence[str]]): Default scopes passed by a
                Google client library. Use 'scopes' for user-defined scopes.

        Raises:
            google.auth.exceptions.RefreshError: If an error is encountered during
                access token retrieval logic.
            ValueError: For invalid parameters.

        .. note:: Typically one of the helper constructors
            :meth:`from_file` or
            :meth:`from_info` are used instead of calling the constructor directly.
        """
        super(Credentials, self).__init__(
            audience=audience,
            subject_token_type=subject_token_type,
            token_url=token_url,
            credential_source=credential_source,
            service_account_impersonation_url=service_account_impersonation_url,
            client_id=client_id,
            client_secret=client_secret,
            quota_project_id=quota_project_id,
            scopes=scopes,
            default_scopes=default_scopes,
        )
        credential_source = credential_source or {}
        self._environment_id = credential_source.get("environment_id") or ""
        self._region_url = credential_source.get("region_url")
        self._security_credentials_url = credential_source.get("url")
        self._cred_verification_url = credential_source.get(
            "regional_cred_verification_url"
        )
        # The region and the signer are resolved lazily on the first call to
        # retrieve_subject_token.
        self._region = None
        self._request_signer = None
        self._target_resource = audience

        # Get the environment ID. Currently, only one version supported (v1).
        matches = re.match(r"^(aws)([\d]+)$", self._environment_id)
        if matches:
            env_id, env_version = matches.groups()
        else:
            env_id, env_version = (None, None)

        if env_id != "aws" or self._cred_verification_url is None:
            raise ValueError("No valid AWS 'credential_source' provided")
        elif int(env_version or "") != 1:
            raise ValueError(
                "aws version '{}' is not supported in the current build.".format(
                    env_version
                )
            )

    def retrieve_subject_token(self, request):
        """Retrieves the subject token using the credential_source object.
        The subject token is a serialized `AWS GetCallerIdentity signed request`_.

        The logic is summarized as:

        Retrieve the AWS region from the AWS_REGION or AWS_DEFAULT_REGION
        environment variable or from the AWS metadata server availability-zone
        if not found in the environment variable.

        Check AWS credentials in environment variables. If not found, retrieve
        from the AWS metadata server security-credentials endpoint.

        When retrieving AWS credentials from the metadata server
        security-credentials endpoint, the AWS role needs to be determined by
        calling the security-credentials endpoint without any argument. Then the
        credentials can be retrieved via: security-credentials/role_name

        Generate the signed request to AWS STS GetCallerIdentity action.

        Inject x-goog-cloud-target-resource into header and serialize the
        signed request. This will be the subject-token to pass to GCP STS.

        .. _AWS GetCallerIdentity signed request:
            https://cloud.google.com/iam/docs/access-resources-aws#exchange-token

        Args:
            request (google.auth.transport.Request): A callable used to make
                HTTP requests.
        Returns:
            str: The retrieved subject token.
        """
        # Initialize the request signer if not yet initialized after determining
        # the current AWS region.
        if self._request_signer is None:
            self._region = self._get_region(request, self._region_url)
            self._request_signer = RequestSigner(self._region)

        # Retrieve the AWS security credentials needed to generate the signed
        # request.
        aws_security_credentials = self._get_security_credentials(request)
        # Generate the signed request to AWS STS GetCallerIdentity API.
        # Use the required regional endpoint. Otherwise, the request will fail.
        request_options = self._request_signer.get_request_options(
            aws_security_credentials,
            self._cred_verification_url.replace("{region}", self._region),
            "POST",
        )
        # The GCP STS endpoint expects the headers to be formatted as:
        # [
        #   {key: 'x-amz-date', value: '...'},
        #   {key: 'Authorization', value: '...'},
        #   ...
        # ]
        # And then serialized as:
        # quote(json.dumps({
        #   url: '...',
        #   method: 'POST',
        #   headers: [{key: 'x-amz-date', value: '...'}, ...]
        # }))
        request_headers = request_options.get("headers")
        # The full, canonical resource name of the workload identity pool
        # provider, with or without the HTTPS prefix.
        # Including this header as part of the signature is recommended to
        # ensure data integrity.
        request_headers["x-goog-cloud-target-resource"] = self._target_resource

        # Serialize AWS signed request.
        # Keeping inner keys in sorted order makes testing easier for Python
        # versions <=3.5 as the stringified JSON string would have a predictable
        # key order.
        aws_signed_req = {
            "url": request_options.get("url"),
            "method": request_options.get("method"),
            # Reformat header to GCP STS expected format.
            "headers": [
                {"key": key, "value": request_headers[key]}
                for key in sorted(request_headers)
            ],
        }

        return urllib.parse.quote(
            json.dumps(aws_signed_req, separators=(",", ":"), sort_keys=True)
        )

    def _get_region(self, request, url):
        """Retrieves the current AWS region from either the AWS_REGION or
        AWS_DEFAULT_REGION environment variable or from the AWS metadata server.

        Args:
            request (google.auth.transport.Request): A callable used to make
                HTTP requests.
            url (str): The AWS metadata server region URL. This argument is
                currently not read; the region URL configured on the
                credentials (``credential_source["region_url"]``) is the one
                that is queried.

        Returns:
            str: The current AWS region.

        Raises:
            google.auth.exceptions.RefreshError: If an error occurs while
                retrieving the AWS region.
        """
        # The AWS metadata server is not available in some AWS environments
        # such as AWS lambda. Instead, it is available via environment
        # variable.
        env_aws_region = os.environ.get(environment_vars.AWS_REGION)
        if env_aws_region is not None:
            return env_aws_region

        env_aws_region = os.environ.get(environment_vars.AWS_DEFAULT_REGION)
        if env_aws_region is not None:
            return env_aws_region

        if not self._region_url:
            raise exceptions.RefreshError("Unable to determine AWS region")
        response = request(url=self._region_url, method="GET")

        # Support both string and bytes type response.data.
        response_body = _decode_response_body(response)

        if response.status != http_client.OK:
            raise exceptions.RefreshError(
                "Unable to retrieve AWS region", response_body
            )

        # This endpoint will return the region in format: us-east-2b.
        # Only the us-east-2 part should be used.
        return response_body[:-1]

    def _get_security_credentials(self, request):
        """Retrieves the AWS security credentials required for signing AWS
        requests from either the AWS security credentials environment variables
        or from the AWS metadata server.

        Args:
            request (google.auth.transport.Request): A callable used to make
                HTTP requests.

        Returns:
            Mapping[str, str]: The AWS security credentials dictionary object
                with the ``access_key_id``, ``secret_access_key`` and
                ``security_token`` keys.

        Raises:
            google.auth.exceptions.RefreshError: If an error occurs while
                retrieving the AWS security credentials.
        """

        # Check environment variables for permanent credentials first.
        # https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html
        env_aws_access_key_id = os.environ.get(environment_vars.AWS_ACCESS_KEY_ID)
        env_aws_secret_access_key = os.environ.get(
            environment_vars.AWS_SECRET_ACCESS_KEY
        )
        # This is normally not available for permanent credentials.
        env_aws_session_token = os.environ.get(environment_vars.AWS_SESSION_TOKEN)
        if env_aws_access_key_id and env_aws_secret_access_key:
            return {
                "access_key_id": env_aws_access_key_id,
                "secret_access_key": env_aws_secret_access_key,
                "security_token": env_aws_session_token,
            }

        # Get role name.
        role_name = self._get_metadata_role_name(request)

        # Get security credentials.
        credentials = self._get_metadata_security_credentials(request, role_name)

        return {
            "access_key_id": credentials.get("AccessKeyId"),
            "secret_access_key": credentials.get("SecretAccessKey"),
            "security_token": credentials.get("Token"),
        }

    def _get_metadata_security_credentials(self, request, role_name):
        """Retrieves the AWS security credentials required for signing AWS
        requests from the AWS metadata server.

        Args:
            request (google.auth.transport.Request): A callable used to make
                HTTP requests.
            role_name (str): The AWS role name required by the AWS metadata
                server security_credentials endpoint in order to return the
                credentials.

        Returns:
            Mapping[str, str]: The AWS metadata server security credentials
                response.

        Raises:
            google.auth.exceptions.RefreshError: If an error occurs while
                retrieving the AWS security credentials.
        """
        headers = {"Content-Type": "application/json"}
        response = request(
            url="{}/{}".format(self._security_credentials_url, role_name),
            method="GET",
            headers=headers,
        )

        # support both string and bytes type response.data
        response_body = _decode_response_body(response)

        if response.status != http_client.OK:
            raise exceptions.RefreshError(
                "Unable to retrieve AWS security credentials", response_body
            )

        return json.loads(response_body)

    def _get_metadata_role_name(self, request):
        """Retrieves the AWS role currently attached to the current AWS
        workload by querying the AWS metadata server. This is needed for the
        AWS metadata server security credentials endpoint in order to retrieve
        the AWS security credentials needed to sign requests to AWS APIs.

        Args:
            request (google.auth.transport.Request): A callable used to make
                HTTP requests.

        Returns:
            str: The AWS role name.

        Raises:
            google.auth.exceptions.RefreshError: If an error occurs while
                retrieving the AWS role name.
        """
        if self._security_credentials_url is None:
            raise exceptions.RefreshError(
                "Unable to determine the AWS metadata server security credentials endpoint"
            )
        response = request(url=self._security_credentials_url, method="GET")

        # support both string and bytes type response.data
        response_body = _decode_response_body(response)

        if response.status != http_client.OK:
            raise exceptions.RefreshError(
                "Unable to retrieve AWS role name", response_body
            )

        return response_body

    @classmethod
    def from_info(cls, info, **kwargs):
        """Creates an AWS Credentials instance from parsed external account info.

        Args:
            info (Mapping[str, str]): The AWS external account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.aws.Credentials: The constructed credentials.

        Raises:
            ValueError: For invalid parameters.
        """
        return cls(
            audience=info.get("audience"),
            subject_token_type=info.get("subject_token_type"),
            token_url=info.get("token_url"),
            service_account_impersonation_url=info.get(
                "service_account_impersonation_url"
            ),
            client_id=info.get("client_id"),
            client_secret=info.get("client_secret"),
            credential_source=info.get("credential_source"),
            quota_project_id=info.get("quota_project_id"),
            **kwargs
        )

    @classmethod
    def from_file(cls, filename, **kwargs):
        """Creates an AWS Credentials instance from an external account json file.

        Args:
            filename (str): The path to the AWS external account json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.aws.Credentials: The constructed credentials.
        """
        with io.open(filename, "r", encoding="utf-8") as json_file:
            data = json.load(json_file)
            return cls.from_info(data, **kwargs)