1# Copyright 2022 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import datetime
15import logging
16
17from absl import flags
18from absl.testing import absltest
19import grpc
20
21from framework import xds_k8s_flags
22from framework import xds_k8s_testcase
23from framework.helpers import skips
24
25logger = logging.getLogger(__name__)
26flags.adopt_module_key_flags(xds_k8s_testcase)
27
28# Type aliases
29_XdsTestServer = xds_k8s_testcase.XdsTestServer
30_XdsTestClient = xds_k8s_testcase.XdsTestClient
31_Lang = skips.Lang
32
33_EXPECTED_STATUS = grpc.StatusCode.DATA_LOSS
34
35
36class CustomLbTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
37
38    @classmethod
39    def setUpClass(cls):
40        """Force the java test server for languages not yet supporting
41        the `rpc-behavior` feature.
42        https://github.com/grpc/grpc/blob/master/doc/xds-test-descriptions.md#server
43        """
44        super().setUpClass()
45        # gRPC Java implemented server "error-code-" rpc-behavior in v1.47.x.
46        # gRPC CPP implemented rpc-behavior in the same version, as custom_lb.
47        if cls.lang_spec.client_lang in _Lang.JAVA | _Lang.CPP:
48            return
49
50        # gRPC go, python and node fallback to the gRPC Java.
51        # TODO(https://github.com/grpc/grpc-go/issues/6288): use go server.
52        # TODO(https://github.com/grpc/grpc/issues/33134): use python server.
53        cls.server_image = xds_k8s_flags.SERVER_IMAGE_CANONICAL.value
54
55    @staticmethod
56    def is_supported(config: skips.TestConfig) -> bool:
57        if config.client_lang == _Lang.JAVA:
58            return config.version_gte('v1.47.x')
59        if config.client_lang == _Lang.CPP:
60            return config.version_gte('v1.55.x')
61        if config.client_lang == _Lang.GO:
62            return config.version_gte('v1.56.x')
63        return False
64
65    def test_custom_lb_config(self):
66        with self.subTest('0_create_health_check'):
67            self.td.create_health_check()
68
69        # Configures a custom, test LB on the client to instruct the servers
70        # to always respond with a specific error code.
71        #
72        # The first policy in the list is a non-existent one to verify that
73        # the gRPC client can gracefully move down the list to the valid one
74        # once it determines the first one is not available.
75        with self.subTest('1_create_backend_service'):
76            self.td.create_backend_service(locality_lb_policies=[{
77                'customPolicy': {
78                    'name': 'test.ThisLoadBalancerDoesNotExist',
79                    'data': '{ "foo": "bar" }'
80                },
81            }, {
82                'customPolicy': {
83                    'name':
84                        'test.RpcBehaviorLoadBalancer',
85                    'data':
86                        f'{{ "rpcBehavior": "error-code-{_EXPECTED_STATUS.value[0]}" }}'
87                }
88            }])
89
90        with self.subTest('2_create_url_map'):
91            self.td.create_url_map(self.server_xds_host, self.server_xds_port)
92
93        with self.subTest('3_create_target_proxy'):
94            self.td.create_target_proxy()
95
96        with self.subTest('4_create_forwarding_rule'):
97            self.td.create_forwarding_rule(self.server_xds_port)
98
99        with self.subTest('5_start_test_server'):
100            test_server: _XdsTestServer = self.startTestServers()[0]
101
102        with self.subTest('6_add_server_backends_to_backend_service'):
103            self.setupServerBackends()
104
105        with self.subTest('7_start_test_client'):
106            test_client: _XdsTestClient = self.startTestClient(test_server)
107
108        with self.subTest('8_test_client_xds_config_exists'):
109            self.assertXdsConfigExists(test_client)
110
111        # Verify status codes from the servers have the configured one.
112        with self.subTest('9_test_server_returned_configured_status_code'):
113            self.assertRpcStatusCodes(test_client,
114                                      expected_status=_EXPECTED_STATUS,
115                                      duration=datetime.timedelta(seconds=10),
116                                      method='UNARY_CALL')
117
118
119if __name__ == '__main__':
120    absltest.main(failfast=True)
121