xref: /aosp_15_r20/external/grpc-grpc-java/xds/src/main/java/io/grpc/xds/XdsClient.java (revision e07d83d3ffcef9ecfc9f7f475418ec639ff0e5fe)
1 /*
2  * Copyright 2019 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.xds;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static io.grpc.xds.Bootstrapper.XDSTP_SCHEME;
21 
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.base.Joiner;
24 import com.google.common.base.Splitter;
25 import com.google.common.net.UrlEscapers;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import com.google.protobuf.Any;
28 import io.grpc.Status;
29 import io.grpc.xds.Bootstrapper.ServerInfo;
30 import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
31 import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats;
32 import java.net.URI;
33 import java.net.URISyntaxException;
34 import java.util.ArrayList;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.List;
38 import java.util.Map;
39 import javax.annotation.Nullable;
40 
41 /**
42  * An {@link XdsClient} instance encapsulates all of the logic for communicating with the xDS
43  * server. It may create multiple RPC streams (or a single ADS stream) for a series of xDS
44  * protocols (e.g., LDS, RDS, VHDS, CDS and EDS) over a single channel. Watch-based interfaces
45  * are provided for each set of data needed by gRPC.
46  */
47 abstract class XdsClient {
48 
isResourceNameValid(String resourceName, String typeUrl)49   static boolean isResourceNameValid(String resourceName, String typeUrl) {
50     checkNotNull(resourceName, "resourceName");
51     if (!resourceName.startsWith(XDSTP_SCHEME)) {
52       return true;
53     }
54     URI uri;
55     try {
56       uri = new URI(resourceName);
57     } catch (URISyntaxException e) {
58       return false;
59     }
60     String path = uri.getPath();
61     // path must be in the form of /{resource type}/{id/*}
62     Splitter slashSplitter = Splitter.on('/').omitEmptyStrings();
63     if (path == null) {
64       return false;
65     }
66     List<String> pathSegs = slashSplitter.splitToList(path);
67     if (pathSegs.size() < 2) {
68       return false;
69     }
70     String type = pathSegs.get(0);
71     if (!type.equals(slashSplitter.splitToList(typeUrl).get(1))) {
72       return false;
73     }
74     return true;
75   }
76 
canonifyResourceName(String resourceName)77   static String canonifyResourceName(String resourceName) {
78     checkNotNull(resourceName, "resourceName");
79     if (!resourceName.startsWith(XDSTP_SCHEME)) {
80       return resourceName;
81     }
82     URI uri = URI.create(resourceName);
83     String rawQuery = uri.getRawQuery();
84     Splitter ampSplitter = Splitter.on('&').omitEmptyStrings();
85     if (rawQuery == null) {
86       return resourceName;
87     }
88     List<String> queries = ampSplitter.splitToList(rawQuery);
89     if (queries.size() < 2) {
90       return resourceName;
91     }
92     List<String> canonicalContextParams = new ArrayList<>(queries.size());
93     for (String query : queries) {
94       canonicalContextParams.add(query);
95     }
96     Collections.sort(canonicalContextParams);
97     String canonifiedQuery = Joiner.on('&').join(canonicalContextParams);
98     return resourceName.replace(rawQuery, canonifiedQuery);
99   }
100 
percentEncodePath(String input)101   static String percentEncodePath(String input) {
102     Iterable<String> pathSegs = Splitter.on('/').split(input);
103     List<String> encodedSegs = new ArrayList<>();
104     for (String pathSeg : pathSegs) {
105       encodedSegs.add(UrlEscapers.urlPathSegmentEscaper().escape(pathSeg));
106     }
107     return Joiner.on('/').join(encodedSegs);
108   }
109 
110   interface ResourceUpdate {
111   }
112 
113   /**
114    * Watcher interface for a single requested xDS resource.
115    */
116   interface ResourceWatcher<T extends ResourceUpdate> {
117 
118     /**
119      * Called when the resource discovery RPC encounters some transient error.
120      *
121      * <p>Note that we expect that the implementer to:
122      * - Comply with the guarantee to not generate certain statuses by the library:
123      *   https://grpc.github.io/grpc/core/md_doc_statuscodes.html. If the code needs to be
124      *   propagated to the channel, override it with {@link Status.Code#UNAVAILABLE}.
125      * - Keep {@link Status} description in one form or another, as it contains valuable debugging
126      *   information.
127      */
onError(Status error)128     void onError(Status error);
129 
130     /**
131      * Called when the requested resource is not available.
132      *
133      * @param resourceName name of the resource requested in discovery request.
134      */
onResourceDoesNotExist(String resourceName)135     void onResourceDoesNotExist(String resourceName);
136 
onChanged(T update)137     void onChanged(T update);
138   }
139 
140   /**
141    * The metadata of the xDS resource; used by the xDS config dump.
142    */
143   static final class ResourceMetadata {
144     private final String version;
145     private final ResourceMetadataStatus status;
146     private final long updateTimeNanos;
147     @Nullable private final Any rawResource;
148     @Nullable private final UpdateFailureState errorState;
149 
ResourceMetadata( ResourceMetadataStatus status, String version, long updateTimeNanos, @Nullable Any rawResource, @Nullable UpdateFailureState errorState)150     private ResourceMetadata(
151         ResourceMetadataStatus status, String version, long updateTimeNanos,
152         @Nullable Any rawResource, @Nullable UpdateFailureState errorState) {
153       this.status = checkNotNull(status, "status");
154       this.version = checkNotNull(version, "version");
155       this.updateTimeNanos = updateTimeNanos;
156       this.rawResource = rawResource;
157       this.errorState = errorState;
158     }
159 
newResourceMetadataUnknown()160     static ResourceMetadata newResourceMetadataUnknown() {
161       return new ResourceMetadata(ResourceMetadataStatus.UNKNOWN, "", 0, null, null);
162     }
163 
newResourceMetadataRequested()164     static ResourceMetadata newResourceMetadataRequested() {
165       return new ResourceMetadata(ResourceMetadataStatus.REQUESTED, "", 0, null, null);
166     }
167 
newResourceMetadataDoesNotExist()168     static ResourceMetadata newResourceMetadataDoesNotExist() {
169       return new ResourceMetadata(ResourceMetadataStatus.DOES_NOT_EXIST, "", 0, null, null);
170     }
171 
newResourceMetadataAcked( Any rawResource, String version, long updateTimeNanos)172     static ResourceMetadata newResourceMetadataAcked(
173         Any rawResource, String version, long updateTimeNanos) {
174       checkNotNull(rawResource, "rawResource");
175       return new ResourceMetadata(
176           ResourceMetadataStatus.ACKED, version, updateTimeNanos, rawResource, null);
177     }
178 
newResourceMetadataNacked( ResourceMetadata metadata, String failedVersion, long failedUpdateTime, String failedDetails)179     static ResourceMetadata newResourceMetadataNacked(
180         ResourceMetadata metadata, String failedVersion, long failedUpdateTime,
181         String failedDetails) {
182       checkNotNull(metadata, "metadata");
183       return new ResourceMetadata(ResourceMetadataStatus.NACKED,
184           metadata.getVersion(), metadata.getUpdateTimeNanos(), metadata.getRawResource(),
185           new UpdateFailureState(failedVersion, failedUpdateTime, failedDetails));
186     }
187 
188     /** The last successfully updated version of the resource. */
getVersion()189     String getVersion() {
190       return version;
191     }
192 
193     /** The client status of this resource. */
getStatus()194     ResourceMetadataStatus getStatus() {
195       return status;
196     }
197 
198     /** The timestamp when the resource was last successfully updated. */
getUpdateTimeNanos()199     long getUpdateTimeNanos() {
200       return updateTimeNanos;
201     }
202 
203     /** The last successfully updated xDS resource as it was returned by the server. */
204     @Nullable
getRawResource()205     Any getRawResource() {
206       return rawResource;
207     }
208 
209     /** The metadata capturing the error details of the last rejected update of the resource. */
210     @Nullable
getErrorState()211     UpdateFailureState getErrorState() {
212       return errorState;
213     }
214 
215     /**
216      * Resource status from the view of a xDS client, which tells the synchronization
217      * status between the xDS client and the xDS server.
218      *
219      * <p>This is a native representation of xDS ConfigDump ClientResourceStatus, see
220      * <a href="https://github.com/envoyproxy/envoy/blob/main/api/envoy/admin/v3/config_dump.proto">
221      * config_dump.proto</a>
222      */
223     enum ResourceMetadataStatus {
224       UNKNOWN, REQUESTED, DOES_NOT_EXIST, ACKED, NACKED
225     }
226 
227     /**
228      * Captures error metadata of failed resource updates.
229      *
230      * <p>This is a native representation of xDS ConfigDump UpdateFailureState, see
231      * <a href="https://github.com/envoyproxy/envoy/blob/main/api/envoy/admin/v3/config_dump.proto">
232      * config_dump.proto</a>
233      */
234     static final class UpdateFailureState {
235       private final String failedVersion;
236       private final long failedUpdateTimeNanos;
237       private final String failedDetails;
238 
UpdateFailureState( String failedVersion, long failedUpdateTimeNanos, String failedDetails)239       private UpdateFailureState(
240           String failedVersion, long failedUpdateTimeNanos, String failedDetails) {
241         this.failedVersion = checkNotNull(failedVersion, "failedVersion");
242         this.failedUpdateTimeNanos = failedUpdateTimeNanos;
243         this.failedDetails = checkNotNull(failedDetails, "failedDetails");
244       }
245 
246       /** The rejected version string of the last failed update attempt. */
getFailedVersion()247       String getFailedVersion() {
248         return failedVersion;
249       }
250 
251       /** Details about the last failed update attempt. */
getFailedUpdateTimeNanos()252       long getFailedUpdateTimeNanos() {
253         return failedUpdateTimeNanos;
254       }
255 
256       /** Timestamp of the last failed update attempt. */
getFailedDetails()257       String getFailedDetails() {
258         return failedDetails;
259       }
260     }
261   }
262 
263   /**
264    * Shutdown this {@link XdsClient} and release resources.
265    */
shutdown()266   void shutdown() {
267     throw new UnsupportedOperationException();
268   }
269 
270   /**
271    * Returns {@code true} if {@link #shutdown()} has been called.
272    */
isShutDown()273   boolean isShutDown() {
274     throw new UnsupportedOperationException();
275   }
276 
277   /**
278    * Returns the config used to bootstrap this XdsClient {@link Bootstrapper.BootstrapInfo}.
279    */
getBootstrapInfo()280   Bootstrapper.BootstrapInfo getBootstrapInfo() {
281     throw new UnsupportedOperationException();
282   }
283 
284   /**
285    * Returns the {@link TlsContextManager} used in this XdsClient.
286    */
getTlsContextManager()287   TlsContextManager getTlsContextManager() {
288     throw new UnsupportedOperationException();
289   }
290 
291   /**
292    * Returns a {@link ListenableFuture} to the snapshot of the subscribed resources as
293    * they are at the moment of the call.
294    *
295    * <p>The snapshot is a map from the "resource type" to
296    * a map ("resource name": "resource metadata").
297    */
298   // Must be synchronized.
299   ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
getSubscribedResourcesMetadataSnapshot()300       getSubscribedResourcesMetadataSnapshot() {
301     throw new UnsupportedOperationException();
302   }
303 
304   /**
305    * Registers a data watcher for the given Xds resource.
306    */
watchXdsResource(XdsResourceType<T> type, String resourceName, ResourceWatcher<T> watcher)307   <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
308                                                    ResourceWatcher<T> watcher) {
309     throw new UnsupportedOperationException();
310   }
311 
312   /**
313    * Unregisters the given resource watcher.
314    */
cancelXdsResourceWatch(XdsResourceType<T> type, String resourceName, ResourceWatcher<T> watcher)315   <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
316                                                          String resourceName,
317                                                          ResourceWatcher<T> watcher) {
318     throw new UnsupportedOperationException();
319   }
320 
321   /**
322    * Adds drop stats for the specified cluster with edsServiceName by using the returned object
323    * to record dropped requests. Drop stats recorded with the returned object will be reported
324    * to the load reporting server. The returned object is reference counted and the caller should
325    * use {@link ClusterDropStats#release} to release its <i>hard</i> reference when it is safe to
326    * stop reporting dropped RPCs for the specified cluster in the future.
327    */
addClusterDropStats( ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName)328   ClusterDropStats addClusterDropStats(
329       ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName) {
330     throw new UnsupportedOperationException();
331   }
332 
333   /**
334    * Adds load stats for the specified locality (in the specified cluster with edsServiceName) by
335    * using the returned object to record RPCs. Load stats recorded with the returned object will
336    * be reported to the load reporting server. The returned object is reference counted and the
337    * caller should use {@link ClusterLocalityStats#release} to release its <i>hard</i>
338    * reference when it is safe to stop reporting RPC loads for the specified locality in the
339    * future.
340    */
addClusterLocalityStats( ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName, Locality locality)341   ClusterLocalityStats addClusterLocalityStats(
342       ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName,
343       Locality locality) {
344     throw new UnsupportedOperationException();
345   }
346 
347   /**
348    * Returns a map of control plane server info objects to the LoadReportClients that are
349    * responsible for sending load reports to the control plane servers.
350    */
351   @VisibleForTesting
getServerLrsClientMap()352   Map<ServerInfo, LoadReportClient> getServerLrsClientMap() {
353     throw new UnsupportedOperationException();
354   }
355 
356   interface XdsResponseHandler {
357     /** Called when a xds response is received. */
handleResourceResponse( XdsResourceType<?> resourceType, ServerInfo serverInfo, String versionInfo, List<Any> resources, String nonce)358     void handleResourceResponse(
359         XdsResourceType<?> resourceType, ServerInfo serverInfo, String versionInfo,
360         List<Any> resources, String nonce);
361 
362     /** Called when the ADS stream is closed passively. */
363     // Must be synchronized.
handleStreamClosed(Status error)364     void handleStreamClosed(Status error);
365 
366     /** Called when the ADS stream has been recreated. */
367     // Must be synchronized.
handleStreamRestarted(ServerInfo serverInfo)368     void handleStreamRestarted(ServerInfo serverInfo);
369   }
370 
371   interface ResourceStore {
372     /**
373      * Returns the collection of resources currently subscribing to or {@code null} if not
374      * subscribing to any resources for the given type.
375      *
376      * <p>Note an empty collection indicates subscribing to resources of the given type with
377      * wildcard mode.
378      */
379     // Must be synchronized.
380     @Nullable
getSubscribedResources(ServerInfo serverInfo, XdsResourceType<? extends ResourceUpdate> type)381     Collection<String> getSubscribedResources(ServerInfo serverInfo,
382                                               XdsResourceType<? extends ResourceUpdate> type);
383 
getSubscribedResourceTypesWithTypeUrl()384     Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl();
385   }
386 
387   interface TimerLaunch {
388     /**
389      * For all subscriber's for the specified server, if the resource hasn't yet been
390      * resolved then start a timer for it.
391      */
startSubscriberTimersIfNeeded(ServerInfo serverInfo)392     void startSubscriberTimersIfNeeded(ServerInfo serverInfo);
393   }
394 }
395