# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Identity Pool Credentials.

This module provides credentials to access Google Cloud resources from on-prem
or non-Google Cloud platforms which support external credentials (e.g. OIDC ID
tokens) retrieved from local file locations or local servers. This includes
Microsoft Azure and OIDC identity providers (e.g. K8s workloads registered with
Hub with Hub workload identity enabled).

These credentials are recommended over the use of service account credentials
in on-prem/non-Google Cloud platforms as they do not involve the management of
long-lived service account private keys.

Identity Pool Credentials are initialized using external_account
arguments which are typically loaded from an external credentials file or
an external credentials URL. Unlike other Credentials that can be initialized
with a list of explicit arguments, secrets or credentials, external account
clients use the environment and hints/guidelines provided by the
external_account JSON file to retrieve credentials and exchange them for Google
access tokens.
"""

try:
    from collections.abc import Mapping
# Python 2.7 compatibility
except ImportError:  # pragma: NO COVER
    from collections import Mapping
import io
import json
import os

from google.auth import _helpers
from google.auth import exceptions
from google.auth import external_account


class Credentials(external_account.Credentials):
    """External account credentials sourced from files and URLs."""

    def __init__(
        self,
        audience,
        subject_token_type,
        token_url,
        credential_source,
        service_account_impersonation_url=None,
        client_id=None,
        client_secret=None,
        quota_project_id=None,
        scopes=None,
        default_scopes=None,
        workforce_pool_user_project=None,
    ):
        """Instantiates an external account credentials object from a file/URL.

        Args:
            audience (str): The STS audience field.
            subject_token_type (str): The subject token type.
            token_url (str): The STS endpoint URL.
            credential_source (Mapping): The credential source dictionary used to
                provide instructions on how to retrieve external credential to be
                exchanged for Google access tokens.

                Example credential_source for url-sourced credential::

                    {
                        "url": "http://www.example.com",
                        "format": {
                            "type": "json",
                            "subject_token_field_name": "access_token",
                        },
                        "headers": {"foo": "bar"},
                    }

                Example credential_source for file-sourced credential::

                    {
                        "file": "/path/to/token/file.txt"
                    }

            service_account_impersonation_url (Optional[str]): The optional service account
                impersonation getAccessToken URL.
            client_id (Optional[str]): The optional client ID.
            client_secret (Optional[str]): The optional client secret.
            quota_project_id (Optional[str]): The optional quota project ID.
            scopes (Optional[Sequence[str]]): Optional scopes to request during the
                authorization grant.
            default_scopes (Optional[Sequence[str]]): Default scopes passed by a
                Google client library. Use 'scopes' for user-defined scopes.
            workforce_pool_user_project (Optional[str]): The optional workforce pool user
                project number when the credential corresponds to a workforce pool and not
                a workload identity pool. The underlying principal must still have
                serviceusage.services.use IAM permission to use the project for
                billing/quota.

        Raises:
            google.auth.exceptions.RefreshError: If an error is encountered during
                access token retrieval logic.
            ValueError: For invalid parameters.

        .. note:: Typically one of the helper constructors
            :meth:`from_file` or
            :meth:`from_info` are used instead of calling the constructor directly.
        """

        super(Credentials, self).__init__(
            audience=audience,
            subject_token_type=subject_token_type,
            token_url=token_url,
            credential_source=credential_source,
            service_account_impersonation_url=service_account_impersonation_url,
            client_id=client_id,
            client_secret=client_secret,
            quota_project_id=quota_project_id,
            scopes=scopes,
            default_scopes=default_scopes,
            workforce_pool_user_project=workforce_pool_user_project,
        )
        if not isinstance(credential_source, Mapping):
            # Leave both sources unset so the "Missing credential_source"
            # check at the end of the constructor rejects the input.
            self._credential_source_file = None
            self._credential_source_url = None
        else:
            self._credential_source_file = credential_source.get("file")
            self._credential_source_url = credential_source.get("url")
            self._credential_source_headers = credential_source.get("headers")
            # An explicit null "format" is treated like a missing one so it
            # falls back to the text default instead of failing with an
            # AttributeError on the lookup below.
            credential_source_format = credential_source.get("format") or {}
            # Get credential_source format type. When not provided, this
            # defaults to text.
            self._credential_source_format_type = (
                credential_source_format.get("type") or "text"
            )
            # environment_id is only supported in AWS or dedicated future external
            # account credentials.
            if "environment_id" in credential_source:
                raise ValueError(
                    "Invalid Identity Pool credential_source field 'environment_id'"
                )
            if self._credential_source_format_type not in ["text", "json"]:
                raise ValueError(
                    "Invalid credential_source format '{}'".format(
                        self._credential_source_format_type
                    )
                )
            # For JSON types, get the required subject_token field name.
            if self._credential_source_format_type == "json":
                self._credential_source_field_name = credential_source_format.get(
                    "subject_token_field_name"
                )
                if self._credential_source_field_name is None:
                    raise ValueError(
                        "Missing subject_token_field_name for JSON credential_source format"
                    )
            else:
                self._credential_source_field_name = None

        # Exactly one of 'file' / 'url' must be configured.
        if self._credential_source_file and self._credential_source_url:
            raise ValueError(
                "Ambiguous credential_source. 'file' is mutually exclusive with 'url'."
            )
        if not self._credential_source_file and not self._credential_source_url:
            raise ValueError(
                "Missing credential_source. A 'file' or 'url' must be provided."
            )

    # NOTE(review): the docstring for this method is supplied by the
    # copy_docstring decorator; presumably defining one here would conflict
    # with it -- confirm against _helpers.copy_docstring before adding one.
    @_helpers.copy_docstring(external_account.Credentials)
    def retrieve_subject_token(self, request):
        # Fetch the raw token payload from the configured source, then extract
        # the subject token according to the configured format.
        return self._parse_token_data(
            self._get_token_data(request),
            self._credential_source_format_type,
            self._credential_source_field_name,
        )

    def _get_token_data(self, request):
        """Reads the raw token payload from the configured file or URL.

        Args:
            request: The callable forwarded to :meth:`_get_url_data` when the
                credential source is a URL. Unused for file sources.

        Returns:
            Tuple[str, str]: The raw content and the location (file path or
            URL) it was read from.
        """
        if self._credential_source_file:
            return self._get_file_data(self._credential_source_file)
        else:
            return self._get_url_data(
                request, self._credential_source_url, self._credential_source_headers
            )

    def _get_file_data(self, filename):
        """Reads the UTF-8 text content of the given file.

        Args:
            filename (str): The path of the file to read.

        Returns:
            Tuple[str, str]: The file content and the file path.

        Raises:
            google.auth.exceptions.RefreshError: If the path does not exist.
        """
        if not os.path.exists(filename):
            raise exceptions.RefreshError("File '{}' was not found.".format(filename))

        with io.open(filename, "r", encoding="utf-8") as file_obj:
            return file_obj.read(), filename

    def _get_url_data(self, request, url, headers):
        """Retrieves the token payload by issuing a GET request to the URL.

        Args:
            request: A callable invoked as
                ``request(url=..., method="GET", headers=...)`` whose result
                exposes ``data`` and ``status`` attributes.
            url (str): The URL to fetch.
            headers (Optional[Mapping]): The headers passed through to the
                request.

        Returns:
            Tuple[str, str]: The response body and the URL.

        Raises:
            google.auth.exceptions.RefreshError: If the response status is
                not 200.
        """
        response = request(url=url, method="GET", headers=headers)

        # support both string and bytes type response.data
        response_body = (
            response.data.decode("utf-8")
            if hasattr(response.data, "decode")
            else response.data
        )

        if response.status != 200:
            raise exceptions.RefreshError(
                "Unable to retrieve Identity Pool subject token", response_body
            )

        return response_body, url

    def _parse_token_data(
        self, token_content, format_type="text", subject_token_field_name=None
    ):
        """Extracts the subject token from a raw token payload.

        Args:
            token_content (Tuple[str, str]): The raw content and the location
                it was read from (used only in error messages).
            format_type (str): ``"text"`` to use the content verbatim; any
                other value parses the content as JSON.
            subject_token_field_name (Optional[str]): The key holding the
                token when the content is parsed as JSON.

        Returns:
            str: The subject token.

        Raises:
            google.auth.exceptions.RefreshError: If the content cannot be
                parsed as JSON, is not a JSON object containing the field, or
                the resulting token is empty.
        """
        content, filename = token_content
        if format_type == "text":
            token = content
        else:
            try:
                # Parse file content as JSON.
                response_data = json.loads(content)
                # Get the subject_token.
                token = response_data[subject_token_field_name]
            # TypeError covers valid JSON documents that are not objects
            # (lists, strings, numbers, null), which cannot be indexed by the
            # field name, as well as content json.loads cannot accept.
            except (KeyError, ValueError, TypeError):
                raise exceptions.RefreshError(
                    "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
                        filename, subject_token_field_name
                    )
                )
        if not token:
            raise exceptions.RefreshError(
                "Missing subject_token in the credential_source file"
            )
        return token

    @classmethod
    def from_info(cls, info, **kwargs):
        """Creates an Identity Pool Credentials instance from parsed external account info.

        Args:
            info (Mapping[str, str]): The Identity Pool external account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.identity_pool.Credentials: The constructed
                credentials.

        Raises:
            ValueError: For invalid parameters.
        """
        return cls(
            audience=info.get("audience"),
            subject_token_type=info.get("subject_token_type"),
            token_url=info.get("token_url"),
            service_account_impersonation_url=info.get(
                "service_account_impersonation_url"
            ),
            client_id=info.get("client_id"),
            client_secret=info.get("client_secret"),
            credential_source=info.get("credential_source"),
            quota_project_id=info.get("quota_project_id"),
            workforce_pool_user_project=info.get("workforce_pool_user_project"),
            **kwargs
        )

    @classmethod
    def from_file(cls, filename, **kwargs):
        """Creates an IdentityPool Credentials instance from an external account json file.

        Args:
            filename (str): The path to the IdentityPool external account json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.identity_pool.Credentials: The constructed
                credentials.
        """
        with io.open(filename, "r", encoding="utf-8") as json_file:
            data = json.load(json_file)
            return cls.from_info(data, **kwargs)