1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/ext/xds/xds_client_grpc.h"
20
#include <algorithm>
#include <cstddef>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
27
28 #include "absl/base/thread_annotations.h"
29 #include "absl/status/status.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/string_view.h"
32 #include "absl/types/optional.h"
33 #include "envoy/service/status/v3/csds.upb.h"
34 #include "upb/base/string_view.h"
35
36 #include <grpc/grpc.h>
37 #include <grpc/impl/channel_arg_names.h>
38 #include <grpc/slice.h>
39 #include <grpc/support/alloc.h>
40 #include <grpc/support/log.h>
41 #include <grpc/support/string_util.h>
42
43 #include "src/core/ext/xds/upb_utils.h"
44 #include "src/core/ext/xds/xds_api.h"
45 #include "src/core/ext/xds/xds_bootstrap.h"
46 #include "src/core/ext/xds/xds_bootstrap_grpc.h"
47 #include "src/core/ext/xds/xds_channel_args.h"
48 #include "src/core/ext/xds/xds_client.h"
49 #include "src/core/ext/xds/xds_transport.h"
50 #include "src/core/ext/xds/xds_transport_grpc.h"
51 #include "src/core/lib/channel/channel_args.h"
52 #include "src/core/lib/channel/metrics.h"
53 #include "src/core/lib/debug/trace.h"
54 #include "src/core/lib/event_engine/default_event_engine.h"
55 #include "src/core/lib/gprpp/debug_location.h"
56 #include "src/core/lib/gprpp/env.h"
57 #include "src/core/lib/gprpp/load_file.h"
58 #include "src/core/lib/gprpp/orphanable.h"
59 #include "src/core/lib/gprpp/ref_counted_ptr.h"
60 #include "src/core/lib/gprpp/sync.h"
61 #include "src/core/lib/gprpp/time.h"
62 #include "src/core/lib/iomgr/error.h"
63 #include "src/core/lib/iomgr/exec_ctx.h"
64 #include "src/core/lib/slice/slice.h"
65 #include "src/core/lib/slice/slice_internal.h"
66 #include "src/core/lib/transport/error_utils.h"
67
// If gRPC is built with -DGRPC_XDS_USER_AGENT_NAME_SUFFIX="...", that string
// will be appended to the user agent name reported to the xDS server.
// When defined, the expansion starts with a single space so it can be
// concatenated directly after the base name; otherwise it expands to "".
#ifdef GRPC_XDS_USER_AGENT_NAME_SUFFIX
#define GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING \
  " " GRPC_XDS_USER_AGENT_NAME_SUFFIX
#else
#define GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING ""
#endif

// If gRPC is built with -DGRPC_XDS_USER_AGENT_VERSION_SUFFIX="...", that string
// will be appended to the user agent version reported to the xDS server.
// As above, a defined suffix is prefixed with a single space; otherwise "".
#ifdef GRPC_XDS_USER_AGENT_VERSION_SUFFIX
#define GRPC_XDS_USER_AGENT_VERSION_SUFFIX_STRING \
  " " GRPC_XDS_USER_AGENT_VERSION_SUFFIX
