1# Copyright 2022 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import logging
15from typing import Optional
16
17from framework import xds_k8s_testcase
18from framework.helpers import rand as helpers_rand
19from framework.infrastructure import k8s
20from framework.infrastructure import traffic_director
21from framework.test_app.runners.k8s import k8s_xds_client_runner
22from framework.test_app.runners.k8s import k8s_xds_server_runner
23
24logger = logging.getLogger(__name__)
25
26# Type aliases
27TrafficDirectorManager = traffic_director.TrafficDirectorManager
28XdsTestServer = xds_k8s_testcase.XdsTestServer
29XdsTestClient = xds_k8s_testcase.XdsTestClient
30KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
31KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner
32
33
34class BootstrapGeneratorBaseTest(xds_k8s_testcase.XdsKubernetesBaseTestCase):
35    """Common functionality to support testing of bootstrap generator versions
36    across gRPC clients and servers."""
37
38    @classmethod
39    def setUpClass(cls):
40        """Hook method for setting up class fixture before running tests in
41        the class.
42        """
43        super().setUpClass()
44        if cls.server_maintenance_port is None:
45            cls.server_maintenance_port = \
46                KubernetesServerRunner.DEFAULT_MAINTENANCE_PORT
47
48        # Bootstrap generator tests are run as parameterized tests which only
49        # perform steps specific to the parameterized version of the bootstrap
50        # generator under test.
51        #
52        # Here, we perform setup steps which are common across client and server
53        # side variants of the bootstrap generator test.
54        if cls.resource_suffix_randomize:
55            cls.resource_suffix = helpers_rand.random_resource_suffix()
56        logger.info('Test run resource prefix: %s, suffix: %s',
57                    cls.resource_prefix, cls.resource_suffix)
58
59        # TD Manager
60        cls.td = cls.initTrafficDirectorManager()
61
62        # Test namespaces for client and server.
63        cls.server_namespace = KubernetesServerRunner.make_namespace_name(
64            cls.resource_prefix, cls.resource_suffix)
65        cls.client_namespace = KubernetesClientRunner.make_namespace_name(
66            cls.resource_prefix, cls.resource_suffix)
67
68        # Ensures the firewall exist
69        if cls.ensure_firewall:
70            cls.td.create_firewall_rule(
71                allowed_ports=cls.firewall_allowed_ports)
72
73        # Randomize xds port, when it's set to 0
74        if cls.server_xds_port == 0:
75            # TODO(sergiitk): this is prone to race conditions:
76            #  The port might not me taken now, but there's not guarantee
77            #  it won't be taken until the tests get to creating
78            #  forwarding rule. This check is better than nothing,
79            #  but we should find a better approach.
80            cls.server_xds_port = cls.td.find_unused_forwarding_rule_port()
81            logger.info('Found unused xds port: %s', cls.server_xds_port)
82
83        # Common TD resources across client and server tests.
84        cls.td.setup_for_grpc(cls.server_xds_host,
85                              cls.server_xds_port,
86                              health_check_port=cls.server_maintenance_port)
87
88    @classmethod
89    def tearDownClass(cls):
90        cls.td.cleanup(force=cls.force_cleanup)
91        super().tearDownClass()
92
93    @classmethod
94    def initTrafficDirectorManager(cls) -> TrafficDirectorManager:
95        return TrafficDirectorManager(
96            cls.gcp_api_manager,
97            project=cls.project,
98            resource_prefix=cls.resource_prefix,
99            resource_suffix=cls.resource_suffix,
100            network=cls.network,
101            compute_api_version=cls.compute_api_version)
102
103    @classmethod
104    def initKubernetesServerRunner(
105            cls,
106            *,
107            td_bootstrap_image: Optional[str] = None) -> KubernetesServerRunner:
108        if not td_bootstrap_image:
109            td_bootstrap_image = cls.td_bootstrap_image
110        return KubernetesServerRunner(
111            k8s.KubernetesNamespace(cls.k8s_api_manager, cls.server_namespace),
112            deployment_name=cls.server_name,
113            image_name=cls.server_image,
114            td_bootstrap_image=td_bootstrap_image,
115            gcp_project=cls.project,
116            gcp_api_manager=cls.gcp_api_manager,
117            gcp_service_account=cls.gcp_service_account,
118            xds_server_uri=cls.xds_server_uri,
119            network=cls.network,
120            debug_use_port_forwarding=cls.debug_use_port_forwarding,
121            enable_workload_identity=cls.enable_workload_identity)
122
123    @staticmethod
124    def startTestServer(server_runner,
125                        port,
126                        maintenance_port,
127                        xds_host,
128                        xds_port,
129                        replica_count=1,
130                        **kwargs) -> XdsTestServer:
131        test_server = server_runner.run(replica_count=replica_count,
132                                        test_port=port,
133                                        maintenance_port=maintenance_port,
134                                        **kwargs)[0]
135        test_server.set_xds_address(xds_host, xds_port)
136        return test_server
137
138    def initKubernetesClientRunner(
139            self,
140            td_bootstrap_image: Optional[str] = None) -> KubernetesClientRunner:
141        if not td_bootstrap_image:
142            td_bootstrap_image = self.td_bootstrap_image
143        return KubernetesClientRunner(
144            k8s.KubernetesNamespace(self.k8s_api_manager,
145                                    self.client_namespace),
146            deployment_name=self.client_name,
147            image_name=self.client_image,
148            td_bootstrap_image=td_bootstrap_image,
149            gcp_project=self.project,
150            gcp_api_manager=self.gcp_api_manager,
151            gcp_service_account=self.gcp_service_account,
152            xds_server_uri=self.xds_server_uri,
153            network=self.network,
154            debug_use_port_forwarding=self.debug_use_port_forwarding,
155            enable_workload_identity=self.enable_workload_identity,
156            stats_port=self.client_port,
157            reuse_namespace=self.server_namespace == self.client_namespace)
158
159    def startTestClient(self, test_server: XdsTestServer,
160                        **kwargs) -> XdsTestClient:
161        test_client = self.client_runner.run(server_target=test_server.xds_uri,
162                                             **kwargs)
163        test_client.wait_for_active_server_channel()
164        return test_client
165