1# Copyright 2021 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import dataclasses
15import datetime
16import functools
17import logging
18from typing import Any, Dict, FrozenSet, Optional
19
20from framework.helpers import retryers
21from framework.infrastructure import gcp
22
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Type aliases: short private names for types referenced throughout this
# module.
_timedelta = datetime.timedelta
_HttpRequest = gcp.api.HttpRequest
28
29
class EtagConflict(gcp.api.Error):
    """Signals that an IAM policy was changed concurrently.

    Raised by IamV1.set_service_account_iam_policy when the API answers a
    policy update with HTTP 409.

    https://cloud.google.com/iam/docs/policies#etag
    """
36
37
def handle_etag_conflict(func):
    """Decorator that retries ``func`` on IAM policy update conflicts.

    Every call of the decorated callable is routed through a fresh retryer
    built by ``retryers.exponential_retryer_with_timeout``. The retryer is
    configured to retry on ``EtagConflict`` and ``gcp.api.TransportError``
    with wait_min=1s, wait_max=10s and timeout=2min.

    Args:
        func: The callable to wrap; typically a read-modify-write policy
            operation that may raise EtagConflict.

    Returns:
        A wrapper with the same call signature as ``func`` that returns
        whatever the retryer returns for the call.
    """

    # functools.wraps keeps the wrapped function's __name__, __doc__ and
    # __module__ (and exposes it as __wrapped__), so logs, tracebacks and
    # introspection show the real method rather than the generic wrapper.
    @functools.wraps(func)
    def wrap_retry_on_etag_conflict(*args, **kwargs):
        retryer = retryers.exponential_retryer_with_timeout(
            retry_on_exceptions=(EtagConflict, gcp.api.TransportError),
            wait_min=_timedelta(seconds=1),
            wait_max=_timedelta(seconds=10),
            timeout=_timedelta(minutes=2))
        return retryer(func, *args, **kwargs)

    return wrap_retry_on_etag_conflict
49
50
51def _replace_binding(policy: 'Policy', binding: 'Policy.Binding',
52                     new_binding: 'Policy.Binding') -> 'Policy':
53    new_bindings = set(policy.bindings)
54    new_bindings.discard(binding)
55    new_bindings.add(new_binding)
56    return dataclasses.replace(policy, bindings=frozenset(new_bindings))  # pylint: disable=too-many-function-args
57
58
@dataclasses.dataclass(frozen=True)
class ServiceAccount:
    """Immutable record describing an IAM service account.

    Attribute names follow the keys read from the API response dict.

    https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts
    Note: the "etag" field is left out because it's deprecated.
    """
    name: str
    projectId: str
    uniqueId: str
    email: str
    oauth2ClientId: str
    displayName: str = ''
    description: str = ''
    disabled: bool = False

    @classmethod
    def from_response(cls, response: Dict[str, Any]) -> 'ServiceAccount':
        """Create a ServiceAccount from an API response dict.

        Keys other than the modeled ones are ignored.

        Args:
            response: The service account resource as a dict.

        Returns:
            The populated ServiceAccount.

        Raises:
            KeyError: When one of the mandatory keys is absent.
        """
        mandatory = ('name', 'projectId', 'uniqueId', 'email',
                     'oauth2ClientId')
        kwargs: Dict[str, Any] = {key: response[key] for key in mandatory}
        # Optional keys fall back to the same defaults the dataclass declares.
        kwargs['description'] = response.get('description', '')
        kwargs['displayName'] = response.get('displayName', '')
        kwargs['disabled'] = response.get('disabled', False)
        return cls(**kwargs)

    def as_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict keyed by field name."""
        return dataclasses.asdict(self)
88
89
@dataclasses.dataclass(frozen=True)
class Expr:
    """
    A textual expression written in Common Expression Language syntax.

    https://cloud.google.com/iam/docs/reference/rest/v1/Expr
    """
    expression: str
    title: str = ''
    description: str = ''
    location: str = ''

    @classmethod
    def from_response(cls, response: Dict[str, Any]) -> 'Expr':
        """Create an Expr from an API response dict.

        The dict keys are passed straight through as constructor keyword
        arguments, so they must match the field names of this class.
        """
        kwargs = dict(response)
        return cls(**kwargs)

    def as_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict, in declaration order."""
        return {
            field.name: getattr(self, field.name)
            for field in dataclasses.fields(self)
        }