#else
#define GRPC_XDS_USER_AGENT_VERSION_SUFFIX_STRING ""
#endif
85
86 namespace grpc_core {
87
namespace {

// Metric labels.
// Label keys used by the xDS client instruments registered below. Label
// values are supplied positionally when recording (see
// MetricsReporter::ReportResourceUpdates and
// GrpcXdsClient::ReportCallbackMetrics), in the same order as the keys listed
// for each instrument; keep the two in sync.
constexpr absl::string_view kMetricLabelXdsServer = "grpc.xds.server";
constexpr absl::string_view kMetricLabelXdsAuthority = "grpc.xds.authority";
constexpr absl::string_view kMetricLabelXdsResourceType =
    "grpc.xds.resource_type";
constexpr absl::string_view kMetricLabelXdsCacheState = "grpc.xds.cache_state";

// NOTE(review): each Register* call below passes, in order: instrument name,
// description, unit, label keys, then `{}` and `false`. The last two appear to
// be the optional label keys and an enable-by-default flag; confirm against
// GlobalInstrumentsRegistry.

// Counter; labels: target, xDS server, resource type.
const auto kMetricResourceUpdatesValid =
    GlobalInstrumentsRegistry::RegisterUInt64Counter(
        "grpc.xds_client.resource_updates_valid",
        "EXPERIMENTAL. A counter of resources received that were considered "
        "valid. The counter will be incremented even for resources that "
        "have not changed.",
        "{resource}",
        {kMetricLabelTarget, kMetricLabelXdsServer,
         kMetricLabelXdsResourceType},
        {}, false);

// Counter; labels: target, xDS server, resource type.
const auto kMetricResourceUpdatesInvalid =
    GlobalInstrumentsRegistry::RegisterUInt64Counter(
        "grpc.xds_client.resource_updates_invalid",
        "EXPERIMENTAL. A counter of resources received that were considered "
        "invalid.",
        "{resource}",
        {kMetricLabelTarget, kMetricLabelXdsServer,
         kMetricLabelXdsResourceType},
        {}, false);

// Callback gauge; labels: target, xDS server.
const auto kMetricConnected =
    GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge(
        "grpc.xds_client.connected",
        "EXPERIMENTAL. Whether or not the xDS client currently has a "
        "working ADS stream to the xDS server. For a given server, this "
        "will be set to 0 when we have a connectivity failure or when the "
        "ADS stream fails without seeing a response message, as per gRFC "
        "A57. It will be set to 1 when we receive the first response on "
        "an ADS stream.",
        "{bool}", {kMetricLabelTarget, kMetricLabelXdsServer}, {}, false);

// Callback gauge; labels: target, authority, resource type, cache state.
const auto kMetricResources =
    GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge(
        "grpc.xds_client.resources", "EXPERIMENTAL. Number of xDS resources.",
        "{resource}",
        {kMetricLabelTarget, kMetricLabelXdsAuthority,
         kMetricLabelXdsResourceType, kMetricLabelXdsCacheState},
        {}, false);

}  // namespace
138
139 //
140 // GrpcXdsClient::MetricsReporter
141 //
142
143 class GrpcXdsClient::MetricsReporter final : public XdsMetricsReporter {
144 public:
MetricsReporter(GrpcXdsClient & xds_client)145 explicit MetricsReporter(GrpcXdsClient& xds_client)
146 : xds_client_(xds_client) {}
147
ReportResourceUpdates(absl::string_view xds_server,absl::string_view resource_type,uint64_t num_valid_resources,uint64_t num_invalid_resources)148 void ReportResourceUpdates(absl::string_view xds_server,
149 absl::string_view resource_type,
150 uint64_t num_valid_resources,
151 uint64_t num_invalid_resources) override {
152 xds_client_.stats_plugin_group_.AddCounter(
153 kMetricResourceUpdatesValid, num_valid_resources,
154 {xds_client_.key_, xds_server, resource_type}, {});
155 xds_client_.stats_plugin_group_.AddCounter(
156 kMetricResourceUpdatesInvalid, num_invalid_resources,
157 {xds_client_.key_, xds_server, resource_type}, {});
158 }
159
160 private:
161 GrpcXdsClient& xds_client_;
162 };
163
164 //
165 // GrpcXdsClient
166 //
167
// Out-of-line definition of the static constexpr data member. Required when
// the member is odr-used in pre-C++17 builds; redundant but harmless in C++17,
// where static constexpr data members are implicitly inline.
constexpr absl::string_view GrpcXdsClient::kServerKey;
169
170 namespace {
171
172 Mutex* g_mu = new Mutex;
173 const grpc_channel_args* g_channel_args ABSL_GUARDED_BY(*g_mu) = nullptr;
174 // Key bytes live in clients so they outlive the entries in this map
175 NoDestruct<std::map<absl::string_view, GrpcXdsClient*>> g_xds_client_map
176 ABSL_GUARDED_BY(*g_mu);
177 char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr;
178
GetBootstrapContents(const char * fallback_config)179 absl::StatusOr<std::string> GetBootstrapContents(const char* fallback_config) {
180 // First, try GRPC_XDS_BOOTSTRAP env var.
181 auto path = GetEnv("GRPC_XDS_BOOTSTRAP");
182 if (path.has_value()) {
183 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
184 gpr_log(GPR_INFO,
185 "Got bootstrap file location from GRPC_XDS_BOOTSTRAP "
186 "environment variable: %s",
187 path->c_str());
188 }
189 auto contents = LoadFile(*path, /*add_null_terminator=*/true);
190 if (!contents.ok()) return contents.status();
191 return std::string(contents->as_string_view());
192 }
193 // Next, try GRPC_XDS_BOOTSTRAP_CONFIG env var.
194 auto env_config = GetEnv("GRPC_XDS_BOOTSTRAP_CONFIG");
195 if (env_config.has_value()) {
196 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
197 gpr_log(GPR_INFO,
198 "Got bootstrap contents from GRPC_XDS_BOOTSTRAP_CONFIG "
199 "environment variable");
200 }
201 return std::move(*env_config);
202 }
203 // Finally, try fallback config.
204 if (fallback_config != nullptr) {
205 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
206 gpr_log(GPR_INFO, "Got bootstrap contents from fallback config");
207 }
208 return fallback_config;
209 }
210 // No bootstrap config found.
211 return absl::FailedPreconditionError(
212 "Environment variables GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG "
213 "not defined");
214 }
215
216 } // namespace
217
GetOrCreate(absl::string_view key,const ChannelArgs & args,const char * reason)218 absl::StatusOr<RefCountedPtr<GrpcXdsClient>> GrpcXdsClient::GetOrCreate(
219 absl::string_view key, const ChannelArgs& args, const char* reason) {
220 // If getting bootstrap from channel args, create a local XdsClient
221 // instance for the channel or server instead of using the global instance.
222 absl::optional<absl::string_view> bootstrap_config = args.GetString(
223 GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG);
224 if (bootstrap_config.has_value()) {
225 auto bootstrap = GrpcXdsBootstrap::Create(*bootstrap_config);
226 if (!bootstrap.ok()) return bootstrap.status();
227 grpc_channel_args* xds_channel_args = args.GetPointer<grpc_channel_args>(
228 GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS);
229 auto channel_args = ChannelArgs::FromC(xds_channel_args);
230 return MakeRefCounted<GrpcXdsClient>(
231 key, std::move(*bootstrap), channel_args,
232 MakeOrphanable<GrpcXdsTransportFactory>(channel_args));
233 }
234 // Otherwise, use the global instance.
235 MutexLock lock(g_mu);
236 auto it = g_xds_client_map->find(key);
237 if (it != g_xds_client_map->end()) {
238 auto xds_client = it->second->RefIfNonZero(DEBUG_LOCATION, reason);
239 if (xds_client != nullptr) {
240 return xds_client.TakeAsSubclass<GrpcXdsClient>();
241 }
242 }
243 // Find bootstrap contents.
244 auto bootstrap_contents = GetBootstrapContents(g_fallback_bootstrap_config);
245 if (!bootstrap_contents.ok()) return bootstrap_contents.status();
246 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
247 gpr_log(GPR_INFO, "xDS bootstrap contents: %s",
248 bootstrap_contents->c_str());
249 }
250 // Parse bootstrap.
251 auto bootstrap = GrpcXdsBootstrap::Create(*bootstrap_contents);
252 if (!bootstrap.ok()) return bootstrap.status();
253 // Instantiate XdsClient.
254 auto channel_args = ChannelArgs::FromC(g_channel_args);
255 auto xds_client = MakeRefCounted<GrpcXdsClient>(
256 key, std::move(*bootstrap), channel_args,
257 MakeOrphanable<GrpcXdsTransportFactory>(channel_args));
258 g_xds_client_map->emplace(xds_client->key(), xds_client.get());
259 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
260 gpr_log(GPR_INFO, "[xds_client %p] Created xDS client for key %s",
261 xds_client.get(), std::string(key).c_str());
262 }
263 return xds_client;
264 }
265
266 namespace {
267
GetStatsPluginGroupForKey(absl::string_view key)268 GlobalStatsPluginRegistry::StatsPluginGroup GetStatsPluginGroupForKey(
269 absl::string_view key) {
270 if (key == GrpcXdsClient::kServerKey) {
271 return GlobalStatsPluginRegistry::GetStatsPluginsForServer(ChannelArgs{});
272 }
273 // TODO(roth): How do we set the authority here?
274 experimental::StatsPluginChannelScope scope(key, "");
275 return GlobalStatsPluginRegistry::GetStatsPluginsForChannel(scope);
276 }
277
278 } // namespace
279
// Constructs an XdsClient identified by `key` (used as the "target" metric
// label and as the CSDS client scope) from the parsed `bootstrap`, using
// `transport_factory` to talk to xDS servers. `args` is only consulted for
// GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS.
GrpcXdsClient::GrpcXdsClient(
    absl::string_view key, std::unique_ptr<GrpcXdsBootstrap> bootstrap,
    const ChannelArgs& args,
    OrphanablePtr<XdsTransportFactory> transport_factory)
    : XdsClient(
          std::move(bootstrap), std::move(transport_factory),
          grpc_event_engine::experimental::GetDefaultEventEngine(),
          // MetricsReporter only stores this reference; it must not be
          // invoked before this constructor finishes.
          std::make_unique<MetricsReporter>(*this),
          // User agent name, plus the optional build-time name suffix.
          absl::StrCat("gRPC C-core ", GPR_PLATFORM_STRING,
                       GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING),
          // User agent version. NOTE(review): this appends the *name* suffix
          // as well as the version suffix; confirm that is intended.
          absl::StrCat("C-core ", grpc_version_string(),
                       GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING,
                       GRPC_XDS_USER_AGENT_VERSION_SUFFIX_STRING),
          // Resource-does-not-exist timeout: from the channel arg (in ms),
          // defaulting to 15s, clamped to be non-negative.
          std::max(Duration::Zero(),
                   args.GetDurationFromIntMillis(
                       GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS)
                       .value_or(Duration::Seconds(15)))),
      key_(key),
      // bootstrap() can be called here because the XdsClient base subobject
      // is fully constructed; the downcast relies on the bootstrap having
      // been passed in as a GrpcXdsBootstrap.
      certificate_provider_store_(MakeOrphanable<CertificateProviderStore>(
          static_cast<const GrpcXdsBootstrap&>(this->bootstrap())
              .certificate_providers())),
      stats_plugin_group_(GetStatsPluginGroupForKey(key_)),
      // Registers ReportCallbackMetrics() as the source for the "connected"
      // and "resources" gauges; Orphaned() resets this registration.
      registered_metric_callback_(stats_plugin_group_.RegisterCallback(
          [this](CallbackMetricReporter& reporter) {
            ReportCallbackMetrics(reporter);
          },
          {kMetricConnected, kMetricResources})) {}
