1 /* 2 * Copyright 2022 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.xds; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 21 import com.google.common.annotations.VisibleForTesting; 22 import com.google.common.base.MoreObjects; 23 import com.google.common.collect.ImmutableList; 24 import com.google.protobuf.Message; 25 import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; 26 import io.envoyproxy.envoy.type.v3.FractionalPercent; 27 import io.grpc.EquivalentAddressGroup; 28 import io.grpc.xds.Endpoints.DropOverload; 29 import io.grpc.xds.Endpoints.LocalityLbEndpoints; 30 import io.grpc.xds.XdsClient.ResourceUpdate; 31 import io.grpc.xds.XdsClientImpl.ResourceInvalidException; 32 import io.grpc.xds.XdsEndpointResource.EdsUpdate; 33 import java.net.InetSocketAddress; 34 import java.util.ArrayList; 35 import java.util.Collections; 36 import java.util.HashMap; 37 import java.util.HashSet; 38 import java.util.LinkedHashMap; 39 import java.util.List; 40 import java.util.Map; 41 import java.util.Objects; 42 import java.util.Set; 43 import javax.annotation.Nullable; 44 45 class XdsEndpointResource extends XdsResourceType<EdsUpdate> { 46 static final String ADS_TYPE_URL_EDS = 47 "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"; 48 49 private static final XdsEndpointResource instance = new XdsEndpointResource(); 50 getInstance()51 public static XdsEndpointResource getInstance() { 52 return instance; 53 } 54 55 @Override 56 @Nullable extractResourceName(Message unpackedResource)57 String extractResourceName(Message unpackedResource) { 58 if (!(unpackedResource instanceof ClusterLoadAssignment)) { 59 return null; 60 } 61 return ((ClusterLoadAssignment) unpackedResource).getClusterName(); 62 } 63 64 @Override typeName()65 String typeName() { 66 return "EDS"; 67 } 68 69 @Override typeUrl()70 String typeUrl() { 71 return ADS_TYPE_URL_EDS; 72 } 73 74 @Override isFullStateOfTheWorld()75 boolean isFullStateOfTheWorld() { 76 return false; 77 } 78 79 @Override unpackedClassName()80 Class<ClusterLoadAssignment> unpackedClassName() { 81 return ClusterLoadAssignment.class; 82 } 83 84 @Override doParse(Args args, Message unpackedMessage)85 EdsUpdate doParse(Args args, Message unpackedMessage) 86 throws ResourceInvalidException { 87 if (!(unpackedMessage instanceof ClusterLoadAssignment)) { 88 throw new ResourceInvalidException("Invalid message type: " + unpackedMessage.getClass()); 89 } 90 return processClusterLoadAssignment((ClusterLoadAssignment) unpackedMessage); 91 } 92 processClusterLoadAssignment(ClusterLoadAssignment assignment)93 private static EdsUpdate processClusterLoadAssignment(ClusterLoadAssignment assignment) 94 throws ResourceInvalidException { 95 Map<Integer, Set<Locality>> priorities = new HashMap<>(); 96 Map<Locality, LocalityLbEndpoints> localityLbEndpointsMap = new LinkedHashMap<>(); 97 List<Endpoints.DropOverload> dropOverloads = new ArrayList<>(); 98 int maxPriority = -1; 99 for (io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints localityLbEndpointsProto 100 : assignment.getEndpointsList()) { 101 StructOrError<LocalityLbEndpoints> structOrError = 102 parseLocalityLbEndpoints(localityLbEndpointsProto); 103 if (structOrError == null) { 104 continue; 105 } 106 if (structOrError.getErrorDetail() != null) { 107 throw new ResourceInvalidException(structOrError.getErrorDetail()); 108 } 109 110 LocalityLbEndpoints localityLbEndpoints = structOrError.getStruct(); 111 int priority = localityLbEndpoints.priority(); 112 maxPriority = Math.max(maxPriority, priority); 113 // Note endpoints with health status other than HEALTHY and UNKNOWN are still 114 // handed over to watching parties. It is watching parties' responsibility to 115 // filter out unhealthy endpoints. See EnvoyProtoData.LbEndpoint#isHealthy(). 116 Locality locality = parseLocality(localityLbEndpointsProto.getLocality()); 117 localityLbEndpointsMap.put(locality, localityLbEndpoints); 118 if (!priorities.containsKey(priority)) { 119 priorities.put(priority, new HashSet<>()); 120 } 121 if (!priorities.get(priority).add(locality)) { 122 throw new ResourceInvalidException("ClusterLoadAssignment has duplicate locality:" 123 + locality + " for priority:" + priority); 124 } 125 } 126 if (priorities.size() != maxPriority + 1) { 127 throw new ResourceInvalidException("ClusterLoadAssignment has sparse priorities"); 128 } 129 130 for (ClusterLoadAssignment.Policy.DropOverload dropOverloadProto 131 : assignment.getPolicy().getDropOverloadsList()) { 132 dropOverloads.add(parseDropOverload(dropOverloadProto)); 133 } 134 return new EdsUpdate(assignment.getClusterName(), localityLbEndpointsMap, dropOverloads); 135 } 136 parseLocality(io.envoyproxy.envoy.config.core.v3.Locality proto)137 private static Locality parseLocality(io.envoyproxy.envoy.config.core.v3.Locality proto) { 138 return Locality.create(proto.getRegion(), proto.getZone(), proto.getSubZone()); 139 } 140 parseDropOverload( io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment.Policy.DropOverload proto)141 private static DropOverload parseDropOverload( 142 io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment.Policy.DropOverload proto) { 143 return DropOverload.create(proto.getCategory(), getRatePerMillion(proto.getDropPercentage())); 144 } 145 getRatePerMillion(FractionalPercent percent)146 private static int getRatePerMillion(FractionalPercent percent) { 147 int numerator = percent.getNumerator(); 148 FractionalPercent.DenominatorType type = percent.getDenominator(); 149 switch (type) { 150 case TEN_THOUSAND: 151 numerator *= 100; 152 break; 153 case HUNDRED: 154 numerator *= 10_000; 155 break; 156 case MILLION: 157 break; 158 case UNRECOGNIZED: 159 default: 160 throw new IllegalArgumentException("Unknown denominator type of " + percent); 161 } 162 163 if (numerator > 1_000_000 || numerator < 0) { 164 numerator = 1_000_000; 165 } 166 return numerator; 167 } 168 169 170 @VisibleForTesting 171 @Nullable parseLocalityLbEndpoints( io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto)172 static StructOrError<LocalityLbEndpoints> parseLocalityLbEndpoints( 173 io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto) { 174 // Filter out localities without or with 0 weight. 175 if (!proto.hasLoadBalancingWeight() || proto.getLoadBalancingWeight().getValue() < 1) { 176 return null; 177 } 178 if (proto.getPriority() < 0) { 179 return StructOrError.fromError("negative priority"); 180 } 181 List<Endpoints.LbEndpoint> endpoints = new ArrayList<>(proto.getLbEndpointsCount()); 182 for (io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint endpoint : proto.getLbEndpointsList()) { 183 // The endpoint field of each lb_endpoints must be set. 184 // Inside of it: the address field must be set. 185 if (!endpoint.hasEndpoint() || !endpoint.getEndpoint().hasAddress()) { 186 return StructOrError.fromError("LbEndpoint with no endpoint/address"); 187 } 188 io.envoyproxy.envoy.config.core.v3.SocketAddress socketAddress = 189 endpoint.getEndpoint().getAddress().getSocketAddress(); 190 InetSocketAddress addr = 191 new InetSocketAddress(socketAddress.getAddress(), socketAddress.getPortValue()); 192 boolean isHealthy = 193 endpoint.getHealthStatus() == io.envoyproxy.envoy.config.core.v3.HealthStatus.HEALTHY 194 || endpoint.getHealthStatus() 195 == io.envoyproxy.envoy.config.core.v3.HealthStatus.UNKNOWN; 196 endpoints.add(Endpoints.LbEndpoint.create( 197 new EquivalentAddressGroup(ImmutableList.<java.net.SocketAddress>of(addr)), 198 endpoint.getLoadBalancingWeight().getValue(), isHealthy)); 199 } 200 return StructOrError.fromStruct(Endpoints.LocalityLbEndpoints.create( 201 endpoints, proto.getLoadBalancingWeight().getValue(), proto.getPriority())); 202 } 203 204 static final class EdsUpdate implements ResourceUpdate { 205 final String clusterName; 206 final Map<Locality, LocalityLbEndpoints> localityLbEndpointsMap; 207 final List<DropOverload> dropPolicies; 208 EdsUpdate(String clusterName, Map<Locality, LocalityLbEndpoints> localityLbEndpoints, List<DropOverload> dropPolicies)209 EdsUpdate(String clusterName, Map<Locality, LocalityLbEndpoints> localityLbEndpoints, 210 List<DropOverload> dropPolicies) { 211 this.clusterName = checkNotNull(clusterName, "clusterName"); 212 this.localityLbEndpointsMap = Collections.unmodifiableMap( 213 new LinkedHashMap<>(checkNotNull(localityLbEndpoints, "localityLbEndpoints"))); 214 this.dropPolicies = Collections.unmodifiableList( 215 new ArrayList<>(checkNotNull(dropPolicies, "dropPolicies"))); 216 } 217 218 @Override equals(Object o)219 public boolean equals(Object o) { 220 if (this == o) { 221 return true; 222 } 223 if (o == null || getClass() != o.getClass()) { 224 return false; 225 } 226 EdsUpdate that = (EdsUpdate) o; 227 return Objects.equals(clusterName, that.clusterName) 228 && Objects.equals(localityLbEndpointsMap, that.localityLbEndpointsMap) 229 && Objects.equals(dropPolicies, that.dropPolicies); 230 } 231 232 @Override hashCode()233 public int hashCode() { 234 return Objects.hash(clusterName, localityLbEndpointsMap, dropPolicies); 235 } 236 237 @Override toString()238 public String toString() { 239 return 240 MoreObjects 241 .toStringHelper(this) 242 .add("clusterName", clusterName) 243 .add("localityLbEndpointsMap", localityLbEndpointsMap) 244 .add("dropPolicies", dropPolicies) 245 .toString(); 246 } 247 } 248 } 249