# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Non-API-specific IAM policy definitions

For allowed roles / permissions, see:
https://cloud.google.com/iam/docs/understanding-roles

Example usage:

.. code-block:: python

   # ``get_iam_policy`` returns a :class:`~google.api_core.iam.Policy`.
   policy = resource.get_iam_policy(requested_policy_version=3)

   phred = "user:phred@example.com"
   admin_group = "group:admins@groups.example.com"
   account = "serviceAccount:account-1234@accounts.example.com"

   policy.version = 3
   policy.bindings = [
       {
           "role": "roles/owner",
           "members": {phred, admin_group, account}
       },
       {
           "role": "roles/editor",
           "members": {"allAuthenticatedUsers"}
       },
       {
           "role": "roles/viewer",
           "members": {"allUsers"},
           "condition": {
               "title": "request_time",
               "description": "Requests made before 2021-01-01T00:00:00Z",
               "expression": "request.time < timestamp(\\"2021-01-01T00:00:00Z\\")"
           }
       }
   ]

   resource.set_iam_policy(policy)
"""

import collections
import collections.abc
import operator
import warnings

# Generic IAM roles

OWNER_ROLE = "roles/owner"
"""Generic role implying all rights to an object."""

EDITOR_ROLE = "roles/editor"
"""Generic role implying rights to modify an object."""

VIEWER_ROLE = "roles/viewer"
"""Generic role implying rights to access an object."""

_ASSIGNMENT_DEPRECATED_MSG = """\
Assigning to '{}' is deprecated. Use the `policy.bindings` property to modify bindings instead."""

_DICT_ACCESS_MSG = """\
Dict access is not supported on policies with version > 1 or with conditional bindings."""


class InvalidOperationException(Exception):
    """Raised when trying to use Policy class as a dict."""


class Policy(collections.abc.MutableMapping):
    """IAM Policy

    The policy is stored as a list of binding dictionaries (see
    :attr:`bindings`). For backward compatibility it can also be used as a
    mapping of ``role -> set of members``, but only while the policy's
    version is unset or 1 and no binding carries a condition.

    Args:
        etag (Optional[str]): ETag used to identify a unique revision of the
            policy.
        version (Optional[int]): The syntax schema version of the policy.

    Note:
        Using conditions in bindings requires the policy's version to be set
        to `3` or greater, depending on the versions that are currently supported.

        Accessing the policy using dict operations will raise
        InvalidOperationException when the policy's version is greater than 1
        or when any binding contains a condition.

        Use the policy.bindings getter/setter to retrieve and modify the policy's bindings.

    See:
        IAM Policy https://cloud.google.com/iam/reference/rest/v1/Policy
        Policy versions https://cloud.google.com/iam/docs/policies#versions
        Conditions overview https://cloud.google.com/iam/docs/conditions-overview.
    """

    _OWNER_ROLES = (OWNER_ROLE,)
    """Roles mapped onto our ``owners`` attribute."""

    _EDITOR_ROLES = (EDITOR_ROLE,)
    """Roles mapped onto our ``editors`` attribute."""

    _VIEWER_ROLES = (VIEWER_ROLE,)
    """Roles mapped onto our ``viewers`` attribute."""

    def __init__(self, etag=None, version=None):
        self.etag = etag
        self.version = version
        self._bindings = []

    def __iter__(self):
        """Iterate over the roles of all bindings that have at least one member.

        Raises:
            InvalidOperationException: if dict access is not allowed for
                this policy (see ``__check_version__``).
        """
        # The check runs eagerly (this is not a generator function), so the
        # exception is raised at ``iter(policy)`` time, not on first ``next``.
        self.__check_version__()
        # Exclude bindings with no members
        return (binding["role"] for binding in self._bindings if binding["members"])

    def __len__(self):
        """Return the number of bindings that have at least one member.

        Raises:
            InvalidOperationException: if dict access is not allowed for
                this policy.
        """
        self.__check_version__()
        # Exclude bindings with no members
        return sum(1 for binding in self._bindings if binding["members"])

    def __getitem__(self, key):
        """Return the member set bound to role ``key``.

        Never raises ``KeyError``: when no binding exists for the role, an
        empty one is created and its (mutable) member set is returned.

        Raises:
            InvalidOperationException: if dict access is not allowed for
                this policy.
        """
        self.__check_version__()
        for binding in self._bindings:
            if binding["role"] == key:
                return binding["members"]
        # If the binding does not yet exist, create one
        # NOTE: This will create bindings with no members
        # which are ignored by __iter__ and __len__
        new_binding = {"role": key, "members": set()}
        self._bindings.append(new_binding)
        return new_binding["members"]

    def __setitem__(self, key, value):
        """Bind the members in ``value`` (any iterable) to role ``key``.

        Replaces the members of the first existing binding for the role, or
        appends a new binding when there is none.

        Raises:
            InvalidOperationException: if dict access is not allowed for
                this policy.
        """
        self.__check_version__()
        members = set(value)
        for binding in self._bindings:
            if binding["role"] == key:
                binding["members"] = members
                return
        self._bindings.append({"role": key, "members": members})

    def __delitem__(self, key):
        """Remove the first binding for role ``key``.

        Raises:
            InvalidOperationException: if dict access is not allowed for
                this policy.
            KeyError: if no binding exists for the role.
        """
        self.__check_version__()
        for binding in self._bindings:
            if binding["role"] == key:
                # Safe despite mutating during iteration: we return at once.
                self._bindings.remove(binding)
                return
        raise KeyError(key)

    def __check_version__(self):
        """Raise InvalidOperationException if version is greater than 1 or policy contains conditions."""
        raise_version = self.version is not None and self.version > 1

        if raise_version or self._contains_conditions():
            raise InvalidOperationException(_DICT_ACCESS_MSG)

    def _contains_conditions(self):
        """Return True if any binding has a non-``None`` ``condition`` entry."""
        return any(binding.get("condition") is not None for binding in self._bindings)

    def _members_for_roles(self, roles):
        """Return the union of the members bound to ``roles`` as a frozenset.

        Goes through the mapping interface (``self.get``), so it raises
        InvalidOperationException when dict access is not allowed and, like
        ``__getitem__``, creates an empty binding for a role that has none.
        """
        members = set()
        for role in roles:
            members.update(self.get(role, ()))
        return frozenset(members)

    @property
    def bindings(self):
        """The policy's list of bindings.

        A binding is specified by a dictionary with keys:

        * role (str): Role that is assigned to `members`.

        * members (:obj:`set` of str): Specifies the identities associated to this binding.

        * condition (:obj:`dict` of str:str): Specifies a condition under which this binding will apply.

          * title (str): Title for the condition.

          * description (:obj:str, optional): Description of the condition.

          * expression: A CEL expression.

        Type:
           :obj:`list` of :obj:`dict`

        See:
           Policy versions https://cloud.google.com/iam/docs/policies#versions
           Conditions overview https://cloud.google.com/iam/docs/conditions-overview.

        Example:

        .. code-block:: python

           USER = "user:phred@example.com"
           ADMIN_GROUP = "group:admins@groups.example.com"
           SERVICE_ACCOUNT = "serviceAccount:account-1234@accounts.example.com"
           CONDITION = {
               "title": "request_time",
               "description": "Requests made before 2021-01-01T00:00:00Z", # Optional
               "expression": "request.time < timestamp(\\"2021-01-01T00:00:00Z\\")"
           }

           # Set policy's version to 3 before setting bindings containing conditions.
           policy.version = 3

           policy.bindings = [
               {
                   "role": "roles/viewer",
                   "members": {USER, ADMIN_GROUP, SERVICE_ACCOUNT},
                   "condition": CONDITION
               },
               ...
           ]
        """
        return self._bindings

    @bindings.setter
    def bindings(self, bindings):
        # Stored as given (no copy, no validation).
        self._bindings = bindings

    @property
    def owners(self):
        """Legacy access to owner role.

        Raise InvalidOperationException if version is greater than 1 or policy contains conditions.

        DEPRECATED: use `policy.bindings` to access bindings instead.
        """
        return self._members_for_roles(self._OWNER_ROLES)

    @owners.setter
    def owners(self, value):
        """Update owners.

        Raise InvalidOperationException if version is greater than 1 or policy contains conditions.

        DEPRECATED: use `policy.bindings` to modify bindings instead.
        """
        # stacklevel=2 attributes the warning to the code doing the assignment.
        warnings.warn(
            _ASSIGNMENT_DEPRECATED_MSG.format("owners"),
            DeprecationWarning,
            stacklevel=2,
        )
        self[OWNER_ROLE] = value

    @property
    def editors(self):
        """Legacy access to editor role.

        Raise InvalidOperationException if version is greater than 1 or policy contains conditions.

        DEPRECATED: use `policy.bindings` to access bindings instead.
        """
        return self._members_for_roles(self._EDITOR_ROLES)

    @editors.setter
    def editors(self, value):
        """Update editors.

        Raise InvalidOperationException if version is greater than 1 or policy contains conditions.

        DEPRECATED: use `policy.bindings` to modify bindings instead.
        """
        # stacklevel=2 attributes the warning to the code doing the assignment.
        warnings.warn(
            _ASSIGNMENT_DEPRECATED_MSG.format("editors"),
            DeprecationWarning,
            stacklevel=2,
        )
        self[EDITOR_ROLE] = value

    @property
    def viewers(self):
        """Legacy access to viewer role.

        Raise InvalidOperationException if version is greater than 1 or policy contains conditions.

        DEPRECATED: use `policy.bindings` to access bindings instead.
        """
        return self._members_for_roles(self._VIEWER_ROLES)

    @viewers.setter
    def viewers(self, value):
        """Update viewers.

        Raise InvalidOperationException if version is greater than 1 or policy contains conditions.

        DEPRECATED: use `policy.bindings` to modify bindings instead.
        """
        # stacklevel=2 attributes the warning to the code doing the assignment.
        warnings.warn(
            _ASSIGNMENT_DEPRECATED_MSG.format("viewers"),
            DeprecationWarning,
            stacklevel=2,
        )
        self[VIEWER_ROLE] = value

    @staticmethod
    def user(email):
        """Factory method for a user member.

        Args:
            email (str): E-mail for this particular user.

        Returns:
            str: A member string corresponding to the given user.
        """
        return "user:%s" % (email,)

    @staticmethod
    def service_account(email):
        """Factory method for a service account member.

        Args:
            email (str): E-mail for this particular service account.

        Returns:
            str: A member string corresponding to the given service account.

        """
        return "serviceAccount:%s" % (email,)

    @staticmethod
    def group(email):
        """Factory method for a group member.

        Args:
            email (str): An id or e-mail for this particular group.

        Returns:
            str: A member string corresponding to the given group.
        """
        return "group:%s" % (email,)

    @staticmethod
    def domain(domain):
        """Factory method for a domain member.

        Args:
            domain (str): The domain for this member.

        Returns:
            str: A member string corresponding to the given domain.
        """
        return "domain:%s" % (domain,)

    @staticmethod
    def all_users():
        """Factory method for a member representing all users.

        Returns:
            str: A member string representing all users.
        """
        return "allUsers"

    @staticmethod
    def authenticated_users():
        """Factory method for a member representing all authenticated users.

        Returns:
            str: A member string representing all authenticated users.
        """
        return "allAuthenticatedUsers"

    @classmethod
    def from_api_repr(cls, resource):
        """Factory: create a policy from a JSON resource.

        The ``resource`` argument is not modified: every binding is copied
        (shallowly) and its ``members`` are converted to a new :obj:`set`.

        Args:
            resource (dict): policy resource returned by ``getIamPolicy`` API.

        Returns:
            :class:`Policy`: the parsed policy
        """
        version = resource.get("version")
        etag = resource.get("etag")
        policy = cls(etag, version)
        # Build fresh binding dicts instead of aliasing and rewriting the
        # caller's data; ``or ()`` also tolerates an explicit null value.
        policy.bindings = [
            dict(binding, members=set(binding.get("members") or ()))
            for binding in resource.get("bindings") or ()
        ]

        return policy

    def to_api_repr(self):
        """Render a JSON policy resource.

        ``etag`` and ``version`` are included only when not ``None``.
        Bindings without members are omitted; the remaining bindings are
        sorted by role and their members are rendered as sorted lists. The
        ``bindings`` key is left out entirely when nothing remains.

        Returns:
            dict: a resource to be passed to the ``setIamPolicy`` API.
        """
        resource = {}

        if self.etag is not None:
            resource["etag"] = self.etag

        if self.version is not None:
            resource["version"] = self.version

        bindings = []
        # ``or ()`` keeps tolerating ``policy.bindings = None``.
        for binding in self._bindings or ():
            members = binding.get("members")
            if not members:
                continue
            new_binding = {"role": binding["role"], "members": sorted(members)}
            condition = binding.get("condition")
            if condition:
                new_binding["condition"] = condition
            bindings.append(new_binding)

        if bindings:
            # Sort bindings by role
            resource["bindings"] = sorted(bindings, key=operator.itemgetter("role"))

        return resource