# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import dataclasses
import datetime
import functools
import logging
from typing import Any, Dict, FrozenSet, Optional

from framework.helpers import retryers
from framework.infrastructure import gcp

logger = logging.getLogger(__name__)

# Type aliases
_timedelta = datetime.timedelta
_HttpRequest = gcp.api.HttpRequest


class EtagConflict(gcp.api.Error):
    """
    Indicates concurrent policy changes.

    https://cloud.google.com/iam/docs/policies#etag
    """


def handle_etag_conflict(func):
    """Decorator: run the wrapped callable through a retryer.

    Each call builds a retryer via
    retryers.exponential_retryer_with_timeout, configured to retry on
    EtagConflict and gcp.api.TransportError with wait_min=1s, wait_max=10s
    and timeout=2min, and invokes the wrapped callable through it. The
    exact backoff behavior is defined by that helper.
    """

    # functools.wraps keeps the wrapped callable's name and docstring
    # visible on the decorated methods.
    @functools.wraps(func)
    def wrap_retry_on_etag_conflict(*args, **kwargs):
        retryer = retryers.exponential_retryer_with_timeout(
            retry_on_exceptions=(EtagConflict, gcp.api.TransportError),
            wait_min=_timedelta(seconds=1),
            wait_max=_timedelta(seconds=10),
            timeout=_timedelta(minutes=2))
        return retryer(func, *args, **kwargs)

    return wrap_retry_on_etag_conflict


def _replace_binding(policy: 'Policy', binding: Optional['Policy.Binding'],
                     new_binding: 'Policy.Binding') -> 'Policy':
    """Return a copy of the policy with one binding swapped for another.

    Args:
        policy: The source policy; it is not modified.
        binding: The binding to drop, or None when there is nothing to
            drop (i.e. new_binding is simply being added).
        new_binding: The binding to add.

    Returns:
        A new Policy that differs from the given one only in its bindings.
    """
    new_bindings = set(policy.bindings)
    # discard() is a no-op for absent elements, which also covers None.
    new_bindings.discard(binding)
    new_bindings.add(new_binding)
    return dataclasses.replace(policy, bindings=frozenset(new_bindings))  # pylint: disable=too-many-function-args


@dataclasses.dataclass(frozen=True)
class ServiceAccount:
    """An IAM service account.

    https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts
    Note: "etag" field is skipped because it's deprecated
    """
    name: str
    projectId: str
    uniqueId: str
    email: str
    oauth2ClientId: str
    displayName: str = ''
    description: str = ''
    disabled: bool = False

    @classmethod
    def from_response(cls, response: Dict[str, Any]) -> 'ServiceAccount':
        """Build a ServiceAccount from an API response dict.

        The name, projectId, uniqueId, email and oauth2ClientId keys are
        required (KeyError when missing); description, displayName and
        disabled fall back to the dataclass defaults.
        """
        return cls(name=response['name'],
                   projectId=response['projectId'],
                   uniqueId=response['uniqueId'],
                   email=response['email'],
                   oauth2ClientId=response['oauth2ClientId'],
                   description=response.get('description', ''),
                   displayName=response.get('displayName', ''),
                   disabled=response.get('disabled', False))

    def as_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict."""
        return dataclasses.asdict(self)


@dataclasses.dataclass(frozen=True)
class Expr:
    """
    Represents a textual expression in the Common Expression Language syntax.

    https://cloud.google.com/iam/docs/reference/rest/v1/Expr
    """
    expression: str
    title: str = ''
    description: str = ''
    location: str = ''

    @classmethod
    def from_response(cls, response: Dict[str, Any]) -> 'Expr':
        """Build an Expr by passing the response keys as keyword arguments.

        Raises TypeError when the response lacks 'expression' or carries a
        key that is not a field of this class.
        """
        return cls(**response)

    def as_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict."""
        return dataclasses.asdict(self)


