1# Copyright 2021 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""
15This contains helpers for gRPC services defined in
16https://github.com/envoyproxy/envoy/blob/main/api/envoy/service/status/v3/csds.proto
17"""
18
19import logging
20from typing import Optional
21
22# Envoy protos provided by PyPI package xds-protos
23# Needs to import the generated Python file to load descriptors
24# pylint: disable=unused-import
25from envoy.extensions.filters.common.fault.v3 import fault_pb2 as _
26from envoy.extensions.filters.http.fault.v3 import fault_pb2 as _
27from envoy.extensions.filters.http.router.v3 import router_pb2 as _
28from envoy.extensions.filters.network.http_connection_manager.v3 import \
29    http_connection_manager_pb2 as _
30# pylint: enable=unused-import
31from envoy.service.status.v3 import csds_pb2
32from envoy.service.status.v3 import csds_pb2_grpc
33import grpc
34
35import framework.rpc
36
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Type aliases
# Short names for the CSDS protobuf messages used below: ClientConfig is the
# return type of CsdsClient.fetch_client_status(), _ClientStatusRequest is the
# request message it sends.
ClientConfig = csds_pb2.ClientConfig
_ClientStatusRequest = csds_pb2.ClientStatusRequest
42
43
class CsdsClient(framework.rpc.grpc.GrpcClientHelper):
    """Client helper for the ClientStatusDiscoveryService (CSDS) stub."""
    stub: csds_pb2_grpc.ClientStatusDiscoveryServiceStub

    def __init__(self,
                 channel: grpc.Channel,
                 *,
                 log_target: Optional[str] = ''):
        """Binds the CSDS stub class to the given channel.

        Args:
            channel: gRPC channel passed through to the base helper.
            log_target: Keyword-only value passed through to the base helper
                as `log_target`.
        """
        stub_class = csds_pb2_grpc.ClientStatusDiscoveryServiceStub
        super().__init__(channel, stub_class, log_target=log_target)

    def fetch_client_status(self, **kwargs) -> Optional[ClientConfig]:
        """Fetches the active xDS configurations.

        Issues the `FetchClientStatus` RPC with an empty request; any keyword
        arguments are forwarded to `call_unary_with_deadline`.

        Returns:
            The only config in the response, or None (after a debug log)
            when the response does not contain exactly one config.
        """
        request = _ClientStatusRequest()
        response = self.call_unary_with_deadline(rpc='FetchClientStatus',
                                                 req=request,
                                                 **kwargs)
        configs = response.config
        config_count = len(configs)
        # Exactly one config is the only accepted shape of the response.
        if config_count == 1:
            return configs[0]
        logger.debug('Unexpected number of client configs: %s', config_count)
        return None
65