1# Copyright 2020 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import logging
15
16from absl import flags
17from absl.testing import absltest
18
19from framework import xds_k8s_testcase
20from framework.helpers import rand
21from framework.helpers import skips
22
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)
# Declare the key flags of xds_k8s_testcase as key flags of this module too.
flags.adopt_module_key_flags(xds_k8s_testcase)

# Short aliases for the framework types referenced by the tests.
_Lang = skips.Lang
_SecurityMode = xds_k8s_testcase.SecurityXdsKubernetesTestCase.SecurityMode
_XdsTestClient = xds_k8s_testcase.XdsTestClient
_XdsTestServer = xds_k8s_testcase.XdsTestServer
31
32
class SecurityTest(xds_k8s_testcase.SecurityXdsKubernetesTestCase):
    """PSM Security tests: mTLS, TLS, plaintext fallback and failure modes."""

    @staticmethod
    def is_supported(config: skips.TestConfig) -> bool:
        """Tell whether this suite applies to the given test configuration.

        Returns:
          For C++, Go, Java and Python clients: whether the version under
          test is at least v1.41.x. For Node clients: always False.
          For any other client language: True.
        """
        # Versions prior to v1.41.x don't support PSM Security.
        # https://github.com/grpc/grpc/blob/master/doc/grpc_xds_features.md
        version_gated_langs = (_Lang.CPP | _Lang.GO | _Lang.JAVA |
                               _Lang.PYTHON)
        if config.client_lang in version_gated_langs:
            return config.version_gte('v1.41.x')
        if config.client_lang == _Lang.NODE:
            return False
        return True

    def _assert_secure_mode_established(self, mode: _SecurityMode, *,
                                        tls: bool, mtls: bool) -> None:
        """Shared body of the positive tests.

        Applies the same `tls`/`mtls` settings to both the server and the
        client policies, brings up the server and the client, then asserts
        that the expected security mode is in effect and that RPCs succeed.
        """
        self.setupTrafficDirectorGrpc()
        self.setupSecurityPolicies(server_tls=tls,
                                   server_mtls=mtls,
                                   client_tls=tls,
                                   client_mtls=mtls)

        server: _XdsTestServer = self.startSecureTestServer()
        self.setupServerBackends()
        client: _XdsTestClient = self.startSecureTestClient(server)

        self.assertTestAppSecurity(mode, client, server)
        self.assertSuccessfulRpcs(client)

    def _start_server_with_unchecked_backends(self) -> _XdsTestServer:
        """First stage of the negative tests.

        Creates the backend service, starts the secure test server and
        attaches its NEGs to the backend service without waiting for them
        to become healthy.
        """
        # Create backend service
        self.td.setup_backend_for_grpc(
            health_check_port=self.server_maintenance_port)

        # The health check is deliberately deferred: it is awaited only
        # after the security policies and the routing rule map are in place.
        server: _XdsTestServer = self.startSecureTestServer()
        self.setupServerBackends(wait_for_healthy_status=False)
        return server

    def _finish_td_setup_and_expect_no_connectivity(
            self, server: _XdsTestServer) -> None:
        """Last stage of the negative tests.

        Creates the routing rule map, waits for the backends to become
        healthy, starts the client without waiting for an active channel to
        the server, and asserts the client repeatedly fails to reach it.
        """
        # Create the routing rule map.
        self.td.setup_routing_rule_map_for_grpc(self.server_xds_host,
                                                self.server_xds_port)
        # Now that TD setup is complete, Backend Service can be populated
        # with healthy backends (NEGs).
        self.td.wait_for_backends_healthy_status()

        # Start the client, but don't wait for it to report a healthy channel.
        client: _XdsTestClient = self.startSecureTestClient(
            server, wait_for_active_server_channel=False)

        self.assertClientCannotReachServerRepeatedly(client)

    def test_mtls(self):
        """mTLS test.

        TLS and mTLS are enabled on both the client and the server.
        """
        self._assert_secure_mode_established(_SecurityMode.MTLS,
                                             tls=True,
                                             mtls=True)
        logger.info('[SUCCESS] mTLS security mode confirmed.')

    def test_tls(self):
        """TLS test.

        TLS is enabled and mTLS is disabled on both the client and the server.
        """
        self._assert_secure_mode_established(_SecurityMode.TLS,
                                             tls=True,
                                             mtls=False)
        logger.info('[SUCCESS] TLS security mode confirmed.')

    def test_plaintext_fallback(self):
        """Plain-text fallback test.

        The control plane supplies no security config, so the client and the
        server both fall back to plaintext based on fallback-credentials.
        """
        self._assert_secure_mode_established(_SecurityMode.PLAINTEXT,
                                             tls=False,
                                             mtls=False)
        logger.info('[SUCCESS] Plaintext security mode confirmed.')

    def test_mtls_error(self):
        """Negative test: mTLS Error.

        The server requires a client mTLS cert, while the client is
        configured for TLS only.

        Note: since this is a negative test, the mTLS failure must occur
        after the client has received the correct configuration. To
        guarantee that, the steps are performed in this order:

        - Create the backendService and attach the backend (NEG)
        - Create the Server mTLS Policy and attach it to the ECS
        - Create the Client TLS Policy and attach it to the backendService
        - Create the urlMap, targetProxy, and forwardingRule

        With this order, by the time the client receives the endpoints of
        the backendService it has also received the security-config, as
        confirmed by the TD team.
        """
        server = self._start_server_with_unchecked_backends()

        # Setup policies and attach them.
        self.setupSecurityPolicies(server_tls=True,
                                   server_mtls=True,
                                   client_tls=True,
                                   client_mtls=False)

        self._finish_td_setup_and_expect_no_connectivity(server)
        logger.info(
            "[SUCCESS] Client's connectivity state is consistent with a mTLS "
            "error caused by not presenting mTLS certificate to the server.")

    def test_server_authz_error(self):
        """Negative test: AuthZ error.

        The client refuses to authorize the server because of a mismatched
        SAN name. Steps are ordered the same way as in `test_mtls_error`.
        """
        server = self._start_server_with_unchecked_backends()

        # Regular TLS setup, but with client policy configured using
        # intentionally incorrect server_namespace.
        self.td.setup_server_security(server_namespace=self.server_namespace,
                                      server_name=self.server_name,
                                      server_port=self.server_port,
                                      tls=True,
                                      mtls=False)
        incorrect_namespace = f'incorrect-namespace-{rand.rand_string()}'
        self.td.setup_client_security(server_namespace=incorrect_namespace,
                                      server_name=self.server_name,
                                      tls=True,
                                      mtls=False)

        self._finish_td_setup_and_expect_no_connectivity(server)
        logger.info("[SUCCESS] Client's connectivity state is consistent with "
                    "AuthZ error caused by server presenting incorrect SAN.")
198
# Hand control to the absl test runner when executed as a script.
if __name__ == '__main__':
    absltest.main()
201