@dataclasses.dataclass(frozen=True)
class Policy:
    """An Identity and Access Management (IAM) policy, which specifies
    access controls for Google Cloud resources.

    https://cloud.google.com/iam/docs/reference/rest/v1/Policy
    Note: auditConfigs not supported by this implementation.
    """

    @dataclasses.dataclass(frozen=True)
    class Binding:
        """Policy Binding. Associates members with a role.

        https://cloud.google.com/iam/docs/reference/rest/v1/Policy#binding
        """
        role: str
        members: FrozenSet[str]
        condition: Optional[Expr] = None

        @classmethod
        def from_response(cls, response: Dict[str, Any]) -> 'Policy.Binding':
            """Build a Binding from an API response dict.

            'role' is required; a missing 'members' key yields an empty
            set; 'condition' is parsed into an Expr only when present.
            """
            fields = {
                'role': response['role'],
                'members': frozenset(response.get('members', [])),
            }
            if 'condition' in response:
                fields['condition'] = Expr.from_response(response['condition'])

            return cls(**fields)

        def as_dict(self) -> Dict[str, Any]:
            """Return the binding as a dict; 'condition' is omitted when
            it is None."""
            result = {
                'role': self.role,
                'members': list(self.members),
            }
            if self.condition is not None:
                result['condition'] = self.condition.as_dict()
            return result

    bindings: FrozenSet[Binding]
    etag: str
    version: Optional[int] = None

    def find_binding_for_role(
            self,
            role: str,
            condition: Optional[Expr] = None) -> Optional['Policy.Binding']:
        """Return a binding matching both the role and the condition.

        Args:
            role: The role the binding must have.
            condition: The condition the binding must have; the default
                None matches only unconditional bindings.

        Returns:
            A matching binding, or None when there is none. When several
            bindings match, which one is returned is unspecified because
            the bindings are stored in a frozenset.
        """
        # Deliberately not memoized: an lru_cache on a method keys on self
        # and would keep Policy instances alive in a process-wide cache.
        results = (binding for binding in self.bindings
                   if binding.role == role and binding.condition == condition)
        return next(results, None)

    @classmethod
    def from_response(cls, response: Dict[str, Any]) -> 'Policy':
        """Build a Policy from an API response dict.

        'etag' is required; a missing 'bindings' key yields an empty set,
        and a missing 'version' key yields None.
        """
        bindings = frozenset(
            cls.Binding.from_response(b) for b in response.get('bindings', []))
        return cls(bindings=bindings,
                   etag=response['etag'],
                   version=response.get('version'))

    def as_dict(self) -> Dict[str, Any]:
        """Return the policy as a dict; 'version' is omitted when it is
        None."""
        result = {
            'bindings': [binding.as_dict() for binding in self.bindings],
            'etag': self.etag,
        }
        if self.version is not None:
            result['version'] = self.version
        return result


