# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Run xDS Test Server on Kubernetes.
"""
import logging
from typing import List, Optional

from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.test_app.runners.k8s import k8s_base_runner
from framework.test_app.server_app import XdsTestServer

logger = logging.getLogger(__name__)


class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner):
    """Deploys xDS test servers to a Kubernetes namespace.

    A run creates (or, when requested, reuses) a Kubernetes Service whose
    GCP Network Endpoint Group is awaited. When Workload Identity is enabled,
    it also creates a Kubernetes service account bound to a GCP service
    account. It then always creates a new Deployment with the requested
    number of replicas.
    """
    DEFAULT_TEST_PORT = 8080
    DEFAULT_MAINTENANCE_PORT = 8080
    DEFAULT_SECURE_MODE_MAINTENANCE_PORT = 8081

    # Required fields.
    deployment_template: str
    service_name: str
    service_template: str
    reuse_service: bool
    enable_workload_identity: bool
    debug_use_port_forwarding: bool
    gcp_neg_name: str
    td_bootstrap_image: str
    xds_server_uri: str
    network: str

    # Optional fields.
    # Only populated by __init__ when Workload Identity is enabled.
    service_account_name: Optional[str] = None
    service_account_template: Optional[str] = None
    gcp_iam: Optional[gcp.iam.IamV1] = None

    # Mutable state.
    service: Optional[k8s.V1Service] = None

    def __init__(  # pylint: disable=too-many-locals
            self,
            k8s_namespace: k8s.KubernetesNamespace,
            *,
            deployment_name: str,
            image_name: str,
            td_bootstrap_image: str,
            network: str = 'default',
            xds_server_uri: Optional[str] = None,
            gcp_api_manager: gcp.api.GcpApiManager,
            gcp_project: str,
            gcp_service_account: str,
            service_account_name: Optional[str] = None,
            service_name: Optional[str] = None,
            neg_name: Optional[str] = None,
            deployment_template: str = 'server.deployment.yaml',
            service_account_template: str = 'service-account.yaml',
            service_template: str = 'server.service.yaml',
            reuse_service: bool = False,
            reuse_namespace: bool = False,
            namespace_template: Optional[str] = None,
            debug_use_port_forwarding: bool = False,
            enable_workload_identity: bool = True):
        """Store the deployment settings; no Kubernetes resources are created.

        Args:
          k8s_namespace: Namespace the server resources are deployed to.
          deployment_name: Name of the Deployment. It is also the fallback for
            service_name and, with Workload Identity, service_account_name.
          image_name: Container image of the test server.
          td_bootstrap_image: Image used by the TD bootstrap generator.
          network: Network passed to the deployment template.
          xds_server_uri: xDS server URI passed to the deployment template.
          gcp_api_manager: Source of the GCP UI URL and the IAM API client.
          gcp_project: GCP project ID.
          gcp_service_account: GCP service account used with Workload
            Identity.
          service_account_name: Kubernetes service account name. Only used
            when Workload Identity is enabled.
          service_name: Kubernetes Service name. Defaults to deployment_name.
          neg_name: GCP NEG name. Defaults to "<namespace>-<service_name>".
          deployment_template: Deployment manifest template.
          service_account_template: Service account manifest template.
          service_template: Service manifest template.
          reuse_service: Reuse an existing Service instead of creating one,
            and keep it on non-forced cleanup.
          reuse_namespace: Forwarded to the base runner.
          namespace_template: Forwarded to the base runner.
          debug_use_port_forwarding: Reach the maintenance port through
            port forwarding instead of the pod IP.
          enable_workload_identity: Create a Kubernetes service account bound
            to gcp_service_account via Workload Identity.
        """
        super().__init__(k8s_namespace,
                         deployment_name=deployment_name,
                         image_name=image_name,
                         gcp_project=gcp_project,
                         gcp_service_account=gcp_service_account,
                         gcp_ui_url=gcp_api_manager.gcp_ui_url,
                         namespace_template=namespace_template,
                         reuse_namespace=reuse_namespace)

        # Settings
        self.deployment_template = deployment_template
        self.service_name = service_name or deployment_name
        self.service_template = service_template
        self.reuse_service = reuse_service
        self.enable_workload_identity = enable_workload_identity
        self.debug_use_port_forwarding = debug_use_port_forwarding
        # GCP Network Endpoint Group.
        self.gcp_neg_name = neg_name or (f'{self.k8s_namespace.name}-'
                                         f'{self.service_name}')

        # Used by the TD bootstrap generator.
        self.td_bootstrap_image = td_bootstrap_image
        self.network = network
        self.xds_server_uri = xds_server_uri

        # Workload identity settings:
        if self.enable_workload_identity:
            # Kubernetes service account.
            self.service_account_name = service_account_name or deployment_name
            self.service_account_template = service_account_template
            # GCP IAM API, used to allow the workload's Kubernetes service
            # account to use the GCP service account identity.
            self.gcp_iam = gcp.iam.IamV1(gcp_api_manager, gcp_project)

    def run(  # pylint: disable=arguments-differ,too-many-branches
            self,
            *,
            test_port: int = DEFAULT_TEST_PORT,
            maintenance_port: Optional[int] = None,
            secure_mode: bool = False,
            replica_count: int = 1,
            log_to_stdout: bool = False) -> List[XdsTestServer]:
        """Deploy the xDS test server and wait until all replicas are up.

        Args:
          test_port: Port the test server serves test traffic on.
          maintenance_port: Port for maintenance services. A falsy value
            (None or 0) selects the default for the given secure_mode.
          secure_mode: Run the server in secure mode. Requires Workload
            Identity and a maintenance port different from test_port.
          replica_count: Number of pods the deployment must run.
          log_to_stdout: Forwarded to pod log collection when logs are
            collected.

        Returns:
          One XdsTestServer per started pod.

        Raises:
          TypeError: If a port number is not an integer.
          ValueError: If secure_mode is requested with equal ports, or
            without Workload Identity enabled.
        """
        if not maintenance_port:
            maintenance_port = self._get_default_maintenance_port(secure_mode)

        # Check types before any port comparison, so that values of the
        # wrong type are rejected instead of being compared.
        if not (isinstance(test_port, int) and
                isinstance(maintenance_port, int)):
            raise TypeError('Port numbers must be integer')

        # Implementation detail: in secure mode, maintenance ("backchannel")
        # port must be different from the test port so communication with
        # maintenance services can be reached independently of the security
        # configuration under test.
        if secure_mode and maintenance_port == test_port:
            raise ValueError('port and maintenance_port must be different '
                             'when running test server in secure mode')

        if secure_mode and not self.enable_workload_identity:
            raise ValueError('Secure mode requires Workload Identity enabled.')

        logger.info(
            'Deploying xDS test server "%s" to k8s namespace %s: test_port=%s '
            'maintenance_port=%s secure_mode=%s replica_count=%s',
            self.deployment_name, self.k8s_namespace.name, test_port,
            maintenance_port, secure_mode, replica_count)
        super().run()

        # Reuse the existing service if requested; create a new service when
        # missing. Useful for debugging to avoid the NEG losing its relation
        # to a deleted service.
        if self.reuse_service:
            self.service = self._reuse_service(self.service_name)
        if not self.service:
            self.service = self._create_service(
                self.service_template,
                service_name=self.service_name,
                namespace_name=self.k8s_namespace.name,
                deployment_name=self.deployment_name,
                neg_name=self.gcp_neg_name,
                test_port=test_port)
        self._wait_service_neg(self.service_name, test_port)

        if self.enable_workload_identity:
            # Allow Kubernetes service account to use the GCP service account
            # identity.
            self._grant_workload_identity_user(
                gcp_iam=self.gcp_iam,
                gcp_service_account=self.gcp_service_account,
                service_account_name=self.service_account_name)

            # Create service account
            self.service_account = self._create_service_account(
                self.service_account_template,
                service_account_name=self.service_account_name,
                namespace_name=self.k8s_namespace.name,
                gcp_service_account=self.gcp_service_account)

        # Always create a new deployment
        self.deployment = self._create_deployment(
            self.deployment_template,
            deployment_name=self.deployment_name,
            image_name=self.image_name,
            namespace_name=self.k8s_namespace.name,
            service_account_name=self.service_account_name,
            td_bootstrap_image=self.td_bootstrap_image,
            xds_server_uri=self.xds_server_uri,
            network=self.network,
            replica_count=replica_count,
            test_port=test_port,
            maintenance_port=maintenance_port,
            secure_mode=secure_mode)

        pod_names = self._wait_deployment_pod_count(self.deployment,
                                                    replica_count)
        pods = []
        for pod_name in pod_names:
            pod = self._wait_pod_started(pod_name)
            pods.append(pod)
            if self.should_collect_logs:
                self._start_logging_pod(pod, log_to_stdout=log_to_stdout)

        # Verify the deployment reports all pods started as well.
        self._wait_deployment_with_available_replicas(self.deployment_name,
                                                      replica_count)
        self._start_completed()

        return [
            self._xds_test_server_for_pod(pod,
                                          test_port=test_port,
                                          maintenance_port=maintenance_port,
                                          secure_mode=secure_mode)
            for pod in pods
        ]

    def _get_default_maintenance_port(self, secure_mode: bool) -> int:
        """Return the default maintenance port for the given mode."""
        if secure_mode:
            return self.DEFAULT_SECURE_MODE_MAINTENANCE_PORT
        return self.DEFAULT_MAINTENANCE_PORT

    def _xds_test_server_for_pod(self,
                                 pod: k8s.V1Pod,
                                 *,
                                 test_port: int = DEFAULT_TEST_PORT,
                                 maintenance_port: Optional[int] = None,
                                 secure_mode: bool = False) -> XdsTestServer:
        """Build an XdsTestServer describing how to reach the given pod.

        When debug_use_port_forwarding is set, the maintenance port is
        reached through a port forwarder started here. Otherwise it is reached
        directly, and rpc_host is left as None.
        """
        if maintenance_port is None:
            maintenance_port = self._get_default_maintenance_port(secure_mode)

        if self.debug_use_port_forwarding:
            pf = self._start_port_forwarding_pod(pod, maintenance_port)
            rpc_port, rpc_host = pf.local_port, pf.local_address
        else:
            rpc_port, rpc_host = maintenance_port, None

        return XdsTestServer(ip=pod.status.pod_ip,
                             rpc_port=test_port,
                             hostname=pod.metadata.name,
                             maintenance_port=rpc_port,
                             secure_mode=secure_mode,
                             rpc_host=rpc_host)

    # pylint: disable=arguments-differ
    def cleanup(self, *, force=False, force_namespace=False):
        """Delete the resources created by run(), then stop the runner.

        Args:
          force: Attempt deletion even when this runner has no record of the
            resource. A reused service is deleted only when forced.
          force_namespace: Force namespace cleanup; only takes effect together
            with force.
        """
        # TODO(sergiitk): rename to stop().
        try:
            if self.deployment or force:
                self._delete_deployment(self.deployment_name)
                self.deployment = None
            if (self.service and not self.reuse_service) or force:
                self._delete_service(self.service_name)
                self.service = None
            if (self.enable_workload_identity and
                (self.service_account or force)):
                self._revoke_workload_identity_user(
                    gcp_iam=self.gcp_iam,
                    gcp_service_account=self.gcp_service_account,
                    service_account_name=self.service_account_name)
                self._delete_service_account(self.service_account_name)
                self.service_account = None
            self._cleanup_namespace(force=(force_namespace and force))
        finally:
            # Runs even if a deletion above raises.
            self._stop()

    # pylint: enable=arguments-differ

    @classmethod
    def make_namespace_name(cls,
                            resource_prefix: str,
                            resource_suffix: str,
                            name: str = 'server') -> str:
        """A helper to make consistent XdsTestServer kubernetes namespace name
        for given resource prefix and suffix.

        Note: the idea is to intentionally produce different namespace name for
        the test server, and the test client, as that closely mimics real-world
        deployments.
        """
        return cls._make_namespace_name(resource_prefix, resource_suffix, name)