1# Copyright 2021 The gRPC Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14""" 15This contains helpers for gRPC services defined in 16https://github.com/envoyproxy/envoy/blob/main/api/envoy/service/status/v3/csds.proto 17""" 18 19import logging 20from typing import Optional 21 22# Envoy protos provided by PyPI package xds-protos 23# Needs to import the generated Python file to load descriptors 24# pylint: disable=unused-import 25from envoy.extensions.filters.common.fault.v3 import fault_pb2 as _ 26from envoy.extensions.filters.http.fault.v3 import fault_pb2 as _ 27from envoy.extensions.filters.http.router.v3 import router_pb2 as _ 28from envoy.extensions.filters.network.http_connection_manager.v3 import \ 29 http_connection_manager_pb2 as _ 30# pylint: enable=unused-import 31from envoy.service.status.v3 import csds_pb2 32from envoy.service.status.v3 import csds_pb2_grpc 33import grpc 34 35import framework.rpc 36 37logger = logging.getLogger(__name__) 38 39# Type aliases 40ClientConfig = csds_pb2.ClientConfig 41_ClientStatusRequest = csds_pb2.ClientStatusRequest 42 43 44class CsdsClient(framework.rpc.grpc.GrpcClientHelper): 45 stub: csds_pb2_grpc.ClientStatusDiscoveryServiceStub 46 47 def __init__(self, 48 channel: grpc.Channel, 49 *, 50 log_target: Optional[str] = ''): 51 super().__init__(channel, 52 csds_pb2_grpc.ClientStatusDiscoveryServiceStub, 53 log_target=log_target) 54 55 def fetch_client_status(self, **kwargs) -> Optional[ClientConfig]: 56 """Fetches the active xDS configurations.""" 57 response = self.call_unary_with_deadline(rpc='FetchClientStatus', 58 req=_ClientStatusRequest(), 59 **kwargs) 60 if len(response.config) != 1: 61 logger.debug('Unexpected number of client configs: %s', 62 len(response.config)) 63 return None 64 return response.config[0] 65