307
Orphaned()308 void GrpcXdsClient::Orphaned() {
309 registered_metric_callback_.reset();
310 XdsClient::Orphaned();
311 MutexLock lock(g_mu);
312 auto it = g_xds_client_map->find(key_);
313 if (it != g_xds_client_map->end() && it->second == this) {
314 g_xds_client_map->erase(it);
315 }
316 }
317
interested_parties() const318 grpc_pollset_set* GrpcXdsClient::interested_parties() const {
319 return reinterpret_cast<GrpcXdsTransportFactory*>(transport_factory())
320 ->interested_parties();
321 }
322
323 namespace {
324
GetAllXdsClients()325 std::vector<RefCountedPtr<GrpcXdsClient>> GetAllXdsClients() {
326 MutexLock lock(g_mu);
327 std::vector<RefCountedPtr<GrpcXdsClient>> xds_clients;
328 for (const auto& key_client : *g_xds_client_map) {
329 auto xds_client =
330 key_client.second->RefIfNonZero(DEBUG_LOCATION, "DumpAllClientConfigs");
331 if (xds_client != nullptr) {
332 xds_clients.emplace_back(xds_client.TakeAsSubclass<GrpcXdsClient>());
333 }
334 }
335 return xds_clients;
336 }
337
338 } // namespace
339
340 // ABSL_NO_THREAD_SAFETY_ANALYSIS because we have to manually manage locks for
341 // individual XdsClients and compiler struggles with checking the validity
DumpAllClientConfigs()342 grpc_slice GrpcXdsClient::DumpAllClientConfigs()
343 ABSL_NO_THREAD_SAFETY_ANALYSIS {
344 auto xds_clients = GetAllXdsClients();
345 upb::Arena arena;
346 // Contains strings that should survive till serialization
347 std::set<std::string> string_pool;
348 auto response = envoy_service_status_v3_ClientStatusResponse_new(arena.ptr());
349 // We lock each XdsClient mutex till we are done with the serialization to
350 // ensure that all data referenced from the UPB proto message stays alive.
351 for (const auto& xds_client : xds_clients) {
352 auto client_config =
353 envoy_service_status_v3_ClientStatusResponse_add_config(response,
354 arena.ptr());
355 xds_client->mu()->Lock();
356 xds_client->DumpClientConfig(&string_pool, arena.ptr(), client_config);
357 envoy_service_status_v3_ClientConfig_set_client_scope(
358 client_config, StdStringToUpbString(xds_client->key()));
359 }
360 // Serialize the upb message to bytes
361 size_t output_length;
362 char* output = envoy_service_status_v3_ClientStatusResponse_serialize(
363 response, arena.ptr(), &output_length);
364 for (const auto& xds_client : xds_clients) {
365 xds_client->mu()->Unlock();
366 }
367 return grpc_slice_from_cpp_string(std::string(output, output_length));
368 }
369
ReportCallbackMetrics(CallbackMetricReporter & reporter)370 void GrpcXdsClient::ReportCallbackMetrics(CallbackMetricReporter& reporter) {
371 MutexLock lock(mu());
372 ReportResourceCounts([&](const ResourceCountLabels& labels, uint64_t count) {
373 reporter.Report(
374 kMetricResources, count,
375 {key_, labels.xds_authority, labels.resource_type, labels.cache_state},
376 {});
377 });
378 ReportServerConnections([&](absl::string_view xds_server, bool connected) {
379 reporter.Report(kMetricConnected, connected, {key_, xds_server}, {});
380 });
381 }
382
383 namespace internal {
384
SetXdsChannelArgsForTest(grpc_channel_args * args)385 void SetXdsChannelArgsForTest(grpc_channel_args* args) {
386 MutexLock lock(g_mu);
387 g_channel_args = args;
388 }
389
UnsetGlobalXdsClientsForTest()390 void UnsetGlobalXdsClientsForTest() {
391 MutexLock lock(g_mu);
392 g_xds_client_map->clear();
393 }
394
SetXdsFallbackBootstrapConfig(const char * config)395 void SetXdsFallbackBootstrapConfig(const char* config) {
396 MutexLock lock(g_mu);
397 gpr_free(g_fallback_bootstrap_config);
398 g_fallback_bootstrap_config = gpr_strdup(config);
399 }
400
401 } // namespace internal
402
403 } // namespace grpc_core
404
// Public C API: returns the serialized CSDS ClientStatusResponse produced by
// GrpcXdsClient::DumpAllClientConfigs(), covering every live global
// XdsClient. The serialized bytes may contain NUL (0) bytes, so they are
// returned as a grpc_slice rather than as a C string.
grpc_slice grpc_dump_xds_configs(void) {
  // Exec contexts required by core code invoked from this API entry point.
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  return grpc_core::GrpcXdsClient::DumpAllClientConfigs();
}
411