1# Copyright 2022 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""
15Run xDS Test Client on Kubernetes.
16"""
17import logging
18from typing import Optional
19
20from framework.infrastructure import gcp
21from framework.infrastructure import k8s
22from framework.test_app.client_app import XdsTestClient
23from framework.test_app.runners.k8s import k8s_base_runner
24
25logger = logging.getLogger(__name__)
26
27
28class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner):
29
30    # Required fields.
31    xds_server_uri: str
32    stats_port: int
33    deployment_template: str
34    enable_workload_identity: bool
35    debug_use_port_forwarding: bool
36    td_bootstrap_image: str
37    network: str
38
39    # Optional fields.
40    service_account_name: Optional[str] = None
41    service_account_template: Optional[str] = None
42    gcp_iam: Optional[gcp.iam.IamV1] = None
43
44    def __init__(  # pylint: disable=too-many-locals
45            self,
46            k8s_namespace: k8s.KubernetesNamespace,
47            *,
48            deployment_name: str,
49            image_name: str,
50            td_bootstrap_image: str,
51            network='default',
52            xds_server_uri: Optional[str] = None,
53            gcp_api_manager: gcp.api.GcpApiManager,
54            gcp_project: str,
55            gcp_service_account: str,
56            service_account_name: Optional[str] = None,
57            stats_port: int = 8079,
58            deployment_template: str = 'client.deployment.yaml',
59            service_account_template: str = 'service-account.yaml',
60            reuse_namespace: bool = False,
61            namespace_template: Optional[str] = None,
62            debug_use_port_forwarding: bool = False,
63            enable_workload_identity: bool = True):
64        super().__init__(k8s_namespace,
65                         deployment_name=deployment_name,
66                         image_name=image_name,
67                         gcp_project=gcp_project,
68                         gcp_service_account=gcp_service_account,
69                         gcp_ui_url=gcp_api_manager.gcp_ui_url,
70                         namespace_template=namespace_template,
71                         reuse_namespace=reuse_namespace)
72
73        # Settings
74        self.stats_port = stats_port
75        self.deployment_template = deployment_template
76        self.enable_workload_identity = enable_workload_identity
77        self.debug_use_port_forwarding = debug_use_port_forwarding
78
79        # Used by the TD bootstrap generator.
80        self.td_bootstrap_image = td_bootstrap_image
81        self.network = network
82        self.xds_server_uri = xds_server_uri
83
84        # Workload identity settings:
85        if self.enable_workload_identity:
86            # Kubernetes service account.
87            self.service_account_name = service_account_name or deployment_name
88            self.service_account_template = service_account_template
89            # GCP IAM API used to grant allow workload service accounts
90            # permission to use GCP service account identity.
91            self.gcp_iam = gcp.iam.IamV1(gcp_api_manager, gcp_project)
92
93    def run(  # pylint: disable=arguments-differ
94            self,
95            *,
96            server_target,
97            rpc='UnaryCall',
98            qps=25,
99            metadata='',
100            secure_mode=False,
101            config_mesh=None,
102            print_response=False,
103            log_to_stdout: bool = False) -> XdsTestClient:
104        logger.info(
105            'Deploying xDS test client "%s" to k8s namespace %s: '
106            'server_target=%s rpc=%s qps=%s metadata=%r secure_mode=%s '
107            'print_response=%s', self.deployment_name, self.k8s_namespace.name,
108            server_target, rpc, qps, metadata, secure_mode, print_response)
109        super().run()
110
111        if self.enable_workload_identity:
112            # Allow Kubernetes service account to use the GCP service account
113            # identity.
114            self._grant_workload_identity_user(
115                gcp_iam=self.gcp_iam,
116                gcp_service_account=self.gcp_service_account,
117                service_account_name=self.service_account_name)
118
119            # Create service account
120            self.service_account = self._create_service_account(
121                self.service_account_template,
122                service_account_name=self.service_account_name,
123                namespace_name=self.k8s_namespace.name,
124                gcp_service_account=self.gcp_service_account)
125
126        # Always create a new deployment
127        self.deployment = self._create_deployment(
128            self.deployment_template,
129            deployment_name=self.deployment_name,
130            image_name=self.image_name,
131            namespace_name=self.k8s_namespace.name,
132            service_account_name=self.service_account_name,
133            td_bootstrap_image=self.td_bootstrap_image,
134            xds_server_uri=self.xds_server_uri,
135            network=self.network,
136            stats_port=self.stats_port,
137            server_target=server_target,
138            rpc=rpc,
139            qps=qps,
140            metadata=metadata,
141            secure_mode=secure_mode,
142            config_mesh=config_mesh,
143            print_response=print_response)
144
145        # Load test client pod. We need only one client at the moment
146        pod_name = self._wait_deployment_pod_count(self.deployment)[0]
147        pod: k8s.V1Pod = self._wait_pod_started(pod_name)
148        if self.should_collect_logs:
149            self._start_logging_pod(pod, log_to_stdout=log_to_stdout)
150
151        # Verify the deployment reports all pods started as well.
152        self._wait_deployment_with_available_replicas(self.deployment_name)
153        self._start_completed()
154
155        return self._xds_test_client_for_pod(pod, server_target=server_target)
156
157    def _xds_test_client_for_pod(self, pod: k8s.V1Pod, *,
158                                 server_target: str) -> XdsTestClient:
159        if self.debug_use_port_forwarding:
160            pf = self._start_port_forwarding_pod(pod, self.stats_port)
161            rpc_port, rpc_host = pf.local_port, pf.local_address
162        else:
163            rpc_port, rpc_host = self.stats_port, None
164
165        return XdsTestClient(ip=pod.status.pod_ip,
166                             rpc_port=rpc_port,
167                             server_target=server_target,
168                             hostname=pod.metadata.name,
169                             rpc_host=rpc_host)
170
171    # pylint: disable=arguments-differ
172    def cleanup(self, *, force=False, force_namespace=False):
173        # TODO(sergiitk): rename to stop().
174        try:
175            if self.deployment or force:
176                self._delete_deployment(self.deployment_name)
177                self.deployment = None
178            if (self.enable_workload_identity and
179                (self.service_account or force)):
180                self._revoke_workload_identity_user(
181                    gcp_iam=self.gcp_iam,
182                    gcp_service_account=self.gcp_service_account,
183                    service_account_name=self.service_account_name)
184                self._delete_service_account(self.service_account_name)
185                self.service_account = None
186            self._cleanup_namespace(force=force_namespace and force)
187        finally:
188            self._stop()
189
190    # pylint: enable=arguments-differ
191
192    @classmethod
193    def make_namespace_name(cls,
194                            resource_prefix: str,
195                            resource_suffix: str,
196                            name: str = 'client') -> str:
197        """A helper to make consistent XdsTestClient kubernetes namespace name
198        for given resource prefix and suffix.
199
200        Note: the idea is to intentionally produce different namespace name for
201        the test server, and the test client, as that closely mimics real-world
202        deployments.
203        """
204        return cls._make_namespace_name(resource_prefix, resource_suffix, name)
205