1# Copyright 2023 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Common functionality for bin/ python helpers."""
15import atexit
16import signal
17import sys
18
19from absl import logging
20
21from framework import xds_flags
22from framework import xds_k8s_flags
23from framework.infrastructure import gcp
24from framework.infrastructure import k8s
25from framework.test_app import client_app
26from framework.test_app import server_app
27from framework.test_app.runners.k8s import k8s_xds_client_runner
28from framework.test_app.runners.k8s import k8s_xds_server_runner
29
# Module-wide logger used by the helpers below; obtained from absl's logging
# module via get_absl_logger().
logger = logging.get_absl_logger()

# Type aliases
# Short names for the runner and test app classes referenced in the
# signatures of this module. The underscore-prefixed aliases are private to
# this module by naming convention.
KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner
KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
_XdsTestServer = server_app.XdsTestServer
_XdsTestClient = client_app.XdsTestClient
37
38
def make_client_namespace(
        k8s_api_manager: k8s.KubernetesApiManager) -> k8s.KubernetesNamespace:
    """Build the namespace object used by the test client.

    The namespace name is produced by
    KubernetesClientRunner.make_namespace_name() from the values of
    xds_flags.RESOURCE_PREFIX and xds_flags.RESOURCE_SUFFIX.

    Args:
        k8s_api_manager: API manager handed to the namespace object.

    Returns:
        A k8s.KubernetesNamespace constructed for the client namespace name.
    """
    prefix = xds_flags.RESOURCE_PREFIX.value
    suffix = xds_flags.RESOURCE_SUFFIX.value
    client_ns_name: str = KubernetesClientRunner.make_namespace_name(
        prefix, suffix)
    return k8s.KubernetesNamespace(k8s_api_manager, client_ns_name)
44
45
def make_client_runner(namespace: k8s.KubernetesNamespace,
                       gcp_api_manager: gcp.api.GcpApiManager,
                       port_forwarding: bool = False,
                       reuse_namespace: bool = True,
                       secure: bool = False) -> KubernetesClientRunner:
    """Create a KubernetesClientRunner configured from command-line flags.

    Args:
        namespace: Namespace object the runner is constructed with.
        gcp_api_manager: Passed to the runner as gcp_api_manager.
        port_forwarding: Passed to the runner as debug_use_port_forwarding.
        reuse_namespace: Passed to the runner as reuse_namespace.
        secure: When True, the runner is additionally given the
            'client-secure.deployment.yaml' deployment template.

    Returns:
        The constructed KubernetesClientRunner.
    """
    # Keyword arguments for KubernetesClientRunner, taken from the flags.
    runner_kwargs = {
        'deployment_name': xds_flags.CLIENT_NAME.value,
        'image_name': xds_k8s_flags.CLIENT_IMAGE.value,
        'td_bootstrap_image': xds_k8s_flags.TD_BOOTSTRAP_IMAGE.value,
        'gcp_project': xds_flags.PROJECT.value,
        'gcp_api_manager': gcp_api_manager,
        'gcp_service_account': xds_k8s_flags.GCP_SERVICE_ACCOUNT.value,
        'xds_server_uri': xds_flags.XDS_SERVER_URI.value,
        'network': xds_flags.NETWORK.value,
        'stats_port': xds_flags.CLIENT_PORT.value,
        'reuse_namespace': reuse_namespace,
        'debug_use_port_forwarding': port_forwarding,
    }

    # The template is only overridden in secure mode; otherwise the key is
    # left out so the runner falls back to whatever it uses by default.
    if secure:
        runner_kwargs['deployment_template'] = 'client-secure.deployment.yaml'

    return KubernetesClientRunner(namespace, **runner_kwargs)
