# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Run xDS Test Client on Kubernetes.
"""
import logging
from typing import Optional

from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.test_app.client_app import XdsTestClient
from framework.test_app.runners.k8s import k8s_base_runner

logger = logging.getLogger(__name__)


class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner):
    """Runs the xDS test client as a deployment in a Kubernetes namespace.

    When workload identity is enabled, the runner also creates a Kubernetes
    service account and grants/revokes its permission to use the configured
    GCP service account identity.
    """

    # Required fields.
    xds_server_uri: str
    stats_port: int
    deployment_template: str
    enable_workload_identity: bool
    debug_use_port_forwarding: bool
    td_bootstrap_image: str
    network: str

    # Optional fields.
    service_account_name: Optional[str] = None
    service_account_template: Optional[str] = None
    gcp_iam: Optional[gcp.iam.IamV1] = None

    def __init__(  # pylint: disable=too-many-locals
            self,
            k8s_namespace: k8s.KubernetesNamespace,
            *,
            deployment_name: str,
            image_name: str,
            td_bootstrap_image: str,
            network='default',
            xds_server_uri: Optional[str] = None,
            gcp_api_manager: gcp.api.GcpApiManager,
            gcp_project: str,
            gcp_service_account: str,
            service_account_name: Optional[str] = None,
            stats_port: int = 8079,
            deployment_template: str = 'client.deployment.yaml',
            service_account_template: str = 'service-account.yaml',
            reuse_namespace: bool = False,
            namespace_template: Optional[str] = None,
            debug_use_port_forwarding: bool = False,
            enable_workload_identity: bool = True):
        """Stores the client runner settings.

        Args:
            k8s_namespace: Namespace wrapper forwarded to the base runner.
            deployment_name: Name of the client deployment; also the
                fallback Kubernetes service account name.
            image_name: Client image name, forwarded to the base runner.
            td_bootstrap_image: Passed to the deployment template as
                ``td_bootstrap_image``.
            network: Passed to the deployment template as ``network``.
            xds_server_uri: Passed to the deployment template as
                ``xds_server_uri``.
            gcp_api_manager: Supplies ``gcp_ui_url`` and is used to build
                the IAM client when workload identity is enabled.
            gcp_project: GCP project, forwarded to the base runner and the
                IAM client.
            gcp_service_account: GCP service account, forwarded to the base
                runner.
            service_account_name: Kubernetes service account name; defaults
                to ``deployment_name`` when falsy.
            stats_port: Port handed to the deployment template and used as
                the client's RPC port.
            deployment_template: Template used to create the deployment.
            service_account_template: Template used to create the
                Kubernetes service account.
            reuse_namespace: Forwarded to the base runner.
            namespace_template: Forwarded to the base runner.
            debug_use_port_forwarding: When true, the returned client is
                reached through a port forward to the pod.
            enable_workload_identity: When true, service account and IAM
                settings are configured.
        """
        super().__init__(k8s_namespace,
                         deployment_name=deployment_name,
                         image_name=image_name,
                         gcp_project=gcp_project,
                         gcp_service_account=gcp_service_account,
                         gcp_ui_url=gcp_api_manager.gcp_ui_url,
                         namespace_template=namespace_template,
                         reuse_namespace=reuse_namespace)

        # Settings
        self.stats_port = stats_port
        self.deployment_template = deployment_template
        self.enable_workload_identity = enable_workload_identity
        self.debug_use_port_forwarding = debug_use_port_forwarding

        # Used by the TD bootstrap generator.
        self.td_bootstrap_image = td_bootstrap_image
        self.network = network
        self.xds_server_uri = xds_server_uri

        # Without workload identity the optional fields keep their
        # class-level defaults (None).
        if not enable_workload_identity:
            return

        # Kubernetes service account.
        self.service_account_name = service_account_name or deployment_name
        self.service_account_template = service_account_template
        # GCP IAM API used to grant workload service accounts permission
        # to use the GCP service account identity.
        self.gcp_iam = gcp.iam.IamV1(gcp_api_manager, gcp_project)

    def run(  # pylint: disable=arguments-differ
            self,
            *,
            server_target,
            rpc='UnaryCall',
            qps=25,
            metadata='',
            secure_mode=False,
            config_mesh=None,
            print_response=False,
            log_to_stdout: bool = False) -> XdsTestClient:
        """Deploys the test client and returns a handle to it.

        Calls the base runner's ``run()``, optionally sets up workload
        identity and the Kubernetes service account, creates the deployment,
        waits for the first pod it reports, and wraps that pod into an
        ``XdsTestClient``.

        Args:
            server_target: Passed to the deployment template and to the
                returned client.
            rpc: Passed to the deployment template as ``rpc``.
            qps: Passed to the deployment template as ``qps``.
            metadata: Passed to the deployment template as ``metadata``.
            secure_mode: Passed to the deployment template as
                ``secure_mode``.
            config_mesh: Passed to the deployment template as
                ``config_mesh``.
            print_response: Passed to the deployment template as
                ``print_response``.
            log_to_stdout: Forwarded to pod log collection when log
                collection is enabled.

        Returns:
            The ``XdsTestClient`` built for the started client pod.
        """
        namespace_name = self.k8s_namespace.name
        logger.info(
            'Deploying xDS test client "%s" to k8s namespace %s: '
            'server_target=%s rpc=%s qps=%s metadata=%r secure_mode=%s '
            'print_response=%s', self.deployment_name, namespace_name,
            server_target, rpc, qps, metadata, secure_mode, print_response)
        super().run()

        if self.enable_workload_identity:
            # Allow Kubernetes service account to use the GCP service account
            # identity.
            self._grant_workload_identity_user(
                gcp_iam=self.gcp_iam,
                gcp_service_account=self.gcp_service_account,
                service_account_name=self.service_account_name)

            # Create service account
            self.service_account = self._create_service_account(
                self.service_account_template,
                service_account_name=self.service_account_name,
                namespace_name=self.k8s_namespace.name,
                gcp_service_account=self.gcp_service_account)

        # Always create a new deployment. Keyword order matches the
        # original call.
        deployment_kwargs = dict(
            deployment_name=self.deployment_name,
            image_name=self.image_name,
            namespace_name=self.k8s_namespace.name,
            service_account_name=self.service_account_name,
            td_bootstrap_image=self.td_bootstrap_image,
            xds_server_uri=self.xds_server_uri,
            network=self.network,
            stats_port=self.stats_port,
            server_target=server_target,
            rpc=rpc,
            qps=qps,
            metadata=metadata,
            secure_mode=secure_mode,
            config_mesh=config_mesh,
            print_response=print_response)
        self.deployment = self._create_deployment(self.deployment_template,
                                                  **deployment_kwargs)

        # Load test client pod. We need only one client at the moment
        pod_names = self._wait_deployment_pod_count(self.deployment)
        client_pod: k8s.V1Pod = self._wait_pod_started(pod_names[0])
        if self.should_collect_logs:
            self._start_logging_pod(client_pod, log_to_stdout=log_to_stdout)

        # Verify the deployment reports all pods started as well.
        self._wait_deployment_with_available_replicas(self.deployment_name)
        self._start_completed()

        return self._xds_test_client_for_pod(client_pod,
                                             server_target=server_target)

    def _xds_test_client_for_pod(self, pod: k8s.V1Pod, *,
                                 server_target: str) -> XdsTestClient:
        """Builds an ``XdsTestClient`` for the given pod.

        By default the client uses ``stats_port`` with no explicit RPC host.
        With ``debug_use_port_forwarding`` set, a port forward to the pod is
        started and its local port and address are used instead.
        """
        rpc_port, rpc_host = self.stats_port, None
        if self.debug_use_port_forwarding:
            forwarder = self._start_port_forwarding_pod(pod, self.stats_port)
            rpc_port = forwarder.local_port
            rpc_host = forwarder.local_address

        return XdsTestClient(ip=pod.status.pod_ip,
                             rpc_port=rpc_port,
                             server_target=server_target,
                             hostname=pod.metadata.name,
                             rpc_host=rpc_host)

    # pylint: disable=arguments-differ
    def cleanup(self, *, force=False, force_namespace=False):
        """Deletes the resources created by this runner.

        The deployment is deleted when one is tracked or ``force`` is set.
        With workload identity enabled, the IAM grant is revoked and the
        Kubernetes service account deleted under the same condition. The
        namespace cleanup is forced only when both ``force_namespace`` and
        ``force`` are set. ``_stop()`` is always called, even on failure.
        """
        # TODO(sergiitk): rename to stop().
        try:
            if self.deployment or force:
                self._delete_deployment(self.deployment_name)
                self.deployment = None

            if self.enable_workload_identity:
                if self.service_account or force:
                    self._revoke_workload_identity_user(
                        gcp_iam=self.gcp_iam,
                        gcp_service_account=self.gcp_service_account,
                        service_account_name=self.service_account_name)
                    self._delete_service_account(self.service_account_name)
                    self.service_account = None

            self._cleanup_namespace(force=force_namespace and force)
        finally:
            self._stop()

    # pylint: enable=arguments-differ

    @classmethod
    def make_namespace_name(cls,
                            resource_prefix: str,
                            resource_suffix: str,
                            name: str = 'client') -> str:
        """Returns a consistent Kubernetes namespace name for XdsTestClient.

        The name is derived from the given resource prefix and suffix.

        Note: the test client and the test server intentionally get
        different namespace names, as that closely mimics real-world
        deployments.
        """
        return cls._make_namespace_name(resource_prefix, resource_suffix, name)