1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_SRC_CORE_EXT_XDS_XDS_CLIENT_H
18 #define GRPC_SRC_CORE_EXT_XDS_XDS_CLIENT_H
19 
20 #include <grpc/support/port_platform.h>
21 
22 #include <map>
23 #include <memory>
24 #include <set>
25 #include <string>
26 #include <utility>
27 #include <vector>
28 
29 #include "absl/base/thread_annotations.h"
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/string_view.h"
33 #include "upb/reflection/def.hpp"
34 
35 #include <grpc/event_engine/event_engine.h>
36 
37 #include "src/core/ext/xds/xds_api.h"
38 #include "src/core/ext/xds/xds_bootstrap.h"
39 #include "src/core/ext/xds/xds_client_stats.h"
40 #include "src/core/ext/xds/xds_resource_type.h"
41 #include "src/core/ext/xds/xds_transport.h"
42 #include "src/core/lib/debug/trace.h"
43 #include "src/core/lib/gprpp/dual_ref_counted.h"
44 #include "src/core/lib/gprpp/orphanable.h"
45 #include "src/core/lib/gprpp/ref_counted.h"
46 #include "src/core/lib/gprpp/ref_counted_ptr.h"
47 #include "src/core/lib/gprpp/sync.h"
48 #include "src/core/lib/gprpp/time.h"
49 #include "src/core/lib/gprpp/work_serializer.h"
50 #include "src/core/lib/uri/uri_parser.h"
51 
52 namespace grpc_core {
53 
// Trace flags for the XdsClient.  Only declared here (extern); the
// definitions live in another translation unit (presumably xds_client.cc --
// TODO confirm).
// NOTE(review): going by the names, the first presumably gates general
// XdsClient tracing and the second ref-count tracing -- confirm.
extern TraceFlag grpc_xds_client_trace;
extern TraceFlag grpc_xds_client_refcount_trace;
56 
57 class XdsClient : public DualRefCounted<XdsClient> {
58  public:
59   // Resource watcher interface.  Implemented by callers.
60   // Note: Most callers will not use this API directly but rather via a
61   // resource-type-specific wrapper API provided by the relevant
62   // XdsResourceType implementation.
63   class ResourceWatcherInterface : public RefCounted<ResourceWatcherInterface> {
64    public:
65     virtual void OnGenericResourceChanged(
66         const XdsResourceType::ResourceData* resource)
67         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
68     virtual void OnError(absl::Status status)
69         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
70     virtual void OnResourceDoesNotExist()
71         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
72   };
73 
74   XdsClient(
75       std::unique_ptr<XdsBootstrap> bootstrap,
76       OrphanablePtr<XdsTransportFactory> transport_factory,
77       std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
78       std::string user_agent_name, std::string user_agent_version,
79       Duration resource_request_timeout = Duration::Seconds(15));
80   ~XdsClient() override;
81 
bootstrap()82   const XdsBootstrap& bootstrap() const {
83     return *bootstrap_;  // ctor asserts that it is non-null
84   }
85 
transport_factory()86   XdsTransportFactory* transport_factory() const {
87     return transport_factory_.get();
88   }
89 
90   void Orphan() override;
91 
92   // Start and cancel watch for a resource.
93   //
94   // The XdsClient takes ownership of the watcher, but the caller may
95   // keep a raw pointer to the watcher, which may be used only for
96   // cancellation.  (Because the caller does not own the watcher, the
97   // pointer must not be used for any other purpose.)
98   // If the caller is going to start a new watch after cancelling the
99   // old one, it should set delay_unsubscription to true.
100   //
101   // The resource type object must be a global singleton, since the first
102   // time the XdsClient sees a particular resource type object, it will
103   // store the pointer to that object as the authoritative implementation for
104   // its type URLs.  The resource type object must outlive the XdsClient object,
105   // and it is illegal to start a subsequent watch for the same type URLs using
106   // a different resource type object.
107   //
108   // Note: Most callers will not use this API directly but rather via a
109   // resource-type-specific wrapper API provided by the relevant
110   // XdsResourceType implementation.
111   void WatchResource(const XdsResourceType* type, absl::string_view name,
112                      RefCountedPtr<ResourceWatcherInterface> watcher);
113   void CancelResourceWatch(const XdsResourceType* type,
114                            absl::string_view listener_name,
115                            ResourceWatcherInterface* watcher,
116                            bool delay_unsubscription = false);
117 
118   // Adds and removes drop stats for cluster_name and eds_service_name.
119   RefCountedPtr<XdsClusterDropStats> AddClusterDropStats(
120       const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
121       absl::string_view eds_service_name);
122   void RemoveClusterDropStats(const XdsBootstrap::XdsServer& xds_server,
123                               absl::string_view cluster_name,
124                               absl::string_view eds_service_name,
125                               XdsClusterDropStats* cluster_drop_stats);
126 
127   // Adds and removes locality stats for cluster_name and eds_service_name
128   // for the specified locality.
129   RefCountedPtr<XdsClusterLocalityStats> AddClusterLocalityStats(
130       const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
131       absl::string_view eds_service_name,
132       RefCountedPtr<XdsLocalityName> locality);
133   void RemoveClusterLocalityStats(
134       const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
135       absl::string_view eds_service_name,
136       const RefCountedPtr<XdsLocalityName>& locality,
137       XdsClusterLocalityStats* cluster_locality_stats);
138 
139   // Resets connection backoff state.
140   void ResetBackoff();
141 
142   // Dumps the active xDS config in JSON format.
143   // Individual xDS resource is encoded as envoy.admin.v3.*ConfigDump. Returns
144   // envoy.service.status.v3.ClientConfig which also includes the config
145   // status (e.g., CLIENT_REQUESTED, CLIENT_ACKED, CLIENT_NACKED).
146   //
147   // Expected to be invoked by wrapper languages in their CSDS service
148   // implementation.
149   std::string DumpClientConfigBinary();
150 
engine()151   grpc_event_engine::experimental::EventEngine* engine() {
152     return engine_.get();
153   }
154 
155  private:
156   struct XdsResourceKey {
157     std::string id;
158     std::vector<URI::QueryParam> query_params;
159 
160     bool operator<(const XdsResourceKey& other) const {
161       int c = id.compare(other.id);
162       if (c != 0) return c < 0;
163       return query_params < other.query_params;
164     }
165   };
166 
167   struct XdsResourceName {
168     std::string authority;
169     XdsResourceKey key;
170   };
171 
172   // Contains a channel to the xds server and all the data related to the
173   // channel.  Holds a ref to the xds client object.
174   class ChannelState : public DualRefCounted<ChannelState> {
175    public:
176     template <typename T>
177     class RetryableCall;
178 
179     class AdsCallState;
180     class LrsCallState;
181 
182     ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
183                  const XdsBootstrap::XdsServer& server);
184     ~ChannelState() override;
185 
186     void Orphan() override;
187 
xds_client()188     XdsClient* xds_client() const { return xds_client_.get(); }
189     AdsCallState* ads_calld() const;
190     LrsCallState* lrs_calld() const;
191 
192     void ResetBackoff();
193 
194     void MaybeStartLrsCall();
195     void StopLrsCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
196 
197     // Returns non-OK if there has been an error since the last time the
198     // ADS stream saw a response.
status()199     const absl::Status& status() const { return status_; }
200 
201     void SubscribeLocked(const XdsResourceType* type,
202                          const XdsResourceName& name)
203         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
204     void UnsubscribeLocked(const XdsResourceType* type,
205                            const XdsResourceName& name,
206                            bool delay_unsubscription)
207         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
208 
209    private:
210     void OnConnectivityFailure(absl::Status status);
211 
212     // Enqueues error notifications to watchers.  Caller must drain
213     // XdsClient::work_serializer_ after releasing the lock.
214     void SetChannelStatusLocked(absl::Status status)
215         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
216 
217     // The owning xds client.
218     WeakRefCountedPtr<XdsClient> xds_client_;
219 
220     const XdsBootstrap::XdsServer& server_;  // Owned by bootstrap.
221 
222     OrphanablePtr<XdsTransportFactory::XdsTransport> transport_;
223 
224     bool shutting_down_ = false;
225 
226     // The retryable XDS calls.
227     OrphanablePtr<RetryableCall<AdsCallState>> ads_calld_;
228     OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_;
229 
230     // Stores the most recent accepted resource version for each resource type.
231     std::map<const XdsResourceType*, std::string /*version*/>
232         resource_type_version_map_;
233 
234     absl::Status status_;
235   };
236 
237   struct ResourceState {
238     std::map<ResourceWatcherInterface*, RefCountedPtr<ResourceWatcherInterface>>
239         watchers;
240     // The latest data seen for the resource.
241     std::unique_ptr<XdsResourceType::ResourceData> resource;
242     XdsApi::ResourceMetadata meta;
243     bool ignored_deletion = false;
244   };
245 
246   struct AuthorityState {
247     RefCountedPtr<ChannelState> channel_state;
248     std::map<const XdsResourceType*, std::map<XdsResourceKey, ResourceState>>
249         resource_map;
250   };
251 
252   struct LoadReportState {
253     struct LocalityState {
254       XdsClusterLocalityStats* locality_stats = nullptr;
255       XdsClusterLocalityStats::Snapshot deleted_locality_stats;
256     };
257 
258     XdsClusterDropStats* drop_stats = nullptr;
259     XdsClusterDropStats::Snapshot deleted_drop_stats;
260     std::map<RefCountedPtr<XdsLocalityName>, LocalityState,
261              XdsLocalityName::Less>
262         locality_stats;
263     Timestamp last_report_time = Timestamp::Now();
264   };
265 
266   // Load report data.
267   using LoadReportMap = std::map<
268       std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
269       LoadReportState>;
270 
271   struct LoadReportServer {
272     RefCountedPtr<ChannelState> channel_state;
273     LoadReportMap load_report_map;
274   };
275 
276   // Sends an error notification to a specific set of watchers.
277   void NotifyWatchersOnErrorLocked(
278       const std::map<ResourceWatcherInterface*,
279                      RefCountedPtr<ResourceWatcherInterface>>& watchers,
280       absl::Status status);
281   // Sends a resource-does-not-exist notification to a specific set of watchers.
282   void NotifyWatchersOnResourceDoesNotExist(
283       const std::map<ResourceWatcherInterface*,
284                      RefCountedPtr<ResourceWatcherInterface>>& watchers);
285 
286   void MaybeRegisterResourceTypeLocked(const XdsResourceType* resource_type)
287       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
288 
289   // Gets the type for resource_type, or null if the type is unknown.
290   const XdsResourceType* GetResourceTypeLocked(absl::string_view resource_type)
291       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
292 
293   absl::StatusOr<XdsResourceName> ParseXdsResourceName(
294       absl::string_view name, const XdsResourceType* type);
295   static std::string ConstructFullXdsResourceName(
296       absl::string_view authority, absl::string_view resource_type,
297       const XdsResourceKey& key);
298 
299   XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked(
300       const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters,
301       const std::set<std::string>& clusters) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
302 
303   RefCountedPtr<ChannelState> GetOrCreateChannelStateLocked(
304       const XdsBootstrap::XdsServer& server, const char* reason)
305       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
306 
307   std::unique_ptr<XdsBootstrap> bootstrap_;
308   OrphanablePtr<XdsTransportFactory> transport_factory_;
309   const Duration request_timeout_;
310   const bool xds_federation_enabled_;
311   XdsApi api_;
312   WorkSerializer work_serializer_;
313   std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
314 
315   Mutex mu_;
316 
317   // Stores resource type objects seen by type URL.
318   std::map<absl::string_view /*resource_type*/, const XdsResourceType*>
319       resource_types_ ABSL_GUARDED_BY(mu_);
320   upb::SymbolTable symtab_ ABSL_GUARDED_BY(mu_);
321 
322   // Map of existing xDS server channels.
323   // Key is owned by the bootstrap config.
324   std::map<const XdsBootstrap::XdsServer*, ChannelState*>
325       xds_server_channel_map_ ABSL_GUARDED_BY(mu_);
326 
327   std::map<std::string /*authority*/, AuthorityState> authority_state_map_
328       ABSL_GUARDED_BY(mu_);
329 
330   // Key is owned by the bootstrap config.
331   std::map<const XdsBootstrap::XdsServer*, LoadReportServer>
332       xds_load_report_server_map_ ABSL_GUARDED_BY(mu_);
333 
334   // Stores started watchers whose resource name was not parsed successfully,
335   // waiting to be cancelled or reset in Orphan().
336   std::map<ResourceWatcherInterface*, RefCountedPtr<ResourceWatcherInterface>>
337       invalid_watchers_ ABSL_GUARDED_BY(mu_);
338 
339   bool shutting_down_ ABSL_GUARDED_BY(mu_) = false;
340 };
341 
342 }  // namespace grpc_core
343 
344 #endif  // GRPC_SRC_CORE_EXT_XDS_XDS_CLIENT_H
345