class IamV1(gcp.api.GcpProjectApiResource):
    """
    Identity and Access Management (IAM) API.

    https://cloud.google.com/iam/docs/reference/rest
    """
    _service_accounts: gcp.api.discovery.Resource

    # Operations that affect conditional role bindings must specify version 3.
    # Otherwise conditions are omitted, and role names returned with a suffix,
    # f.e. roles/iam.workloadIdentityUser_withcond_f1ec33c9beb41857dbf0
    # https://cloud.google.com/iam/docs/reference/rest/v1/Policy#FIELDS.version
    POLICY_VERSION: int = 3

    def __init__(self, api_manager: gcp.api.GcpApiManager, project: str):
        super().__init__(api_manager.iam('v1'), project)
        # Shortcut to projects/*/serviceAccounts/ endpoints
        self._service_accounts = self.api.projects().serviceAccounts()

    def service_account_resource_name(self, account) -> str:
        """
        Returns full resource name of the service account.

        The resource name of the service account in the following format:
        projects/{PROJECT_ID}/serviceAccounts/{ACCOUNT}.
        The ACCOUNT value can be the email address or the uniqueId of the
        service account.
        Ref https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts/get

        Args:
            account: The ACCOUNT value
        """
        return f'projects/{self.project}/serviceAccounts/{account}'

    def get_service_account(self, account: str) -> ServiceAccount:
        """Load a service account and return it as a ServiceAccount.

        Args:
            account: The ACCOUNT value, see service_account_resource_name.
        """
        resource_name = self.service_account_resource_name(account)
        request: _HttpRequest = self._service_accounts.get(name=resource_name)
        response: Dict[str, Any] = self._execute(request)
        logger.debug('Loaded Service Account:\n%s',
                     self.resource_pretty_format(response))
        return ServiceAccount.from_response(response)

    def get_service_account_iam_policy(self, account: str) -> Policy:
        """Load the IAM policy attached to a service account.

        The policy is requested at POLICY_VERSION.

        Args:
            account: The ACCOUNT value, see service_account_resource_name.
        """
        resource_name = self.service_account_resource_name(account)
        request: _HttpRequest = self._service_accounts.getIamPolicy(
            resource=resource_name,
            options_requestedPolicyVersion=self.POLICY_VERSION)
        response: Dict[str, Any] = self._execute(request)
        logger.debug('Loaded Service Account Policy:\n%s',
                     self.resource_pretty_format(response))
        return Policy.from_response(response)

    def set_service_account_iam_policy(self, account: str,
                                       policy: Policy) -> Policy:
        """Sets the IAM policy that is attached to a service account.

        https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts/setIamPolicy

        Args:
            account: The ACCOUNT value, see service_account_resource_name.
            policy: The policy to send as the request body.

        Returns:
            The Policy parsed from the API response.

        Raises:
            EtagConflict: The request failed with a response error whose
                status is 409.
            gcp.api.ResponseError: Any other response error, re-raised
                unchanged.
        """
        resource_name = self.service_account_resource_name(account)
        body = {'policy': policy.as_dict()}
        logger.debug('Updating Service Account %s policy:\n%s', account,
                     self.resource_pretty_format(body))
        try:
            request: _HttpRequest = self._service_accounts.setIamPolicy(
                resource=resource_name, body=body)
            response: Dict[str, Any] = self._execute(request)
            return Policy.from_response(response)
        except gcp.api.ResponseError as error:
            if error.status == 409:
                # https://cloud.google.com/iam/docs/policies#etag
                logger.debug(error)
                raise EtagConflict from error
            raise

    @handle_etag_conflict
    def add_service_account_iam_policy_binding(self, account: str, role: str,
                                               member: str) -> None:
        """Add an IAM policy binding to an IAM service account.

        Reads the current policy, adds the member to the unconditional
        binding for the role (creating the binding when absent) and writes
        the policy back. Does nothing when the member already has the role.
        The whole read-modify-write sequence runs under
        handle_etag_conflict.

        See for details on updating policy bindings:
        https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts/setIamPolicy
        """
        policy: Policy = self.get_service_account_iam_policy(account)
        binding: Optional[Policy.Binding] = policy.find_binding_for_role(role)
        if binding is not None and member in binding.members:
            logger.debug('Member %s already has role %s for Service Account %s',
                         member, role, account)
            return

        updated_binding: Policy.Binding
        if binding is None:
            updated_binding = Policy.Binding(role, frozenset([member]))
        else:
            updated_members: FrozenSet[str] = binding.members.union({member})
            updated_binding = dataclasses.replace(  # pylint: disable=too-many-function-args
                binding,
                members=updated_members)

        updated_policy: Policy = _replace_binding(policy, binding,
                                                  updated_binding)
        self.set_service_account_iam_policy(account, updated_policy)
        logger.debug('Role %s granted to member %s for Service Account %s',
                     role, member, account)

    @handle_etag_conflict
    def remove_service_account_iam_policy_binding(self, account: str, role: str,
                                                  member: str) -> None:
        """Remove an IAM policy binding from the IAM policy of a service
        account.

        Reads the current policy, removes the member from the unconditional
        binding for the role and writes the policy back. Does nothing when
        the role has no such binding or the member is not in it. The
        binding is kept even when its member set becomes empty. The whole
        read-modify-write sequence runs under handle_etag_conflict.

        See for details on updating policy bindings:
        https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts/setIamPolicy
        """
        policy: Policy = self.get_service_account_iam_policy(account)
        binding: Optional[Policy.Binding] = policy.find_binding_for_role(role)

        if binding is None:
            logger.debug('Noop: Service Account %s has no bindings for role %s',
                         account, role)
            return
        if member not in binding.members:
            logger.debug(
                'Noop: Service Account %s binding for role %s has no member %s',
                account, role, member)
            return

        updated_members: FrozenSet[str] = binding.members.difference({member})
        updated_binding: Policy.Binding = dataclasses.replace(  # pylint: disable=too-many-function-args
            binding,
            members=updated_members)
        updated_policy: Policy = _replace_binding(policy, binding,
                                                  updated_binding)
        self.set_service_account_iam_policy(account, updated_policy)
        logger.debug('Role %s revoked from member %s for Service Account %s',
                     role, member, account)