69
70
def make_server_namespace(
        k8s_api_manager: k8s.KubernetesApiManager) -> k8s.KubernetesNamespace:
    """Build the namespace object used by the test server.

    The namespace name is produced by
    KubernetesServerRunner.make_namespace_name() from the values of
    xds_flags.RESOURCE_PREFIX and xds_flags.RESOURCE_SUFFIX.

    Args:
        k8s_api_manager: API manager handed to the namespace object.

    Returns:
        A k8s.KubernetesNamespace constructed for the server namespace name.
    """
    prefix = xds_flags.RESOURCE_PREFIX.value
    suffix = xds_flags.RESOURCE_SUFFIX.value
    server_ns_name: str = KubernetesServerRunner.make_namespace_name(
        prefix, suffix)
    return k8s.KubernetesNamespace(k8s_api_manager, server_ns_name)
76
77
def make_server_runner(namespace: k8s.KubernetesNamespace,
                       gcp_api_manager: gcp.api.GcpApiManager,
                       port_forwarding: bool = False,
                       reuse_namespace: bool = True,
                       reuse_service: bool = False,
                       secure: bool = False) -> KubernetesServerRunner:
    """Create a KubernetesServerRunner configured from command-line flags.

    Args:
        namespace: Namespace object the runner is constructed with.
        gcp_api_manager: Passed to the runner as gcp_api_manager.
        port_forwarding: Passed to the runner as debug_use_port_forwarding.
        reuse_namespace: Passed to the runner as reuse_namespace.
        reuse_service: Passed to the runner as reuse_service.
        secure: When True, the runner is additionally given the
            'server-secure.deployment.yaml' deployment template.

    Returns:
        The constructed KubernetesServerRunner.
    """
    # Keyword arguments for KubernetesServerRunner, taken from the flags.
    runner_kwargs = {
        'deployment_name': xds_flags.SERVER_NAME.value,
        'image_name': xds_k8s_flags.SERVER_IMAGE.value,
        'td_bootstrap_image': xds_k8s_flags.TD_BOOTSTRAP_IMAGE.value,
        'xds_server_uri': xds_flags.XDS_SERVER_URI.value,
        'gcp_project': xds_flags.PROJECT.value,
        'gcp_api_manager': gcp_api_manager,
        'gcp_service_account': xds_k8s_flags.GCP_SERVICE_ACCOUNT.value,
        'network': xds_flags.NETWORK.value,
        'reuse_namespace': reuse_namespace,
        'reuse_service': reuse_service,
        'debug_use_port_forwarding': port_forwarding,
    }

    # The template is only overridden in secure mode; otherwise the key is
    # left out so the runner falls back to whatever it uses by default.
    if secure:
        runner_kwargs.update(
            deployment_template='server-secure.deployment.yaml')

    return KubernetesServerRunner(namespace, **runner_kwargs)
102
103
def _ensure_atexit(signum, frame):
    """Signal handler that converts a caught signal into a normal exit.

    Without a Python-level handler, a process killed by a signal does not run
    its atexit callbacks. Raising SystemExit from the handler makes the
    interpreter shut down through the regular path, with exit status 1.

    Args:
        signum: Number of the signal that was received.
        frame: Current stack frame supplied by the signal machinery; unused.
    """
    # Part of the handler signature only.
    del frame

    # Pylint is wrong about "Module 'signal' has no 'Signals' member":
    # https://docs.python.org/3/library/signal.html#signal.Signals
    caught = signal.Signals(signum)  # pylint: disable=no-member
    logger.warning('Caught %r, initiating graceful shutdown...\n', caught)
    # Same effect as sys.exit(1).
    raise SystemExit(1)
113
114
def _graceful_exit(server_runner: KubernetesServerRunner,
                   client_runner: KubernetesClientRunner):
    """Stop port forwarding processes.

    Calls stop_pod_dependencies() on the client runner first, then on the
    server runner. The server cleanup is attempted even when the client
    cleanup raises, so one failure does not leave the other runner's
    dependencies running. Any exception still propagates to the caller.

    Args:
        server_runner: Server runner whose pod dependencies are stopped.
        client_runner: Client runner whose pod dependencies are stopped.
    """
    try:
        client_runner.stop_pod_dependencies()
    finally:
        # Runs regardless of the outcome above: this function is registered
        # as an atexit callback, so there is no later chance to clean up.
        server_runner.stop_pod_dependencies()
