1 // 2 // Copyright 2019 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 // 16 17 #ifndef GRPC_SRC_CORE_EXT_XDS_XDS_CLIENT_H 18 #define GRPC_SRC_CORE_EXT_XDS_XDS_CLIENT_H 19 20 #include <grpc/support/port_platform.h> 21 22 #include <map> 23 #include <memory> 24 #include <set> 25 #include <string> 26 #include <utility> 27 #include <vector> 28 29 #include "absl/base/thread_annotations.h" 30 #include "absl/status/status.h" 31 #include "absl/status/statusor.h" 32 #include "absl/strings/string_view.h" 33 #include "upb/reflection/def.hpp" 34 35 #include <grpc/event_engine/event_engine.h> 36 37 #include "src/core/ext/xds/xds_api.h" 38 #include "src/core/ext/xds/xds_bootstrap.h" 39 #include "src/core/ext/xds/xds_client_stats.h" 40 #include "src/core/ext/xds/xds_resource_type.h" 41 #include "src/core/ext/xds/xds_transport.h" 42 #include "src/core/lib/debug/trace.h" 43 #include "src/core/lib/gprpp/dual_ref_counted.h" 44 #include "src/core/lib/gprpp/orphanable.h" 45 #include "src/core/lib/gprpp/ref_counted.h" 46 #include "src/core/lib/gprpp/ref_counted_ptr.h" 47 #include "src/core/lib/gprpp/sync.h" 48 #include "src/core/lib/gprpp/time.h" 49 #include "src/core/lib/gprpp/work_serializer.h" 50 #include "src/core/lib/uri/uri_parser.h" 51 52 namespace grpc_core { 53 54 extern TraceFlag grpc_xds_client_trace; 55 extern TraceFlag grpc_xds_client_refcount_trace; 56 57 class XdsClient : public DualRefCounted<XdsClient> { 58 public: 59 // Resource watcher interface. Implemented by callers. 60 // Note: Most callers will not use this API directly but rather via a 61 // resource-type-specific wrapper API provided by the relevant 62 // XdsResourceType implementation. 63 class ResourceWatcherInterface : public RefCounted<ResourceWatcherInterface> { 64 public: 65 virtual void OnGenericResourceChanged( 66 const XdsResourceType::ResourceData* resource) 67 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; 68 virtual void OnError(absl::Status status) 69 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; 70 virtual void OnResourceDoesNotExist() 71 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; 72 }; 73 74 XdsClient( 75 std::unique_ptr<XdsBootstrap> bootstrap, 76 OrphanablePtr<XdsTransportFactory> transport_factory, 77 std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine, 78 std::string user_agent_name, std::string user_agent_version, 79 Duration resource_request_timeout = Duration::Seconds(15)); 80 ~XdsClient() override; 81 bootstrap()82 const XdsBootstrap& bootstrap() const { 83 return *bootstrap_; // ctor asserts that it is non-null 84 } 85 transport_factory()86 XdsTransportFactory* transport_factory() const { 87 return transport_factory_.get(); 88 } 89 90 void Orphan() override; 91 92 // Start and cancel watch for a resource. 93 // 94 // The XdsClient takes ownership of the watcher, but the caller may 95 // keep a raw pointer to the watcher, which may be used only for 96 // cancellation. (Because the caller does not own the watcher, the 97 // pointer must not be used for any other purpose.) 98 // If the caller is going to start a new watch after cancelling the 99 // old one, it should set delay_unsubscription to true. 100 // 101 // The resource type object must be a global singleton, since the first 102 // time the XdsClient sees a particular resource type object, it will 103 // store the pointer to that object as the authoritative implementation for 104 // its type URLs. The resource type object must outlive the XdsClient object, 105 // and it is illegal to start a subsequent watch for the same type URLs using 106 // a different resource type object. 107 // 108 // Note: Most callers will not use this API directly but rather via a 109 // resource-type-specific wrapper API provided by the relevant 110 // XdsResourceType implementation. 111 void WatchResource(const XdsResourceType* type, absl::string_view name, 112 RefCountedPtr<ResourceWatcherInterface> watcher); 113 void CancelResourceWatch(const XdsResourceType* type, 114 absl::string_view listener_name, 115 ResourceWatcherInterface* watcher, 116 bool delay_unsubscription = false); 117 118 // Adds and removes drop stats for cluster_name and eds_service_name. 119 RefCountedPtr<XdsClusterDropStats> AddClusterDropStats( 120 const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, 121 absl::string_view eds_service_name); 122 void RemoveClusterDropStats(const XdsBootstrap::XdsServer& xds_server, 123 absl::string_view cluster_name, 124 absl::string_view eds_service_name, 125 XdsClusterDropStats* cluster_drop_stats); 126 127 // Adds and removes locality stats for cluster_name and eds_service_name 128 // for the specified locality. 129 RefCountedPtr<XdsClusterLocalityStats> AddClusterLocalityStats( 130 const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, 131 absl::string_view eds_service_name, 132 RefCountedPtr<XdsLocalityName> locality); 133 void RemoveClusterLocalityStats( 134 const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, 135 absl::string_view eds_service_name, 136 const RefCountedPtr<XdsLocalityName>& locality, 137 XdsClusterLocalityStats* cluster_locality_stats); 138 139 // Resets connection backoff state. 140 void ResetBackoff(); 141 142 // Dumps the active xDS config in JSON format. 143 // Individual xDS resource is encoded as envoy.admin.v3.*ConfigDump. Returns 144 // envoy.service.status.v3.ClientConfig which also includes the config 145 // status (e.g., CLIENT_REQUESTED, CLIENT_ACKED, CLIENT_NACKED). 146 // 147 // Expected to be invoked by wrapper languages in their CSDS service 148 // implementation. 149 std::string DumpClientConfigBinary(); 150 engine()151 grpc_event_engine::experimental::EventEngine* engine() { 152 return engine_.get(); 153 } 154 155 private: 156 struct XdsResourceKey { 157 std::string id; 158 std::vector<URI::QueryParam> query_params; 159 160 bool operator<(const XdsResourceKey& other) const { 161 int c = id.compare(other.id); 162 if (c != 0) return c < 0; 163 return query_params < other.query_params; 164 } 165 }; 166 167 struct XdsResourceName { 168 std::string authority; 169 XdsResourceKey key; 170 }; 171 172 // Contains a channel to the xds server and all the data related to the 173 // channel. Holds a ref to the xds client object. 174 class ChannelState : public DualRefCounted<ChannelState> { 175 public: 176 template <typename T> 177 class RetryableCall; 178 179 class AdsCallState; 180 class LrsCallState; 181 182 ChannelState(WeakRefCountedPtr<XdsClient> xds_client, 183 const XdsBootstrap::XdsServer& server); 184 ~ChannelState() override; 185 186 void Orphan() override; 187 xds_client()188 XdsClient* xds_client() const { return xds_client_.get(); } 189 AdsCallState* ads_calld() const; 190 LrsCallState* lrs_calld() const; 191 192 void ResetBackoff(); 193 194 void MaybeStartLrsCall(); 195 void StopLrsCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); 196 197 // Returns non-OK if there has been an error since the last time the 198 // ADS stream saw a response. status()199 const absl::Status& status() const { return status_; } 200 201 void SubscribeLocked(const XdsResourceType* type, 202 const XdsResourceName& name) 203 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); 204 void UnsubscribeLocked(const XdsResourceType* type, 205 const XdsResourceName& name, 206 bool delay_unsubscription) 207 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); 208 209 private: 210 void OnConnectivityFailure(absl::Status status); 211 212 // Enqueues error notifications to watchers. Caller must drain 213 // XdsClient::work_serializer_ after releasing the lock. 214 void SetChannelStatusLocked(absl::Status status) 215 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); 216 217 // The owning xds client. 218 WeakRefCountedPtr<XdsClient> xds_client_; 219 220 const XdsBootstrap::XdsServer& server_; // Owned by bootstrap. 221 222 OrphanablePtr<XdsTransportFactory::XdsTransport> transport_; 223 224 bool shutting_down_ = false; 225 226 // The retryable XDS calls. 227 OrphanablePtr<RetryableCall<AdsCallState>> ads_calld_; 228 OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_; 229 230 // Stores the most recent accepted resource version for each resource type. 231 std::map<const XdsResourceType*, std::string /*version*/> 232 resource_type_version_map_; 233 234 absl::Status status_; 235 }; 236 237 struct ResourceState { 238 std::map<ResourceWatcherInterface*, RefCountedPtr<ResourceWatcherInterface>> 239 watchers; 240 // The latest data seen for the resource. 241 std::unique_ptr<XdsResourceType::ResourceData> resource; 242 XdsApi::ResourceMetadata meta; 243 bool ignored_deletion = false; 244 }; 245 246 struct AuthorityState { 247 RefCountedPtr<ChannelState> channel_state; 248 std::map<const XdsResourceType*, std::map<XdsResourceKey, ResourceState>> 249 resource_map; 250 }; 251 252 struct LoadReportState { 253 struct LocalityState { 254 XdsClusterLocalityStats* locality_stats = nullptr; 255 XdsClusterLocalityStats::Snapshot deleted_locality_stats; 256 }; 257 258 XdsClusterDropStats* drop_stats = nullptr; 259 XdsClusterDropStats::Snapshot deleted_drop_stats; 260 std::map<RefCountedPtr<XdsLocalityName>, LocalityState, 261 XdsLocalityName::Less> 262 locality_stats; 263 Timestamp last_report_time = Timestamp::Now(); 264 }; 265 266 // Load report data. 267 using LoadReportMap = std::map< 268 std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>, 269 LoadReportState>; 270 271 struct LoadReportServer { 272 RefCountedPtr<ChannelState> channel_state; 273 LoadReportMap load_report_map; 274 }; 275 276 // Sends an error notification to a specific set of watchers. 277 void NotifyWatchersOnErrorLocked( 278 const std::map<ResourceWatcherInterface*, 279 RefCountedPtr<ResourceWatcherInterface>>& watchers, 280 absl::Status status); 281 // Sends a resource-does-not-exist notification to a specific set of watchers. 282 void NotifyWatchersOnResourceDoesNotExist( 283 const std::map<ResourceWatcherInterface*, 284 RefCountedPtr<ResourceWatcherInterface>>& watchers); 285 286 void MaybeRegisterResourceTypeLocked(const XdsResourceType* resource_type) 287 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 288 289 // Gets the type for resource_type, or null if the type is unknown. 290 const XdsResourceType* GetResourceTypeLocked(absl::string_view resource_type) 291 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 292 293 absl::StatusOr<XdsResourceName> ParseXdsResourceName( 294 absl::string_view name, const XdsResourceType* type); 295 static std::string ConstructFullXdsResourceName( 296 absl::string_view authority, absl::string_view resource_type, 297 const XdsResourceKey& key); 298 299 XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked( 300 const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters, 301 const std::set<std::string>& clusters) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 302 303 RefCountedPtr<ChannelState> GetOrCreateChannelStateLocked( 304 const XdsBootstrap::XdsServer& server, const char* reason) 305 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 306 307 std::unique_ptr<XdsBootstrap> bootstrap_; 308 OrphanablePtr<XdsTransportFactory> transport_factory_; 309 const Duration request_timeout_; 310 const bool xds_federation_enabled_; 311 XdsApi api_; 312 WorkSerializer work_serializer_; 313 std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_; 314 315 Mutex mu_; 316 317 // Stores resource type objects seen by type URL. 318 std::map<absl::string_view /*resource_type*/, const XdsResourceType*> 319 resource_types_ ABSL_GUARDED_BY(mu_); 320 upb::SymbolTable symtab_ ABSL_GUARDED_BY(mu_); 321 322 // Map of existing xDS server channels. 323 // Key is owned by the bootstrap config. 324 std::map<const XdsBootstrap::XdsServer*, ChannelState*> 325 xds_server_channel_map_ ABSL_GUARDED_BY(mu_); 326 327 std::map<std::string /*authority*/, AuthorityState> authority_state_map_ 328 ABSL_GUARDED_BY(mu_); 329 330 // Key is owned by the bootstrap config. 331 std::map<const XdsBootstrap::XdsServer*, LoadReportServer> 332 xds_load_report_server_map_ ABSL_GUARDED_BY(mu_); 333 334 // Stores started watchers whose resource name was not parsed successfully, 335 // waiting to be cancelled or reset in Orphan(). 336 std::map<ResourceWatcherInterface*, RefCountedPtr<ResourceWatcherInterface>> 337 invalid_watchers_ ABSL_GUARDED_BY(mu_); 338 339 bool shutting_down_ ABSL_GUARDED_BY(mu_) = false; 340 }; 341 342 } // namespace grpc_core 343 344 #endif // GRPC_SRC_CORE_EXT_XDS_XDS_CLIENT_H 345