# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

from absl import flags
from absl.testing import absltest

from framework import xds_k8s_testcase

logger = logging.getLogger(__name__)
# Expose the flags declared by the shared test-case module as key flags of
# this module.
flags.adopt_module_key_flags(xds_k8s_testcase)

# Short aliases used only for local variable annotations below.
_XdsTestServer = xds_k8s_testcase.XdsTestServer
_XdsTestClient = xds_k8s_testcase.XdsTestClient


class AppNetTest(xds_k8s_testcase.AppNetXdsKubernetesTestCase):
    """Ping-pong test built on the AppNet xDS Kubernetes test case base."""

    def test_ping_pong(self):
        # Runs the scenario as numbered sub-tests, in order:
        #   0-3: create the health check, backend service, mesh and gRPC
        #        route through `self.td`;
        #   4-5: start a single test server and set up the server backends;
        #   6:   start a test client against that server, configured with
        #        the mesh name;
        #   7-8: assert the client has xDS config and that RPCs succeed.
        # Kept as a comment (not a docstring) so the test's reported
        # description is unchanged.
        with self.subTest("0_create_health_check"):
            self.td.create_health_check()

        with self.subTest("1_create_backend_service"):
            self.td.create_backend_service()

        with self.subTest("2_create_mesh"):
            self.td.create_mesh()

        with self.subTest("3_create_grpc_route"):
            self.td.create_grpc_route(
                self.server_xds_host,
                self.server_xds_port,
            )

        with self.subTest("4_start_test_server"):
            started_servers = self.startTestServers(replica_count=1)
            # Only the first returned server is used by the later steps.
            test_server: _XdsTestServer = started_servers[0]

        with self.subTest("5_setup_server_backends"):
            self.setupServerBackends()

        with self.subTest("6_start_test_client"):
            test_client: _XdsTestClient = self.startTestClient(
                test_server,
                config_mesh=self.td.mesh.name,
            )

        with self.subTest("7_assert_xds_config_exists"):
            self.assertXdsConfigExists(test_client)

        with self.subTest("8_assert_successful_rpcs"):
            self.assertSuccessfulRpcs(test_client)


if __name__ == "__main__":
    # failfast: stop at the first failure; later sub-tests use objects
    # (test_server, test_client) produced by earlier ones.
    absltest.main(failfast=True)