1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Identity Pool Credentials.
16
17This module provides credentials to access Google Cloud resources from on-prem
18or non-Google Cloud platforms which support external credentials (e.g. OIDC ID
19tokens) retrieved from local file locations or local servers. This includes
20Microsoft Azure and OIDC identity providers (e.g. K8s workloads registered with
21Hub with Hub workload identity enabled).
22
23These credentials are recommended over the use of service account credentials
24in on-prem/non-Google Cloud platforms as they do not involve the management of
long-lived service account private keys.
26
27Identity Pool Credentials are initialized using external_account
28arguments which are typically loaded from an external credentials file or
29an external credentials URL. Unlike other Credentials that can be initialized
30with a list of explicit arguments, secrets or credentials, external account
31clients use the environment and hints/guidelines provided by the
32external_account JSON file to retrieve credentials and exchange them for Google
33access tokens.
34"""
35
36try:
37    from collections.abc import Mapping
38# Python 2.7 compatibility
39except ImportError:  # pragma: NO COVER
40    from collections import Mapping
41import io
42import json
43import os
44
45from google.auth import _helpers
46from google.auth import exceptions
47from google.auth import external_account
48
49
class Credentials(external_account.Credentials):
    """External account credentials sourced from files and URLs."""

    def __init__(
        self,
        audience,
        subject_token_type,
        token_url,
        credential_source,
        service_account_impersonation_url=None,
        client_id=None,
        client_secret=None,
        quota_project_id=None,
        scopes=None,
        default_scopes=None,
        workforce_pool_user_project=None,
    ):
        """Instantiates an external account credentials object from a file/URL.

        Args:
            audience (str): The STS audience field.
            subject_token_type (str): The subject token type.
            token_url (str): The STS endpoint URL.
            credential_source (Mapping): The credential source dictionary used to
                provide instructions on how to retrieve external credential to be
                exchanged for Google access tokens.

                Example credential_source for url-sourced credential::

                    {
                        "url": "http://www.example.com",
                        "format": {
                            "type": "json",
                            "subject_token_field_name": "access_token",
                        },
                        "headers": {"foo": "bar"},
                    }

                Example credential_source for file-sourced credential::

                    {
                        "file": "/path/to/token/file.txt"
                    }

                Exactly one of "file" or "url" must be provided. "format" is
                optional; its "type" defaults to "text" and may otherwise be
                "json", in which case "subject_token_field_name" is required.

            service_account_impersonation_url (Optional[str]): The optional service account
                impersonation getAccessToken URL.
            client_id (Optional[str]): The optional client ID.
            client_secret (Optional[str]): The optional client secret.
            quota_project_id (Optional[str]): The optional quota project ID.
            scopes (Optional[Sequence[str]]): Optional scopes to request during the
                authorization grant.
            default_scopes (Optional[Sequence[str]]): Default scopes passed by a
                Google client library. Use 'scopes' for user-defined scopes.
            workforce_pool_user_project (Optional[str]): The optional workforce pool user
                project number when the credential corresponds to a workforce pool and not
                a workload identity pool. The underlying principal must still have
                serviceusage.services.use IAM permission to use the project for
                billing/quota.

        Raises:
            google.auth.exceptions.RefreshError: If an error is encountered during
                access token retrieval logic.
            ValueError: For invalid parameters.

        .. note:: Typically one of the helper constructors
            :meth:`from_file` or
            :meth:`from_info` are used instead of calling the constructor directly.
        """

        super(Credentials, self).__init__(
            audience=audience,
            subject_token_type=subject_token_type,
            token_url=token_url,
            credential_source=credential_source,
            service_account_impersonation_url=service_account_impersonation_url,
            client_id=client_id,
            client_secret=client_secret,
            quota_project_id=quota_project_id,
            scopes=scopes,
            default_scopes=default_scopes,
            workforce_pool_user_project=workforce_pool_user_project,
        )
        if not isinstance(credential_source, Mapping):
            # Nothing usable was supplied; the "Missing credential_source"
            # check at the end of this method rejects this case.
            self._credential_source_file = None
            self._credential_source_url = None
        else:
            self._credential_source_file = credential_source.get("file")
            self._credential_source_url = credential_source.get("url")
            self._credential_source_headers = credential_source.get("headers")
            # Treat an explicit null "format" the same as an absent one so
            # that the lookups below do not fail on None.
            credential_source_format = credential_source.get("format") or {}
            # Get credential_source format type. When not provided, this
            # defaults to text.
            self._credential_source_format_type = (
                credential_source_format.get("type") or "text"
            )
            # environment_id is only supported in AWS or dedicated future external
            # account credentials.
            if "environment_id" in credential_source:
                raise ValueError(
                    "Invalid Identity Pool credential_source field 'environment_id'"
                )
            if self._credential_source_format_type not in ["text", "json"]:
                raise ValueError(
                    "Invalid credential_source format '{}'".format(
                        self._credential_source_format_type
                    )
                )
            # For JSON types, get the required subject_token field name.
            if self._credential_source_format_type == "json":
                self._credential_source_field_name = credential_source_format.get(
                    "subject_token_field_name"
                )
                if self._credential_source_field_name is None:
                    raise ValueError(
                        "Missing subject_token_field_name for JSON credential_source format"
                    )
            else:
                self._credential_source_field_name = None

        # Exactly one of 'file' / 'url' has to be configured.
        if self._credential_source_file and self._credential_source_url:
            raise ValueError(
                "Ambiguous credential_source. 'file' is mutually exclusive with 'url'."
            )
        if not self._credential_source_file and not self._credential_source_url:
            raise ValueError(
                "Missing credential_source. A 'file' or 'url' must be provided."
            )

    # No docstring is written here on purpose: the decorator supplies it from
    # external_account.Credentials.
    @_helpers.copy_docstring(external_account.Credentials)
    def retrieve_subject_token(self, request):
        # Fetch the raw content from the configured source, then extract the
        # token according to the configured format.
        return self._parse_token_data(
            self._get_token_data(request),
            self._credential_source_format_type,
            self._credential_source_field_name,
        )

    def _get_token_data(self, request):
        """Reads the raw token content from the configured file or URL.

        Args:
            request: The callable forwarded to :meth:`_get_url_data` when the
                credential source is a URL. Unused for file sources.

        Returns:
            Tuple[str, str]: The raw content and the file path or URL it was
                read from.
        """
        if self._credential_source_file:
            return self._get_file_data(self._credential_source_file)
        else:
            return self._get_url_data(
                request, self._credential_source_url, self._credential_source_headers
            )

    def _get_file_data(self, filename):
        """Reads the whole of a UTF-8 text file.

        Args:
            filename (str): The path of the file to read.

        Returns:
            Tuple[str, str]: The file content and the given filename.

        Raises:
            google.auth.exceptions.RefreshError: If the file does not exist.
        """
        if not os.path.exists(filename):
            raise exceptions.RefreshError("File '{}' was not found.".format(filename))

        with io.open(filename, "r", encoding="utf-8") as file_obj:
            return file_obj.read(), filename

    def _get_url_data(self, request, url, headers):
        """Issues a GET request and returns the response body.

        Args:
            request: A callable invoked as
                ``request(url=..., method="GET", headers=...)`` whose result
                exposes ``data`` and ``status`` attributes.
            url (str): The URL to fetch.
            headers: The headers passed through to ``request`` (may be None).

        Returns:
            Tuple[str, str]: The response body and the given URL.

        Raises:
            google.auth.exceptions.RefreshError: If the response status is
                not 200.
        """
        response = request(url=url, method="GET", headers=headers)

        # support both string and bytes type response.data
        response_body = (
            response.data.decode("utf-8")
            if hasattr(response.data, "decode")
            else response.data
        )

        if response.status != 200:
            raise exceptions.RefreshError(
                "Unable to retrieve Identity Pool subject token", response_body
            )

        return response_body, url

    def _parse_token_data(
        self, token_content, format_type="text", subject_token_field_name=None
    ):
        """Extracts the subject token from raw credential source content.

        Args:
            token_content (Tuple[str, str]): The raw content and the name of
                the source it came from (used only in error messages).
            format_type (str): "text" to use the content verbatim; any other
                value causes the content to be parsed as JSON.
            subject_token_field_name (Optional[str]): The key holding the
                token when the content is parsed as JSON.

        Returns:
            The subject token.

        Raises:
            google.auth.exceptions.RefreshError: If the content cannot be
                parsed as JSON, does not contain the requested field, or the
                resulting token is empty.
        """
        content, filename = token_content
        if format_type == "text":
            token = content
        else:
            try:
                # Parse file content as JSON.
                response_data = json.loads(content)
                # Get the subject_token.
                token = response_data[subject_token_field_name]
            # TypeError covers valid JSON that is not an object (e.g. an
            # array, string, number or null), where key lookup is not
            # supported, so callers still only see RefreshError.
            except (KeyError, ValueError, TypeError):
                raise exceptions.RefreshError(
                    "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
                        filename, subject_token_field_name
                    )
                )
        if not token:
            raise exceptions.RefreshError(
                "Missing subject_token in the credential_source file"
            )
        return token

    @classmethod
    def from_info(cls, info, **kwargs):
        """Creates an Identity Pool Credentials instance from parsed external account info.

        Args:
            info (Mapping[str, str]): The Identity Pool external account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.identity_pool.Credentials: The constructed
                credentials.

        Raises:
            ValueError: For invalid parameters.
        """
        return cls(
            audience=info.get("audience"),
            subject_token_type=info.get("subject_token_type"),
            token_url=info.get("token_url"),
            service_account_impersonation_url=info.get(
                "service_account_impersonation_url"
            ),
            client_id=info.get("client_id"),
            client_secret=info.get("client_secret"),
            credential_source=info.get("credential_source"),
            quota_project_id=info.get("quota_project_id"),
            workforce_pool_user_project=info.get("workforce_pool_user_project"),
            **kwargs
        )

    @classmethod
    def from_file(cls, filename, **kwargs):
        """Creates an IdentityPool Credentials instance from an external account json file.

        Args:
            filename (str): The path to the IdentityPool external account json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.identity_pool.Credentials: The constructed
                credentials.
        """
        with io.open(filename, "r", encoding="utf-8") as json_file:
            data = json.load(json_file)
            return cls.from_info(data, **kwargs)
288