1# Copyright 2021 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import logging
15from typing import List
16
17from absl import flags
18from absl.testing import absltest
19
20from framework import xds_k8s_testcase
21from framework.infrastructure import k8s
22from framework.test_app.runners.k8s import k8s_xds_server_runner
23
# Logger named after this module.
logger = logging.getLogger(__name__)
# Make the key flags of xds_k8s_testcase key flags of this module as well.
flags.adopt_module_key_flags(xds_k8s_testcase)

# Short aliases for framework types referenced by the test case below.
_KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
_XdsTestClient = xds_k8s_testcase.XdsTestClient
_XdsTestServer = xds_k8s_testcase.XdsTestServer
31
32
class RemoveNegTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
    """Checks that RPCs stop reaching a backend after it is removed.

    Two sets of test servers are started: the default ones, via the runner
    owned by the base test case, and alternate ones, via
    ``alternate_server_runner``. Both are added as backends; the alternate
    backends are then removed and RPCs are expected to reach only the default
    servers.
    """

    def setUp(self) -> None:
        """Run the base setup and create the alternate server runner."""
        super().setUp()
        # The alternate deployment lives in ``self.server_namespace`` and is
        # told to reuse it (presumably the namespace also used by the base
        # class's own server runner -- confirm against the base class).
        alternate_namespace = k8s.KubernetesNamespace(self.k8s_api_manager,
                                                      self.server_namespace)
        self.alternate_server_runner = _KubernetesServerRunner(
            alternate_namespace,
            deployment_name=self.server_name + '-alt',
            image_name=self.server_image,
            gcp_service_account=self.gcp_service_account,
            td_bootstrap_image=self.td_bootstrap_image,
            gcp_project=self.project,
            gcp_api_manager=self.gcp_api_manager,
            xds_server_uri=self.xds_server_uri,
            network=self.network,
            debug_use_port_forwarding=self.debug_use_port_forwarding,
            reuse_namespace=True)

    def cleanup(self) -> None:
        """Clean up base-class resources, then the alternate server runner.

        The alternate runner is cleaned up in a ``finally`` clause so that it
        is still released when the base-class cleanup raises; any exception
        from the base cleanup continues to propagate afterwards.
        """
        try:
            super().cleanup()
        finally:
            # The attribute is missing when setUp() did not get as far as
            # creating the runner (e.g. the base setUp() raised).
            if hasattr(self, 'alternate_server_runner'):
                self.alternate_server_runner.cleanup(
                    force=self.force_cleanup,
                    force_namespace=self.force_cleanup)

    def test_remove_neg(self) -> None:
        """Remove the alternate backends and verify RPCs leave them."""
        with self.subTest('00_create_health_check'):
            self.td.create_health_check()

        with self.subTest('01_create_backend_services'):
            self.td.create_backend_service()

        with self.subTest('02_create_url_map'):
            self.td.create_url_map(self.server_xds_host, self.server_xds_port)

        with self.subTest('03_create_target_proxy'):
            self.td.create_target_proxy()

        with self.subTest('04_create_forwarding_rule'):
            self.td.create_forwarding_rule(self.server_xds_port)

        # Declared up front so the names are typed outside the subTest scopes.
        default_test_servers: List[_XdsTestServer]
        alternate_test_servers: List[_XdsTestServer]
        with self.subTest('05_start_test_servers'):
            default_test_servers = self.startTestServers()
            alternate_test_servers = self.startTestServers(
                server_runner=self.alternate_server_runner)

        with self.subTest('06_add_server_backends_to_backend_services'):
            self.setupServerBackends()
            self.setupServerBackends(server_runner=self.alternate_server_runner)

        test_client: _XdsTestClient
        with self.subTest('07_start_test_client'):
            test_client = self.startTestClient(default_test_servers[0])

        with self.subTest('08_test_client_xds_config_exists'):
            self.assertXdsConfigExists(test_client)

        with self.subTest('09_test_server_received_rpcs_from_test_client'):
            self.assertSuccessfulRpcs(test_client)

        with self.subTest('10_remove_neg'):
            # Baseline: traffic must first reach both sets of servers,
            # otherwise the post-removal check would pass trivially.
            all_test_servers = default_test_servers + alternate_test_servers
            self.assertRpcsEventuallyGoToGivenServers(test_client,
                                                      all_test_servers)
            self.removeServerBackends(
                server_runner=self.alternate_server_runner)
            # After removal only the default servers should receive RPCs.
            self.assertRpcsEventuallyGoToGivenServers(test_client,
                                                      default_test_servers)
101
102
# Script entry point: run this module's tests through absltest with failfast
# enabled.
if __name__ == '__main__':
    absltest.main(failfast=True)
105