1# Copyright 2022 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""
Run xDS Test Server on Kubernetes.
16"""
17import logging
18from typing import List, Optional
19
20from framework.infrastructure import gcp
21from framework.infrastructure import k8s
22from framework.test_app.runners.k8s import k8s_base_runner
23from framework.test_app.server_app import XdsTestServer
24
# Module-level logger, named after this module so its records can be
# filtered and configured per module.
logger = logging.getLogger(name=__name__)
26
27
class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner):
    """Runs xDS test servers as a Kubernetes deployment.

    run() does the following, in order:
      * Creates a Kubernetes service, or reuses one when reuse_service is
        set. Its manifest is rendered with the GCP Network Endpoint Group
        name, and run() waits for that NEG.
      * With Workload Identity enabled, sets up a Kubernetes service account
        allowed to use the GCP service account identity.
      * Creates a deployment with the requested number of replicas.
    Each started pod is returned as an XdsTestServer. cleanup() deletes what
    run() created.
    """
    DEFAULT_TEST_PORT = 8080
    # Outside of secure mode the maintenance services share the test port.
    DEFAULT_MAINTENANCE_PORT = 8080
    # In secure mode the maintenance port must differ from the test port.
    DEFAULT_SECURE_MODE_MAINTENANCE_PORT = 8081

    # Required fields.
    deployment_template: str
    service_name: str
    service_template: str
    reuse_service: bool
    enable_workload_identity: bool
    debug_use_port_forwarding: bool
    gcp_neg_name: str
    td_bootstrap_image: str
    xds_server_uri: str
    network: str

    # Optional fields.
    service_account_name: Optional[str] = None
    service_account_template: Optional[str] = None
    gcp_iam: Optional[gcp.iam.IamV1] = None

    # Mutable state.
    service: Optional[k8s.V1Service] = None

    def __init__(  # pylint: disable=too-many-locals
            self,
            k8s_namespace: k8s.KubernetesNamespace,
            *,
            deployment_name: str,
            image_name: str,
            td_bootstrap_image: str,
            network: str = 'default',
            xds_server_uri: Optional[str] = None,
            gcp_api_manager: gcp.api.GcpApiManager,
            gcp_project: str,
            gcp_service_account: str,
            service_account_name: Optional[str] = None,
            service_name: Optional[str] = None,
            neg_name: Optional[str] = None,
            deployment_template: str = 'server.deployment.yaml',
            service_account_template: str = 'service-account.yaml',
            service_template: str = 'server.service.yaml',
            reuse_service: bool = False,
            reuse_namespace: bool = False,
            namespace_template: Optional[str] = None,
            debug_use_port_forwarding: bool = False,
            enable_workload_identity: bool = True):
        """Stores the configuration used by run() and cleanup().

        Args:
            k8s_namespace: Kubernetes namespace to deploy the servers into.
            deployment_name: Deployment name. Also the default for
                service_name and service_account_name.
            image_name: Container image of the test server.
            td_bootstrap_image: Image passed to the deployment template for
                the TD bootstrap generator.
            network: Network name passed to the deployment template.
            xds_server_uri: Optional xDS server URI passed to the deployment
                template.
            gcp_api_manager: Source of the GCP UI URL. With Workload Identity
                enabled, also used to build the IAM API client.
            gcp_project: GCP project ID.
            gcp_service_account: GCP service account used by the workload.
            service_account_name: Kubernetes service account name. Only used
                with Workload Identity; defaults to deployment_name.
            service_name: Kubernetes service name; defaults to
                deployment_name.
            neg_name: GCP Network Endpoint Group name; defaults to
                '<namespace>-<service_name>'.
            deployment_template: Deployment manifest template.
            service_account_template: Service account manifest template.
            service_template: Service manifest template.
            reuse_service: Reuse an existing service instead of creating one.
                A reused service is only deleted by cleanup(force=True).
            reuse_namespace: Passed through to the base runner.
            namespace_template: Passed through to the base runner.
            debug_use_port_forwarding: Reach each pod's maintenance port
                through local port forwarding instead of the pod IP.
            enable_workload_identity: Create a Kubernetes service account and
                grant it the GCP service account identity. Required for
                secure mode.
        """
        super().__init__(k8s_namespace,
                         deployment_name=deployment_name,
                         image_name=image_name,
                         gcp_project=gcp_project,
                         gcp_service_account=gcp_service_account,
                         gcp_ui_url=gcp_api_manager.gcp_ui_url,
                         namespace_template=namespace_template,
                         reuse_namespace=reuse_namespace)

        # Settings
        self.deployment_template = deployment_template
        self.service_name = service_name or deployment_name
        self.service_template = service_template
        self.reuse_service = reuse_service
        self.enable_workload_identity = enable_workload_identity
        self.debug_use_port_forwarding = debug_use_port_forwarding
        # GCP Network Endpoint Group.
        self.gcp_neg_name = neg_name or (f'{self.k8s_namespace.name}-'
                                         f'{self.service_name}')

        # Used by the TD bootstrap generator.
        self.td_bootstrap_image = td_bootstrap_image
        self.network = network
        self.xds_server_uri = xds_server_uri

        # Workload identity settings:
        if self.enable_workload_identity:
            # Kubernetes service account.
            self.service_account_name = service_account_name or deployment_name
            self.service_account_template = service_account_template
            # GCP IAM API, used to allow the workload's Kubernetes service
            # account to use the GCP service account identity.
            self.gcp_iam = gcp.iam.IamV1(gcp_api_manager, gcp_project)

    def run(  # pylint: disable=arguments-differ,too-many-branches
            self,
            *,
            test_port: int = DEFAULT_TEST_PORT,
            maintenance_port: Optional[int] = None,
            secure_mode: bool = False,
            replica_count: int = 1,
            log_to_stdout: bool = False) -> List[XdsTestServer]:
        """Deploys the test servers and waits until all replicas are up.

        Args:
            test_port: Port serving the test traffic.
            maintenance_port: Port of the maintenance ("backchannel")
                services. A falsy value (None or 0) selects the default for
                the given secure_mode.
            secure_mode: Run the servers in secure mode. Requires Workload
                Identity and distinct test and maintenance ports.
            replica_count: Number of server pods to run.
            log_to_stdout: Forwarded to pod log collection, when log
                collection is enabled.

        Returns:
            One XdsTestServer per started pod.

        Raises:
            TypeError: A port number is not an integer.
            ValueError: Secure mode with equal test and maintenance ports, or
                secure mode with Workload Identity disabled.
        """
        if not maintenance_port:
            maintenance_port = self._get_default_maintenance_port(secure_mode)

        # Check the types first, so the port comparison below never operates
        # on values of the wrong type.
        if not (isinstance(test_port, int) and
                isinstance(maintenance_port, int)):
            raise TypeError('Port numbers must be integer')

        # Implementation detail: in secure mode, maintenance ("backchannel")
        # port must be different from the test port so communication with
        # maintenance services can be reached independently of the security
        # configuration under test.
        if secure_mode and maintenance_port == test_port:
            raise ValueError('port and maintenance_port must be different '
                             'when running test server in secure mode')

        if secure_mode and not self.enable_workload_identity:
            raise ValueError('Secure mode requires Workload Identity enabled.')

        logger.info(
            'Deploying xDS test server "%s" to k8s namespace %s: test_port=%s '
            'maintenance_port=%s secure_mode=%s replica_count=%s',
            self.deployment_name, self.k8s_namespace.name, test_port,
            maintenance_port, secure_mode, replica_count)
        super().run()

        # Reuse the existing service if requested, and create a new service
        # when it is missing. Useful for debugging, to avoid the NEG losing
        # its relation to a deleted service.
        if self.reuse_service:
            self.service = self._reuse_service(self.service_name)
        if not self.service:
            self.service = self._create_service(
                self.service_template,
                service_name=self.service_name,
                namespace_name=self.k8s_namespace.name,
                deployment_name=self.deployment_name,
                neg_name=self.gcp_neg_name,
                test_port=test_port)
        self._wait_service_neg(self.service_name, test_port)

        if self.enable_workload_identity:
            # Allow Kubernetes service account to use the GCP service account
            # identity.
            self._grant_workload_identity_user(
                gcp_iam=self.gcp_iam,
                gcp_service_account=self.gcp_service_account,
                service_account_name=self.service_account_name)

            # Create service account
            self.service_account = self._create_service_account(
                self.service_account_template,
                service_account_name=self.service_account_name,
                namespace_name=self.k8s_namespace.name,
                gcp_service_account=self.gcp_service_account)

        # Always create a new deployment
        self.deployment = self._create_deployment(
            self.deployment_template,
            deployment_name=self.deployment_name,
            image_name=self.image_name,
            namespace_name=self.k8s_namespace.name,
            service_account_name=self.service_account_name,
            td_bootstrap_image=self.td_bootstrap_image,
            xds_server_uri=self.xds_server_uri,
            network=self.network,
            replica_count=replica_count,
            test_port=test_port,
            maintenance_port=maintenance_port,
            secure_mode=secure_mode)

        pod_names = self._wait_deployment_pod_count(self.deployment,
                                                    replica_count)
        pods = []
        for pod_name in pod_names:
            pod = self._wait_pod_started(pod_name)
            pods.append(pod)
            if self.should_collect_logs:
                self._start_logging_pod(pod, log_to_stdout=log_to_stdout)

        # Verify the deployment reports all pods started as well.
        self._wait_deployment_with_available_replicas(self.deployment_name,
                                                      replica_count)
        self._start_completed()

        return [
            self._xds_test_server_for_pod(pod,
                                          test_port=test_port,
                                          maintenance_port=maintenance_port,
                                          secure_mode=secure_mode)
            for pod in pods
        ]

    def _get_default_maintenance_port(self, secure_mode: bool) -> int:
        """Returns the default maintenance port for the given mode."""
        return (self.DEFAULT_SECURE_MODE_MAINTENANCE_PORT
                if secure_mode else self.DEFAULT_MAINTENANCE_PORT)

    def _xds_test_server_for_pod(self,
                                 pod: k8s.V1Pod,
                                 *,
                                 test_port: int = DEFAULT_TEST_PORT,
                                 maintenance_port: Optional[int] = None,
                                 secure_mode: bool = False) -> XdsTestServer:
        """Builds the XdsTestServer handle for a started pod.

        The test endpoint is the pod IP with test_port. With
        debug_use_port_forwarding enabled, port forwarding to the pod's
        maintenance port is started, and the maintenance endpoint becomes
        the local forwarded address and port. Otherwise the maintenance
        endpoint is maintenance_port and rpc_host is left as None.
        """
        if maintenance_port is None:
            maintenance_port = self._get_default_maintenance_port(secure_mode)

        if self.debug_use_port_forwarding:
            pf = self._start_port_forwarding_pod(pod, maintenance_port)
            rpc_port, rpc_host = pf.local_port, pf.local_address
        else:
            rpc_port, rpc_host = maintenance_port, None

        return XdsTestServer(ip=pod.status.pod_ip,
                             rpc_port=test_port,
                             hostname=pod.metadata.name,
                             maintenance_port=rpc_port,
                             secure_mode=secure_mode,
                             rpc_host=rpc_host)

    # pylint: disable=arguments-differ
    def cleanup(self, *, force=False, force_namespace=False):
        """Deletes the resources created by run(), then stops the runner.

        Args:
            force: Attempt deletion of the deployment, the service (including
                a reused one) and, with Workload Identity, the service
                account and its IAM binding, even when this runner holds no
                record of them.
            force_namespace: Passed to namespace cleanup as its force flag,
                but only when force is also True.
        """
        # TODO(sergiitk): rename to stop().
        try:
            if self.deployment or force:
                self._delete_deployment(self.deployment_name)
                self.deployment = None
            # A reused service is left in place unless cleanup is forced.
            if (self.service and not self.reuse_service) or force:
                self._delete_service(self.service_name)
                self.service = None
            if (self.enable_workload_identity and
                (self.service_account or force)):
                self._revoke_workload_identity_user(
                    gcp_iam=self.gcp_iam,
                    gcp_service_account=self.gcp_service_account,
                    service_account_name=self.service_account_name)
                self._delete_service_account(self.service_account_name)
                self.service_account = None
            self._cleanup_namespace(force=(force_namespace and force))
        finally:
            self._stop()

    # pylint: enable=arguments-differ

    @classmethod
    def make_namespace_name(cls,
                            resource_prefix: str,
                            resource_suffix: str,
                            name: str = 'server') -> str:
        """A helper to make consistent XdsTestServer kubernetes namespace name
        for given resource prefix and suffix.

        Note: the idea is to intentionally produce different namespace name for
        the test server, and the test client, as that closely mimics real-world
        deployments.
        """
        return cls._make_namespace_name(resource_prefix, resource_suffix, name)
276