120
121
def register_graceful_exit(server_runner: KubernetesServerRunner,
                           client_runner: KubernetesClientRunner):
    """Arrange for both runners to be cleaned up when the process exits.

    Registers _graceful_exit as an atexit callback for the given runners and
    installs _ensure_atexit as the handler for SIGTERM, SIGHUP and SIGINT.

    Args:
        server_runner: Server runner passed to the atexit callback.
        client_runner: Client runner passed to the atexit callback.
    """
    atexit.register(_graceful_exit, server_runner, client_runner)

    handled_signals = (signal.SIGTERM, signal.SIGHUP, signal.SIGINT)
    for handled_signal in handled_signals:
        signal.signal(handled_signal, _ensure_atexit)
127
128
def get_client_pod(client_runner: KubernetesClientRunner,
                   deployment_name: str) -> k8s.V1Pod:
    """Look up the first pod of an existing client deployment.

    Args:
        client_runner: Runner whose namespace holds the deployment.
        deployment_name: Name of the deployment to look up.

    Returns:
        The result of the runner's _wait_pod_started() for the first pod name
        reported by _wait_deployment_pod_count().
    """
    namespace = client_runner.k8s_namespace
    deployment: k8s.V1Deployment = namespace.get_deployment(deployment_name)
    # Presumably a sequence of pod names; only the first entry is used.
    pod_names = client_runner._wait_deployment_pod_count(deployment)
    first_pod_name: str = pod_names[0]
    return client_runner._wait_pod_started(first_pod_name)
137
138
def get_server_pod(server_runner: KubernetesServerRunner,
                   deployment_name: str) -> k8s.V1Pod:
    """Look up the first pod of an existing server deployment.

    Args:
        server_runner: Runner whose namespace holds the deployment.
        deployment_name: Name of the deployment to look up.

    Returns:
        The result of the runner's _wait_pod_started() for the first pod name
        reported by _wait_deployment_pod_count().
    """
    namespace = server_runner.k8s_namespace
    deployment: k8s.V1Deployment = namespace.get_deployment(deployment_name)
    # Presumably a sequence of pod names; only the first entry is used.
    pod_names = server_runner._wait_deployment_pod_count(deployment)
    first_pod_name: str = pod_names[0]
    return server_runner._wait_pod_started(first_pod_name)
147
148
def get_test_server_for_pod(server_runner: KubernetesServerRunner,
                            server_pod: k8s.V1Pod, **kwargs) -> _XdsTestServer:
    """Obtain the test server object for a server pod.

    Thin wrapper over the runner's _xds_test_server_for_pod().

    Args:
        server_runner: Runner the call is delegated to.
        server_pod: Pod to obtain the test server for.
        **kwargs: Forwarded unchanged to _xds_test_server_for_pod().

    Returns:
        Whatever the runner's _xds_test_server_for_pod() returns.
    """
    test_server: _XdsTestServer = server_runner._xds_test_server_for_pod(
        server_pod, **kwargs)
    return test_server
152
153
def get_test_client_for_pod(client_runner: KubernetesClientRunner,
                            client_pod: k8s.V1Pod, **kwargs) -> _XdsTestClient:
    """Obtain the test client object for a client pod.

    Thin wrapper over the runner's _xds_test_client_for_pod().

    Args:
        client_runner: Runner the call is delegated to.
        client_pod: Pod to obtain the test client for.
        **kwargs: Forwarded unchanged to _xds_test_client_for_pod().

    Returns:
        Whatever the runner's _xds_test_client_for_pod() returns.
    """
    test_client: _XdsTestClient = client_runner._xds_test_client_for_pod(
        client_pod, **kwargs)
    return test_client
157