108
109
@dataclasses.dataclass(frozen=True)
class Policy:
    """An Identity and Access Management (IAM) policy, which specifies
    access controls for Google Cloud resources.

    Instances are immutable and hashable; create modified copies with
    dataclasses.replace().

    https://cloud.google.com/iam/docs/reference/rest/v1/Policy
    Note: auditConfigs not supported by this implementation.
    """

    @dataclasses.dataclass(frozen=True)
    class Binding:
        """Policy Binding. Associates members with a role.

        https://cloud.google.com/iam/docs/reference/rest/v1/Policy#binding
        """
        role: str
        members: FrozenSet[str]
        condition: Optional['Expr'] = None

        @classmethod
        def from_response(cls, response: Dict[str, Any]) -> 'Policy.Binding':
            """Create a Binding from an API response dict.

            Args:
                response: A binding dict; 'role' is required, 'members' and
                    'condition' are optional.

            Raises:
                KeyError: When 'role' is absent.
            """
            fields = {
                'role': response['role'],
                'members': frozenset(response.get('members', [])),
            }
            if 'condition' in response:
                fields['condition'] = Expr.from_response(response['condition'])

            return cls(**fields)

        def as_dict(self) -> Dict[str, Any]:
            """Return the binding as a dict suitable for a request body.

            Members are emitted in sorted order so the output is
            deterministic; 'condition' is only present when one is set.
            """
            result: Dict[str, Any] = {
                'role': self.role,
                # A frozenset has no stable iteration order; sort it so that
                # logged and submitted payloads are reproducible.
                'members': sorted(self.members),
            }
            if self.condition is not None:
                result['condition'] = self.condition.as_dict()
            return result

    bindings: FrozenSet[Binding]
    etag: str
    version: Optional[int] = None

    # Deliberately not wrapped in functools.lru_cache: a cache on an instance
    # method is keyed on `self`, which keeps every Policy alive for the
    # lifetime of the cache and hashes the whole policy on each lookup,
    # for no gain over this single pass over the bindings.
    def find_binding_for_role(
            self,
            role: str,
            condition: Optional['Expr'] = None) -> Optional['Policy.Binding']:
        """Find a binding with exactly this role and condition.

        Args:
            role: The role name to look for.
            condition: The condition the binding must carry; None (the
                default) matches only unconditional bindings.

        Returns:
            A matching binding, or None when the policy has no such binding.
        """
        for binding in self.bindings:
            if binding.role == role and binding.condition == condition:
                return binding
        return None

    @classmethod
    def from_response(cls, response: Dict[str, Any]) -> 'Policy':
        """Create a Policy from an API response dict.

        Args:
            response: A policy dict; 'etag' is required, 'bindings' and
                'version' are optional.

        Raises:
            KeyError: When 'etag' is absent.
        """
        bindings = frozenset(
            cls.Binding.from_response(b) for b in response.get('bindings', []))
        return cls(bindings=bindings,
                   etag=response['etag'],
                   version=response.get('version'))

    def as_dict(self) -> Dict[str, Any]:
        """Return the policy as a dict suitable for a request body.

        'version' is only included when it is set. The order of the
        'bindings' list follows set iteration order and is not significant.
        """
        result: Dict[str, Any] = {
            'bindings': [binding.as_dict() for binding in self.bindings],
            'etag': self.etag,
        }
        if self.version is not None:
            result['version'] = self.version
        return result
