1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""AWS Credentials and AWS Signature V4 Request Signer.
16
17This module provides credentials to access Google Cloud resources from Amazon
18Web Services (AWS) workloads. These credentials are recommended over the
19use of service account credentials in AWS as they do not involve the management
20of long-live service account private keys.
21
22AWS Credentials are initialized using external_account arguments which are
23typically loaded from the external credentials JSON file.
24Unlike other Credentials that can be initialized with a list of explicit
25arguments, secrets or credentials, external account clients use the
26environment and hints/guidelines provided by the external_account JSON
27file to retrieve credentials and exchange them for Google access tokens.
28
29This module also provides a basic implementation of the
30`AWS Signature Version 4`_ request signing algorithm.
31
32AWS Credentials use serialized signed requests to the
33`AWS STS GetCallerIdentity`_ API that can be exchanged for Google access tokens
34via the GCP STS endpoint.
35
36.. _AWS Signature Version 4: https://docs.aws.amazon.com/general/latest/gr/signature-version-4.html
37.. _AWS STS GetCallerIdentity: https://docs.aws.amazon.com/STS/latest/APIReference/API_GetCallerIdentity.html
38"""
39
40import hashlib
41import hmac
42import io
43import json
44import os
45import posixpath
46import re
47
48try:
49    from urllib.parse import urljoin
50# Python 2.7 compatibility
51except ImportError:  # pragma: NO COVER
52    from urlparse import urljoin
53
54from six.moves import http_client
55from six.moves import urllib
56
57from google.auth import _helpers
58from google.auth import environment_vars
59from google.auth import exceptions
60from google.auth import external_account
61
62# AWS Signature Version 4 signing algorithm identifier.
63_AWS_ALGORITHM = "AWS4-HMAC-SHA256"
64# The termination string for the AWS credential scope value as defined in
65# https://docs.aws.amazon.com/general/latest/gr/sigv4-create-string-to-sign.html
66_AWS_REQUEST_TYPE = "aws4_request"
67# The AWS authorization header name for the security session token if available.
68_AWS_SECURITY_TOKEN_HEADER = "x-amz-security-token"
69# The AWS authorization header name for the auto-generated date.
70_AWS_DATE_HEADER = "x-amz-date"
71
72
73class RequestSigner(object):
74    """Implements an AWS request signer based on the AWS Signature Version 4 signing
75    process.
76    https://docs.aws.amazon.com/general/latest/gr/signature-version-4.html
77    """
78
79    def __init__(self, region_name):
80        """Instantiates an AWS request signer used to compute authenticated signed
81        requests to AWS APIs based on the AWS Signature Version 4 signing process.
82
83        Args:
84            region_name (str): The AWS region to use.
85        """
86
87        self._region_name = region_name
88
89    def get_request_options(
90        self,
91        aws_security_credentials,
92        url,
93        method,
94        request_payload="",
95        additional_headers={},
96    ):
97        """Generates the signed request for the provided HTTP request for calling
98        an AWS API. This follows the steps described at:
99        https://docs.aws.amazon.com/general/latest/gr/sigv4_signing.html
100
101        Args:
102            aws_security_credentials (Mapping[str, str]): A dictionary containing
103                the AWS security credentials.
104            url (str): The AWS service URL containing the canonical URI and
105                query string.
106            method (str): The HTTP method used to call this API.
107            request_payload (Optional[str]): The optional request payload if
108                available.
109            additional_headers (Optional[Mapping[str, str]]): The optional
110                additional headers needed for the requested AWS API.
111
112        Returns:
113            Mapping[str, str]: The AWS signed request dictionary object.
114        """
115        # Get AWS credentials.
116        access_key = aws_security_credentials.get("access_key_id")
117        secret_key = aws_security_credentials.get("secret_access_key")
118        security_token = aws_security_credentials.get("security_token")
119
120        additional_headers = additional_headers or {}
121
122        uri = urllib.parse.urlparse(url)
123        # Normalize the URL path. This is needed for the canonical_uri.
124        # os.path.normpath can't be used since it normalizes "/" paths
125        # to "\\" in Windows OS.
126        normalized_uri = urllib.parse.urlparse(
127            urljoin(url, posixpath.normpath(uri.path))
128        )
129        # Validate provided URL.
130        if not uri.hostname or uri.scheme != "https":
131            raise ValueError("Invalid AWS service URL")
132
133        header_map = _generate_authentication_header_map(
134            host=uri.hostname,
135            canonical_uri=normalized_uri.path or "/",
136            canonical_querystring=_get_canonical_querystring(uri.query),
137            method=method,
138            region=self._region_name,
139            access_key=access_key,
140            secret_key=secret_key,
141            security_token=security_token,
142            request_payload=request_payload,
143            additional_headers=additional_headers,
144        )
145        headers = {
146            "Authorization": header_map.get("authorization_header"),
147            "host": uri.hostname,
148        }
149        # Add x-amz-date if available.
150        if "amz_date" in header_map:
151            headers[_AWS_DATE_HEADER] = header_map.get("amz_date")
152        # Append additional optional headers, eg. X-Amz-Target, Content-Type, etc.
153        for key in additional_headers:
154            headers[key] = additional_headers[key]
155
156        # Add session token if available.
157        if security_token is not None:
158            headers[_AWS_SECURITY_TOKEN_HEADER] = security_token
159
160        signed_request = {"url": url, "method": method, "headers": headers}
161        if request_payload:
162            signed_request["data"] = request_payload
163        return signed_request
164
165
166def _get_canonical_querystring(query):
167    """Generates the canonical query string given a raw query string.
168    Logic is based on
169    https://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
170
171    Args:
172        query (str): The raw query string.
173
174    Returns:
175        str: The canonical query string.
176    """
177    # Parse raw query string.
178    querystring = urllib.parse.parse_qs(query)
179    querystring_encoded_map = {}
180    for key in querystring:
181        quote_key = urllib.parse.quote(key, safe="-_.~")
182        # URI encode key.
183        querystring_encoded_map[quote_key] = []
184        for item in querystring[key]:
185            # For each key, URI encode all values for that key.
186            querystring_encoded_map[quote_key].append(
187                urllib.parse.quote(item, safe="-_.~")
188            )
189        # Sort values for each key.
190        querystring_encoded_map[quote_key].sort()
191    # Sort keys.
192    sorted_keys = list(querystring_encoded_map.keys())
193    sorted_keys.sort()
194    # Reconstruct the query string. Preserve keys with multiple values.
195    querystring_encoded_pairs = []
196    for key in sorted_keys:
197        for item in querystring_encoded_map[key]:
198            querystring_encoded_pairs.append("{}={}".format(key, item))
199    return "&".join(querystring_encoded_pairs)
200
201
202def _sign(key, msg):
203    """Creates the HMAC-SHA256 hash of the provided message using the provided
204    key.
205
206    Args:
207        key (str): The HMAC-SHA256 key to use.
208        msg (str): The message to hash.
209
210    Returns:
211        str: The computed hash bytes.
212    """
213    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
214
215
216def _get_signing_key(key, date_stamp, region_name, service_name):
217    """Calculates the signing key used to calculate the signature for
218    AWS Signature Version 4 based on:
219    https://docs.aws.amazon.com/general/latest/gr/sigv4-calculate-signature.html
220
221    Args:
222        key (str): The AWS secret access key.
223        date_stamp (str): The '%Y%m%d' date format.
224        region_name (str): The AWS region.
225        service_name (str): The AWS service name, eg. sts.
226
227    Returns:
228        str: The signing key bytes.
229    """
230    k_date = _sign(("AWS4" + key).encode("utf-8"), date_stamp)
231    k_region = _sign(k_date, region_name)
232    k_service = _sign(k_region, service_name)
233    k_signing = _sign(k_service, "aws4_request")
234    return k_signing
235
236
237def _generate_authentication_header_map(
238    host,
239    canonical_uri,
240    canonical_querystring,
241    method,
242    region,
243    access_key,
244    secret_key,
245    security_token,
246    request_payload="",
247    additional_headers={},
248):
249    """Generates the authentication header map needed for generating the AWS
250    Signature Version 4 signed request.
251
252    Args:
253        host (str): The AWS service URL hostname.
254        canonical_uri (str): The AWS service URL path name.
255        canonical_querystring (str): The AWS service URL query string.
256        method (str): The HTTP method used to call this API.
257        region (str): The AWS region.
258        access_key (str): The AWS access key ID.
259        secret_key (str): The AWS secret access key.
260        security_token (Optional[str]): The AWS security session token. This is
261            available for temporary sessions.
262        request_payload (Optional[str]): The optional request payload if
263            available.
264        additional_headers (Optional[Mapping[str, str]]): The optional
265            additional headers needed for the requested AWS API.
266
267    Returns:
268        Mapping[str, str]: The AWS authentication header dictionary object.
269            This contains the x-amz-date and authorization header information.
270    """
271    # iam.amazonaws.com host => iam service.
272    # sts.us-east-2.amazonaws.com host => sts service.
273    service_name = host.split(".")[0]
274
275    current_time = _helpers.utcnow()
276    amz_date = current_time.strftime("%Y%m%dT%H%M%SZ")
277    date_stamp = current_time.strftime("%Y%m%d")
278
279    # Change all additional headers to be lower case.
280    full_headers = {}
281    for key in additional_headers:
282        full_headers[key.lower()] = additional_headers[key]
283    # Add AWS session token if available.
284    if security_token is not None:
285        full_headers[_AWS_SECURITY_TOKEN_HEADER] = security_token
286
287    # Required headers
288    full_headers["host"] = host
289    # Do not use generated x-amz-date if the date header is provided.
290    # Previously the date was not fixed with x-amz- and could be provided
291    # manually.
292    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-header-value-trim.req
293    if "date" not in full_headers:
294        full_headers[_AWS_DATE_HEADER] = amz_date
295
296    # Header keys need to be sorted alphabetically.
297    canonical_headers = ""
298    header_keys = list(full_headers.keys())
299    header_keys.sort()
300    for key in header_keys:
301        canonical_headers = "{}{}:{}\n".format(
302            canonical_headers, key, full_headers[key]
303        )
304    signed_headers = ";".join(header_keys)
305
306    payload_hash = hashlib.sha256((request_payload or "").encode("utf-8")).hexdigest()
307
308    # https://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
309    canonical_request = "{}\n{}\n{}\n{}\n{}\n{}".format(
310        method,
311        canonical_uri,
312        canonical_querystring,
313        canonical_headers,
314        signed_headers,
315        payload_hash,
316    )
317
318    credential_scope = "{}/{}/{}/{}".format(
319        date_stamp, region, service_name, _AWS_REQUEST_TYPE
320    )
321
322    # https://docs.aws.amazon.com/general/latest/gr/sigv4-create-string-to-sign.html
323    string_to_sign = "{}\n{}\n{}\n{}".format(
324        _AWS_ALGORITHM,
325        amz_date,
326        credential_scope,
327        hashlib.sha256(canonical_request.encode("utf-8")).hexdigest(),
328    )
329
330    # https://docs.aws.amazon.com/general/latest/gr/sigv4-calculate-signature.html
331    signing_key = _get_signing_key(secret_key, date_stamp, region, service_name)
332    signature = hmac.new(
333        signing_key, string_to_sign.encode("utf-8"), hashlib.sha256
334    ).hexdigest()
335
336    # https://docs.aws.amazon.com/general/latest/gr/sigv4-add-signature-to-request.html
337    authorization_header = "{} Credential={}/{}, SignedHeaders={}, Signature={}".format(
338        _AWS_ALGORITHM, access_key, credential_scope, signed_headers, signature
339    )
340
341    authentication_header = {"authorization_header": authorization_header}
342    # Do not use generated x-amz-date if the date header is provided.
343    if "date" not in full_headers:
344        authentication_header["amz_date"] = amz_date
345    return authentication_header
346
347
348class Credentials(external_account.Credentials):
349    """AWS external account credentials.
350    This is used to exchange serialized AWS signature v4 signed requests to
351    AWS STS GetCallerIdentity service for Google access tokens.
352    """
353
354    def __init__(
355        self,
356        audience,
357        subject_token_type,
358        token_url,
359        credential_source=None,
360        service_account_impersonation_url=None,
361        client_id=None,
362        client_secret=None,
363        quota_project_id=None,
364        scopes=None,
365        default_scopes=None,
366    ):
367        """Instantiates an AWS workload external account credentials object.
368
369        Args:
370            audience (str): The STS audience field.
371            subject_token_type (str): The subject token type.
372            token_url (str): The STS endpoint URL.
373            credential_source (Mapping): The credential source dictionary used
374                to provide instructions on how to retrieve external credential
375                to be exchanged for Google access tokens.
376            service_account_impersonation_url (Optional[str]): The optional
377                service account impersonation getAccessToken URL.
378            client_id (Optional[str]): The optional client ID.
379            client_secret (Optional[str]): The optional client secret.
380            quota_project_id (Optional[str]): The optional quota project ID.
381            scopes (Optional[Sequence[str]]): Optional scopes to request during
382                the authorization grant.
383            default_scopes (Optional[Sequence[str]]): Default scopes passed by a
384                Google client library. Use 'scopes' for user-defined scopes.
385
386        Raises:
387            google.auth.exceptions.RefreshError: If an error is encountered during
388                access token retrieval logic.
389            ValueError: For invalid parameters.
390
391        .. note:: Typically one of the helper constructors
392            :meth:`from_file` or
393            :meth:`from_info` are used instead of calling the constructor directly.
394        """
395        super(Credentials, self).__init__(
396            audience=audience,
397            subject_token_type=subject_token_type,
398            token_url=token_url,
399            credential_source=credential_source,
400            service_account_impersonation_url=service_account_impersonation_url,
401            client_id=client_id,
402            client_secret=client_secret,
403            quota_project_id=quota_project_id,
404            scopes=scopes,
405            default_scopes=default_scopes,
406        )
407        credential_source = credential_source or {}
408        self._environment_id = credential_source.get("environment_id") or ""
409        self._region_url = credential_source.get("region_url")
410        self._security_credentials_url = credential_source.get("url")
411        self._cred_verification_url = credential_source.get(
412            "regional_cred_verification_url"
413        )
414        self._region = None
415        self._request_signer = None
416        self._target_resource = audience
417
418        # Get the environment ID. Currently, only one version supported (v1).
419        matches = re.match(r"^(aws)([\d]+)$", self._environment_id)
420        if matches:
421            env_id, env_version = matches.groups()
422        else:
423            env_id, env_version = (None, None)
424
425        if env_id != "aws" or self._cred_verification_url is None:
426            raise ValueError("No valid AWS 'credential_source' provided")
427        elif int(env_version or "") != 1:
428            raise ValueError(
429                "aws version '{}' is not supported in the current build.".format(
430                    env_version
431                )
432            )
433
434    def retrieve_subject_token(self, request):
435        """Retrieves the subject token using the credential_source object.
436        The subject token is a serialized `AWS GetCallerIdentity signed request`_.
437
438        The logic is summarized as:
439
440        Retrieve the AWS region from the AWS_REGION or AWS_DEFAULT_REGION
441        environment variable or from the AWS metadata server availability-zone
442        if not found in the environment variable.
443
444        Check AWS credentials in environment variables. If not found, retrieve
445        from the AWS metadata server security-credentials endpoint.
446
447        When retrieving AWS credentials from the metadata server
448        security-credentials endpoint, the AWS role needs to be determined by
449        calling the security-credentials endpoint without any argument. Then the
450        credentials can be retrieved via: security-credentials/role_name
451
452        Generate the signed request to AWS STS GetCallerIdentity action.
453
454        Inject x-goog-cloud-target-resource into header and serialize the
455        signed request. This will be the subject-token to pass to GCP STS.
456
457        .. _AWS GetCallerIdentity signed request:
458            https://cloud.google.com/iam/docs/access-resources-aws#exchange-token
459
460        Args:
461            request (google.auth.transport.Request): A callable used to make
462                HTTP requests.
463        Returns:
464            str: The retrieved subject token.
465        """
466        # Initialize the request signer if not yet initialized after determining
467        # the current AWS region.
468        if self._request_signer is None:
469            self._region = self._get_region(request, self._region_url)
470            self._request_signer = RequestSigner(self._region)
471
472        # Retrieve the AWS security credentials needed to generate the signed
473        # request.
474        aws_security_credentials = self._get_security_credentials(request)
475        # Generate the signed request to AWS STS GetCallerIdentity API.
476        # Use the required regional endpoint. Otherwise, the request will fail.
477        request_options = self._request_signer.get_request_options(
478            aws_security_credentials,
479            self._cred_verification_url.replace("{region}", self._region),
480            "POST",
481        )
482        # The GCP STS endpoint expects the headers to be formatted as:
483        # [
484        #   {key: 'x-amz-date', value: '...'},
485        #   {key: 'Authorization', value: '...'},
486        #   ...
487        # ]
488        # And then serialized as:
489        # quote(json.dumps({
490        #   url: '...',
491        #   method: 'POST',
492        #   headers: [{key: 'x-amz-date', value: '...'}, ...]
493        # }))
494        request_headers = request_options.get("headers")
495        # The full, canonical resource name of the workload identity pool
496        # provider, with or without the HTTPS prefix.
497        # Including this header as part of the signature is recommended to
498        # ensure data integrity.
499        request_headers["x-goog-cloud-target-resource"] = self._target_resource
500
501        # Serialize AWS signed request.
502        # Keeping inner keys in sorted order makes testing easier for Python
503        # versions <=3.5 as the stringified JSON string would have a predictable
504        # key order.
505        aws_signed_req = {}
506        aws_signed_req["url"] = request_options.get("url")
507        aws_signed_req["method"] = request_options.get("method")
508        aws_signed_req["headers"] = []
509        # Reformat header to GCP STS expected format.
510        for key in sorted(request_headers.keys()):
511            aws_signed_req["headers"].append(
512                {"key": key, "value": request_headers[key]}
513            )
514
515        return urllib.parse.quote(
516            json.dumps(aws_signed_req, separators=(",", ":"), sort_keys=True)
517        )
518
519    def _get_region(self, request, url):
520        """Retrieves the current AWS region from either the AWS_REGION or
521        AWS_DEFAULT_REGION environment variable or from the AWS metadata server.
522
523        Args:
524            request (google.auth.transport.Request): A callable used to make
525                HTTP requests.
526            url (str): The AWS metadata server region URL.
527
528        Returns:
529            str: The current AWS region.
530
531        Raises:
532            google.auth.exceptions.RefreshError: If an error occurs while
533                retrieving the AWS region.
534        """
535        # The AWS metadata server is not available in some AWS environments
536        # such as AWS lambda. Instead, it is available via environment
537        # variable.
538        env_aws_region = os.environ.get(environment_vars.AWS_REGION)
539        if env_aws_region is not None:
540            return env_aws_region
541
542        env_aws_region = os.environ.get(environment_vars.AWS_DEFAULT_REGION)
543        if env_aws_region is not None:
544            return env_aws_region
545
546        if not self._region_url:
547            raise exceptions.RefreshError("Unable to determine AWS region")
548        response = request(url=self._region_url, method="GET")
549
550        # Support both string and bytes type response.data.
551        response_body = (
552            response.data.decode("utf-8")
553            if hasattr(response.data, "decode")
554            else response.data
555        )
556
557        if response.status != 200:
558            raise exceptions.RefreshError(
559                "Unable to retrieve AWS region", response_body
560            )
561
562        # This endpoint will return the region in format: us-east-2b.
563        # Only the us-east-2 part should be used.
564        return response_body[:-1]
565
566    def _get_security_credentials(self, request):
567        """Retrieves the AWS security credentials required for signing AWS
568        requests from either the AWS security credentials environment variables
569        or from the AWS metadata server.
570
571        Args:
572            request (google.auth.transport.Request): A callable used to make
573                HTTP requests.
574
575        Returns:
576            Mapping[str, str]: The AWS security credentials dictionary object.
577
578        Raises:
579            google.auth.exceptions.RefreshError: If an error occurs while
580                retrieving the AWS security credentials.
581        """
582
583        # Check environment variables for permanent credentials first.
584        # https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html
585        env_aws_access_key_id = os.environ.get(environment_vars.AWS_ACCESS_KEY_ID)
586        env_aws_secret_access_key = os.environ.get(
587            environment_vars.AWS_SECRET_ACCESS_KEY
588        )
589        # This is normally not available for permanent credentials.
590        env_aws_session_token = os.environ.get(environment_vars.AWS_SESSION_TOKEN)
591        if env_aws_access_key_id and env_aws_secret_access_key:
592            return {
593                "access_key_id": env_aws_access_key_id,
594                "secret_access_key": env_aws_secret_access_key,
595                "security_token": env_aws_session_token,
596            }
597
598        # Get role name.
599        role_name = self._get_metadata_role_name(request)
600
601        # Get security credentials.
602        credentials = self._get_metadata_security_credentials(request, role_name)
603
604        return {
605            "access_key_id": credentials.get("AccessKeyId"),
606            "secret_access_key": credentials.get("SecretAccessKey"),
607            "security_token": credentials.get("Token"),
608        }
609
610    def _get_metadata_security_credentials(self, request, role_name):
611        """Retrieves the AWS security credentials required for signing AWS
612        requests from the AWS metadata server.
613
614        Args:
615            request (google.auth.transport.Request): A callable used to make
616                HTTP requests.
617            role_name (str): The AWS role name required by the AWS metadata
618                server security_credentials endpoint in order to return the
619                credentials.
620
621        Returns:
622            Mapping[str, str]: The AWS metadata server security credentials
623                response.
624
625        Raises:
626            google.auth.exceptions.RefreshError: If an error occurs while
627                retrieving the AWS security credentials.
628        """
629        headers = {"Content-Type": "application/json"}
630        response = request(
631            url="{}/{}".format(self._security_credentials_url, role_name),
632            method="GET",
633            headers=headers,
634        )
635
636        # support both string and bytes type response.data
637        response_body = (
638            response.data.decode("utf-8")
639            if hasattr(response.data, "decode")
640            else response.data
641        )
642
643        if response.status != http_client.OK:
644            raise exceptions.RefreshError(
645                "Unable to retrieve AWS security credentials", response_body
646            )
647
648        credentials_response = json.loads(response_body)
649
650        return credentials_response
651
652    def _get_metadata_role_name(self, request):
653        """Retrieves the AWS role currently attached to the current AWS
654        workload by querying the AWS metadata server. This is needed for the
655        AWS metadata server security credentials endpoint in order to retrieve
656        the AWS security credentials needed to sign requests to AWS APIs.
657
658        Args:
659            request (google.auth.transport.Request): A callable used to make
660                HTTP requests.
661
662        Returns:
663            str: The AWS role name.
664
665        Raises:
666            google.auth.exceptions.RefreshError: If an error occurs while
667                retrieving the AWS role name.
668        """
669        if self._security_credentials_url is None:
670            raise exceptions.RefreshError(
671                "Unable to determine the AWS metadata server security credentials endpoint"
672            )
673        response = request(url=self._security_credentials_url, method="GET")
674
675        # support both string and bytes type response.data
676        response_body = (
677            response.data.decode("utf-8")
678            if hasattr(response.data, "decode")
679            else response.data
680        )
681
682        if response.status != http_client.OK:
683            raise exceptions.RefreshError(
684                "Unable to retrieve AWS role name", response_body
685            )
686
687        return response_body
688
689    @classmethod
690    def from_info(cls, info, **kwargs):
691        """Creates an AWS Credentials instance from parsed external account info.
692
693        Args:
694            info (Mapping[str, str]): The AWS external account info in Google
695                format.
696            kwargs: Additional arguments to pass to the constructor.
697
698        Returns:
699            google.auth.aws.Credentials: The constructed credentials.
700
701        Raises:
702            ValueError: For invalid parameters.
703        """
704        return cls(
705            audience=info.get("audience"),
706            subject_token_type=info.get("subject_token_type"),
707            token_url=info.get("token_url"),
708            service_account_impersonation_url=info.get(
709                "service_account_impersonation_url"
710            ),
711            client_id=info.get("client_id"),
712            client_secret=info.get("client_secret"),
713            credential_source=info.get("credential_source"),
714            quota_project_id=info.get("quota_project_id"),
715            **kwargs
716        )
717
718    @classmethod
719    def from_file(cls, filename, **kwargs):
720        """Creates an AWS Credentials instance from an external account json file.
721
722        Args:
723            filename (str): The path to the AWS external account json file.
724            kwargs: Additional arguments to pass to the constructor.
725
726        Returns:
727            google.auth.aws.Credentials: The constructed credentials.
728        """
729        with io.open(filename, "r", encoding="utf-8") as json_file:
730            data = json.load(json_file)
731            return cls.from_info(data, **kwargs)
732