1 /* 2 * Copyright 2019 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.xds; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static io.grpc.xds.Bootstrapper.XDSTP_SCHEME; 21 22 import com.google.common.annotations.VisibleForTesting; 23 import com.google.common.base.Joiner; 24 import com.google.common.base.Splitter; 25 import com.google.common.net.UrlEscapers; 26 import com.google.common.util.concurrent.ListenableFuture; 27 import com.google.protobuf.Any; 28 import io.grpc.Status; 29 import io.grpc.xds.Bootstrapper.ServerInfo; 30 import io.grpc.xds.LoadStatsManager2.ClusterDropStats; 31 import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats; 32 import java.net.URI; 33 import java.net.URISyntaxException; 34 import java.util.ArrayList; 35 import java.util.Collection; 36 import java.util.Collections; 37 import java.util.List; 38 import java.util.Map; 39 import javax.annotation.Nullable; 40 41 /** 42 * An {@link XdsClient} instance encapsulates all of the logic for communicating with the xDS 43 * server. It may create multiple RPC streams (or a single ADS stream) for a series of xDS 44 * protocols (e.g., LDS, RDS, VHDS, CDS and EDS) over a single channel. Watch-based interfaces 45 * are provided for each set of data needed by gRPC. 46 */ 47 abstract class XdsClient { 48 isResourceNameValid(String resourceName, String typeUrl)49 static boolean isResourceNameValid(String resourceName, String typeUrl) { 50 checkNotNull(resourceName, "resourceName"); 51 if (!resourceName.startsWith(XDSTP_SCHEME)) { 52 return true; 53 } 54 URI uri; 55 try { 56 uri = new URI(resourceName); 57 } catch (URISyntaxException e) { 58 return false; 59 } 60 String path = uri.getPath(); 61 // path must be in the form of /{resource type}/{id/*} 62 Splitter slashSplitter = Splitter.on('/').omitEmptyStrings(); 63 if (path == null) { 64 return false; 65 } 66 List<String> pathSegs = slashSplitter.splitToList(path); 67 if (pathSegs.size() < 2) { 68 return false; 69 } 70 String type = pathSegs.get(0); 71 if (!type.equals(slashSplitter.splitToList(typeUrl).get(1))) { 72 return false; 73 } 74 return true; 75 } 76 canonifyResourceName(String resourceName)77 static String canonifyResourceName(String resourceName) { 78 checkNotNull(resourceName, "resourceName"); 79 if (!resourceName.startsWith(XDSTP_SCHEME)) { 80 return resourceName; 81 } 82 URI uri = URI.create(resourceName); 83 String rawQuery = uri.getRawQuery(); 84 Splitter ampSplitter = Splitter.on('&').omitEmptyStrings(); 85 if (rawQuery == null) { 86 return resourceName; 87 } 88 List<String> queries = ampSplitter.splitToList(rawQuery); 89 if (queries.size() < 2) { 90 return resourceName; 91 } 92 List<String> canonicalContextParams = new ArrayList<>(queries.size()); 93 for (String query : queries) { 94 canonicalContextParams.add(query); 95 } 96 Collections.sort(canonicalContextParams); 97 String canonifiedQuery = Joiner.on('&').join(canonicalContextParams); 98 return resourceName.replace(rawQuery, canonifiedQuery); 99 } 100 percentEncodePath(String input)101 static String percentEncodePath(String input) { 102 Iterable<String> pathSegs = Splitter.on('/').split(input); 103 List<String> encodedSegs = new ArrayList<>(); 104 for (String pathSeg : pathSegs) { 105 encodedSegs.add(UrlEscapers.urlPathSegmentEscaper().escape(pathSeg)); 106 } 107 return Joiner.on('/').join(encodedSegs); 108 } 109 110 interface ResourceUpdate { 111 } 112 113 /** 114 * Watcher interface for a single requested xDS resource. 115 */ 116 interface ResourceWatcher<T extends ResourceUpdate> { 117 118 /** 119 * Called when the resource discovery RPC encounters some transient error. 120 * 121 * <p>Note that we expect that the implementer to: 122 * - Comply with the guarantee to not generate certain statuses by the library: 123 * https://grpc.github.io/grpc/core/md_doc_statuscodes.html. If the code needs to be 124 * propagated to the channel, override it with {@link Status.Code#UNAVAILABLE}. 125 * - Keep {@link Status} description in one form or another, as it contains valuable debugging 126 * information. 127 */ onError(Status error)128 void onError(Status error); 129 130 /** 131 * Called when the requested resource is not available. 132 * 133 * @param resourceName name of the resource requested in discovery request. 134 */ onResourceDoesNotExist(String resourceName)135 void onResourceDoesNotExist(String resourceName); 136 onChanged(T update)137 void onChanged(T update); 138 } 139 140 /** 141 * The metadata of the xDS resource; used by the xDS config dump. 142 */ 143 static final class ResourceMetadata { 144 private final String version; 145 private final ResourceMetadataStatus status; 146 private final long updateTimeNanos; 147 @Nullable private final Any rawResource; 148 @Nullable private final UpdateFailureState errorState; 149 ResourceMetadata( ResourceMetadataStatus status, String version, long updateTimeNanos, @Nullable Any rawResource, @Nullable UpdateFailureState errorState)150 private ResourceMetadata( 151 ResourceMetadataStatus status, String version, long updateTimeNanos, 152 @Nullable Any rawResource, @Nullable UpdateFailureState errorState) { 153 this.status = checkNotNull(status, "status"); 154 this.version = checkNotNull(version, "version"); 155 this.updateTimeNanos = updateTimeNanos; 156 this.rawResource = rawResource; 157 this.errorState = errorState; 158 } 159 newResourceMetadataUnknown()160 static ResourceMetadata newResourceMetadataUnknown() { 161 return new ResourceMetadata(ResourceMetadataStatus.UNKNOWN, "", 0, null, null); 162 } 163 newResourceMetadataRequested()164 static ResourceMetadata newResourceMetadataRequested() { 165 return new ResourceMetadata(ResourceMetadataStatus.REQUESTED, "", 0, null, null); 166 } 167 newResourceMetadataDoesNotExist()168 static ResourceMetadata newResourceMetadataDoesNotExist() { 169 return new ResourceMetadata(ResourceMetadataStatus.DOES_NOT_EXIST, "", 0, null, null); 170 } 171 newResourceMetadataAcked( Any rawResource, String version, long updateTimeNanos)172 static ResourceMetadata newResourceMetadataAcked( 173 Any rawResource, String version, long updateTimeNanos) { 174 checkNotNull(rawResource, "rawResource"); 175 return new ResourceMetadata( 176 ResourceMetadataStatus.ACKED, version, updateTimeNanos, rawResource, null); 177 } 178 newResourceMetadataNacked( ResourceMetadata metadata, String failedVersion, long failedUpdateTime, String failedDetails)179 static ResourceMetadata newResourceMetadataNacked( 180 ResourceMetadata metadata, String failedVersion, long failedUpdateTime, 181 String failedDetails) { 182 checkNotNull(metadata, "metadata"); 183 return new ResourceMetadata(ResourceMetadataStatus.NACKED, 184 metadata.getVersion(), metadata.getUpdateTimeNanos(), metadata.getRawResource(), 185 new UpdateFailureState(failedVersion, failedUpdateTime, failedDetails)); 186 } 187 188 /** The last successfully updated version of the resource. */ getVersion()189 String getVersion() { 190 return version; 191 } 192 193 /** The client status of this resource. */ getStatus()194 ResourceMetadataStatus getStatus() { 195 return status; 196 } 197 198 /** The timestamp when the resource was last successfully updated. */ getUpdateTimeNanos()199 long getUpdateTimeNanos() { 200 return updateTimeNanos; 201 } 202 203 /** The last successfully updated xDS resource as it was returned by the server. */ 204 @Nullable getRawResource()205 Any getRawResource() { 206 return rawResource; 207 } 208 209 /** The metadata capturing the error details of the last rejected update of the resource. */ 210 @Nullable getErrorState()211 UpdateFailureState getErrorState() { 212 return errorState; 213 } 214 215 /** 216 * Resource status from the view of a xDS client, which tells the synchronization 217 * status between the xDS client and the xDS server. 218 * 219 * <p>This is a native representation of xDS ConfigDump ClientResourceStatus, see 220 * <a href="https://github.com/envoyproxy/envoy/blob/main/api/envoy/admin/v3/config_dump.proto"> 221 * config_dump.proto</a> 222 */ 223 enum ResourceMetadataStatus { 224 UNKNOWN, REQUESTED, DOES_NOT_EXIST, ACKED, NACKED 225 } 226 227 /** 228 * Captures error metadata of failed resource updates. 229 * 230 * <p>This is a native representation of xDS ConfigDump UpdateFailureState, see 231 * <a href="https://github.com/envoyproxy/envoy/blob/main/api/envoy/admin/v3/config_dump.proto"> 232 * config_dump.proto</a> 233 */ 234 static final class UpdateFailureState { 235 private final String failedVersion; 236 private final long failedUpdateTimeNanos; 237 private final String failedDetails; 238 UpdateFailureState( String failedVersion, long failedUpdateTimeNanos, String failedDetails)239 private UpdateFailureState( 240 String failedVersion, long failedUpdateTimeNanos, String failedDetails) { 241 this.failedVersion = checkNotNull(failedVersion, "failedVersion"); 242 this.failedUpdateTimeNanos = failedUpdateTimeNanos; 243 this.failedDetails = checkNotNull(failedDetails, "failedDetails"); 244 } 245 246 /** The rejected version string of the last failed update attempt. */ getFailedVersion()247 String getFailedVersion() { 248 return failedVersion; 249 } 250 251 /** Details about the last failed update attempt. */ getFailedUpdateTimeNanos()252 long getFailedUpdateTimeNanos() { 253 return failedUpdateTimeNanos; 254 } 255 256 /** Timestamp of the last failed update attempt. */ getFailedDetails()257 String getFailedDetails() { 258 return failedDetails; 259 } 260 } 261 } 262 263 /** 264 * Shutdown this {@link XdsClient} and release resources. 265 */ shutdown()266 void shutdown() { 267 throw new UnsupportedOperationException(); 268 } 269 270 /** 271 * Returns {@code true} if {@link #shutdown()} has been called. 272 */ isShutDown()273 boolean isShutDown() { 274 throw new UnsupportedOperationException(); 275 } 276 277 /** 278 * Returns the config used to bootstrap this XdsClient {@link Bootstrapper.BootstrapInfo}. 279 */ getBootstrapInfo()280 Bootstrapper.BootstrapInfo getBootstrapInfo() { 281 throw new UnsupportedOperationException(); 282 } 283 284 /** 285 * Returns the {@link TlsContextManager} used in this XdsClient. 286 */ getTlsContextManager()287 TlsContextManager getTlsContextManager() { 288 throw new UnsupportedOperationException(); 289 } 290 291 /** 292 * Returns a {@link ListenableFuture} to the snapshot of the subscribed resources as 293 * they are at the moment of the call. 294 * 295 * <p>The snapshot is a map from the "resource type" to 296 * a map ("resource name": "resource metadata"). 297 */ 298 // Must be synchronized. 299 ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>> getSubscribedResourcesMetadataSnapshot()300 getSubscribedResourcesMetadataSnapshot() { 301 throw new UnsupportedOperationException(); 302 } 303 304 /** 305 * Registers a data watcher for the given Xds resource. 306 */ watchXdsResource(XdsResourceType<T> type, String resourceName, ResourceWatcher<T> watcher)307 <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName, 308 ResourceWatcher<T> watcher) { 309 throw new UnsupportedOperationException(); 310 } 311 312 /** 313 * Unregisters the given resource watcher. 314 */ cancelXdsResourceWatch(XdsResourceType<T> type, String resourceName, ResourceWatcher<T> watcher)315 <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type, 316 String resourceName, 317 ResourceWatcher<T> watcher) { 318 throw new UnsupportedOperationException(); 319 } 320 321 /** 322 * Adds drop stats for the specified cluster with edsServiceName by using the returned object 323 * to record dropped requests. Drop stats recorded with the returned object will be reported 324 * to the load reporting server. The returned object is reference counted and the caller should 325 * use {@link ClusterDropStats#release} to release its <i>hard</i> reference when it is safe to 326 * stop reporting dropped RPCs for the specified cluster in the future. 327 */ addClusterDropStats( ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName)328 ClusterDropStats addClusterDropStats( 329 ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName) { 330 throw new UnsupportedOperationException(); 331 } 332 333 /** 334 * Adds load stats for the specified locality (in the specified cluster with edsServiceName) by 335 * using the returned object to record RPCs. Load stats recorded with the returned object will 336 * be reported to the load reporting server. The returned object is reference counted and the 337 * caller should use {@link ClusterLocalityStats#release} to release its <i>hard</i> 338 * reference when it is safe to stop reporting RPC loads for the specified locality in the 339 * future. 340 */ addClusterLocalityStats( ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName, Locality locality)341 ClusterLocalityStats addClusterLocalityStats( 342 ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName, 343 Locality locality) { 344 throw new UnsupportedOperationException(); 345 } 346 347 /** 348 * Returns a map of control plane server info objects to the LoadReportClients that are 349 * responsible for sending load reports to the control plane servers. 350 */ 351 @VisibleForTesting getServerLrsClientMap()352 Map<ServerInfo, LoadReportClient> getServerLrsClientMap() { 353 throw new UnsupportedOperationException(); 354 } 355 356 interface XdsResponseHandler { 357 /** Called when a xds response is received. */ handleResourceResponse( XdsResourceType<?> resourceType, ServerInfo serverInfo, String versionInfo, List<Any> resources, String nonce)358 void handleResourceResponse( 359 XdsResourceType<?> resourceType, ServerInfo serverInfo, String versionInfo, 360 List<Any> resources, String nonce); 361 362 /** Called when the ADS stream is closed passively. */ 363 // Must be synchronized. handleStreamClosed(Status error)364 void handleStreamClosed(Status error); 365 366 /** Called when the ADS stream has been recreated. */ 367 // Must be synchronized. handleStreamRestarted(ServerInfo serverInfo)368 void handleStreamRestarted(ServerInfo serverInfo); 369 } 370 371 interface ResourceStore { 372 /** 373 * Returns the collection of resources currently subscribing to or {@code null} if not 374 * subscribing to any resources for the given type. 375 * 376 * <p>Note an empty collection indicates subscribing to resources of the given type with 377 * wildcard mode. 378 */ 379 // Must be synchronized. 380 @Nullable getSubscribedResources(ServerInfo serverInfo, XdsResourceType<? extends ResourceUpdate> type)381 Collection<String> getSubscribedResources(ServerInfo serverInfo, 382 XdsResourceType<? extends ResourceUpdate> type); 383 getSubscribedResourceTypesWithTypeUrl()384 Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl(); 385 } 386 387 interface TimerLaunch { 388 /** 389 * For all subscriber's for the specified server, if the resource hasn't yet been 390 * resolved then start a timer for it. 391 */ startSubscriberTimersIfNeeded(ServerInfo serverInfo)392 void startSubscriberTimersIfNeeded(ServerInfo serverInfo); 393 } 394 } 395