178
179
class IamV1(gcp.api.GcpProjectApiResource):
    """
    Client for the Identity and Access Management (IAM) API, v1.

    Reads service accounts and reads/updates their IAM policies.

    https://cloud.google.com/iam/docs/reference/rest
    """
    _service_accounts: gcp.api.discovery.Resource

    # Any operation touching conditional role bindings has to request policy
    # version 3. Otherwise the conditions are omitted and role names come back
    # with a suffix, e.g.
    # roles/iam.workloadIdentityUser_withcond_f1ec33c9beb41857dbf0
    # https://cloud.google.com/iam/docs/reference/rest/v1/Policy#FIELDS.version
    POLICY_VERSION: int = 3

    def __init__(self, api_manager: gcp.api.GcpApiManager, project: str):
        iam_api = api_manager.iam('v1')
        super().__init__(iam_api, project)
        # Keep a handle on the projects/*/serviceAccounts/ endpoints.
        self._service_accounts = self.api.projects().serviceAccounts()

    def service_account_resource_name(self, account) -> str:
        """
        Build the full resource name of a service account.

        The name has the form
        projects/{PROJECT_ID}/serviceAccounts/{ACCOUNT},
        where ACCOUNT is either the email address or the uniqueId of the
        service account.
        Ref https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts/get

        Args:
            account: The ACCOUNT value
        """
        return 'projects/{}/serviceAccounts/{}'.format(self.project, account)

    def get_service_account(self, account: str) -> ServiceAccount:
        """Load the service account identified by ``account``."""
        request: _HttpRequest = self._service_accounts.get(
            name=self.service_account_resource_name(account))
        sa_response: Dict[str, Any] = self._execute(request)
        logger.debug('Loaded Service Account:\n%s',
                     self.resource_pretty_format(sa_response))
        return ServiceAccount.from_response(sa_response)

    def get_service_account_iam_policy(self, account: str) -> Policy:
        """Load the IAM policy attached to the service account.

        The policy is requested at POLICY_VERSION.
        """
        request: _HttpRequest = self._service_accounts.getIamPolicy(
            resource=self.service_account_resource_name(account),
            options_requestedPolicyVersion=self.POLICY_VERSION)
        policy_response: Dict[str, Any] = self._execute(request)
        logger.debug('Loaded Service Account Policy:\n%s',
                     self.resource_pretty_format(policy_response))
        return Policy.from_response(policy_response)

    def set_service_account_iam_policy(self, account: str,
                                       policy: Policy) -> Policy:
        """Replace the IAM policy attached to a service account.

        https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts/setIamPolicy

        Returns:
            The policy parsed from the API response.

        Raises:
            EtagConflict: When the API responds with HTTP 409.
            gcp.api.ResponseError: For any other error response.
        """
        resource_name = self.service_account_resource_name(account)
        payload = {'policy': policy.as_dict()}
        logger.debug('Updating Service Account %s policy:\n%s', account,
                     self.resource_pretty_format(payload))
        try:
            request: _HttpRequest = self._service_accounts.setIamPolicy(
                resource=resource_name, body=payload)
            updated: Dict[str, Any] = self._execute(request)
            return Policy.from_response(updated)
        except gcp.api.ResponseError as error:
            if error.status != 409:
                raise
            # A 409 is translated to EtagConflict, see
            # https://cloud.google.com/iam/docs/policies#etag
            logger.debug(error)
            raise EtagConflict from error

    @handle_etag_conflict
    def add_service_account_iam_policy_binding(self, account: str, role: str,
                                               member: str) -> None:
        """Grant ``role`` to ``member`` on the service account.

        Reads the current policy, adds the member to the unconditional
        binding for the role (creating the binding if needed) and writes the
        policy back. Does nothing when the member already has the role.

        See for details on updating policy bindings:
        https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts/setIamPolicy
        """
        policy: Policy = self.get_service_account_iam_policy(account)
        current: Optional[Policy.Binding] = policy.find_binding_for_role(role)

        if current is None:
            # No binding for this role yet: start one with a single member.
            replacement = Policy.Binding(role, frozenset([member]))
        elif member in current.members:
            logger.debug('Member %s already has role %s for Service Account %s',
                         member, role, account)
            return
        else:
            replacement = dataclasses.replace(  # pylint: disable=too-many-function-args
                current,
                members=current.members | {member})

        new_policy: Policy = _replace_binding(policy, current, replacement)
        self.set_service_account_iam_policy(account, new_policy)
        logger.debug('Role %s granted to member %s for Service Account %s',
                     role, member, account)

    @handle_etag_conflict
    def remove_service_account_iam_policy_binding(self, account: str, role: str,
                                                  member: str) -> None:
        """Revoke ``role`` from ``member`` on the service account.

        Reads the current policy, removes the member from the unconditional
        binding for the role and writes the policy back. Does nothing when
        there is no such binding or the member is not part of it.

        See for details on updating policy bindings:
        https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts/setIamPolicy
        """
        policy: Policy = self.get_service_account_iam_policy(account)
        current: Optional[Policy.Binding] = policy.find_binding_for_role(role)

        if current is None:
            logger.debug('Noop: Service Account %s has no bindings for role %s',
                         account, role)
            return
        if member not in current.members:
            logger.debug(
                'Noop: Service Account %s binding for role %s has no member %s',
                account, role, member)
            return

        replacement: Policy.Binding = dataclasses.replace(  # pylint: disable=too-many-function-args
            current,
            members=current.members - {member})
        new_policy: Policy = _replace_binding(policy, current, replacement)
        self.set_service_account_iam_policy(account, new_policy)
        logger.debug('Role %s revoked from member %s for Service Account %s',
                     role, member, account)
314