xref: /aosp_15_r20/external/grpc-grpc/src/core/ext/xds/xds_client_grpc.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/ext/xds/xds_client_grpc.h"
20 
21 #include <algorithm>
22 #include <cstddef>
23 #include <memory>
24 #include <string>
25 #include <utility>
26 #include <vector>
27 
28 #include "absl/base/thread_annotations.h"
29 #include "absl/status/status.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/string_view.h"
32 #include "absl/types/optional.h"
33 #include "envoy/service/status/v3/csds.upb.h"
34 #include "upb/base/string_view.h"
35 
36 #include <grpc/grpc.h>
37 #include <grpc/impl/channel_arg_names.h>
38 #include <grpc/slice.h>
39 #include <grpc/support/alloc.h>
40 #include <grpc/support/log.h>
41 #include <grpc/support/string_util.h>
42 
43 #include "src/core/ext/xds/upb_utils.h"
44 #include "src/core/ext/xds/xds_api.h"
45 #include "src/core/ext/xds/xds_bootstrap.h"
46 #include "src/core/ext/xds/xds_bootstrap_grpc.h"
47 #include "src/core/ext/xds/xds_channel_args.h"
48 #include "src/core/ext/xds/xds_client.h"
49 #include "src/core/ext/xds/xds_transport.h"
50 #include "src/core/ext/xds/xds_transport_grpc.h"
51 #include "src/core/lib/channel/channel_args.h"
52 #include "src/core/lib/channel/metrics.h"
53 #include "src/core/lib/debug/trace.h"
54 #include "src/core/lib/event_engine/default_event_engine.h"
55 #include "src/core/lib/gprpp/debug_location.h"
56 #include "src/core/lib/gprpp/env.h"
57 #include "src/core/lib/gprpp/load_file.h"
58 #include "src/core/lib/gprpp/orphanable.h"
59 #include "src/core/lib/gprpp/ref_counted_ptr.h"
60 #include "src/core/lib/gprpp/sync.h"
61 #include "src/core/lib/gprpp/time.h"
62 #include "src/core/lib/iomgr/error.h"
63 #include "src/core/lib/iomgr/exec_ctx.h"
64 #include "src/core/lib/slice/slice.h"
65 #include "src/core/lib/slice/slice_internal.h"
66 #include "src/core/lib/transport/error_utils.h"
67 
// If gRPC is built with -DGRPC_XDS_USER_AGENT_NAME_SUFFIX="...", that string
// will be appended to the user agent name reported to the xDS server.
// NOTE(review): the GrpcXdsClient constructor in this file also appends this
// name suffix to the user agent *version* string; confirm that is intended.
#ifdef GRPC_XDS_USER_AGENT_NAME_SUFFIX
// The leading " " separates the suffix from the base name; the two pieces are
// joined by string-literal concatenation, so the -D value must be a quoted
// string literal.
#define GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING \
  " " GRPC_XDS_USER_AGENT_NAME_SUFFIX
#else
#define GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING ""
#endif

// If gRPC is built with -DGRPC_XDS_USER_AGENT_VERSION_SUFFIX="...", that string
// will be appended to the user agent version reported to the xDS server.
#ifdef GRPC_XDS_USER_AGENT_VERSION_SUFFIX
// Same literal-concatenation scheme as the name suffix above.
#define GRPC_XDS_USER_AGENT_VERSION_SUFFIX_STRING \
  " " GRPC_XDS_USER_AGENT_VERSION_SUFFIX
#else
#define GRPC_XDS_USER_AGENT_VERSION_SUFFIX_STRING ""
#endif
85 
86 namespace grpc_core {
87 
namespace {

// Metric labels.
constexpr absl::string_view kMetricLabelXdsServer = "grpc.xds.server";
constexpr absl::string_view kMetricLabelXdsAuthority = "grpc.xds.authority";
constexpr absl::string_view kMetricLabelXdsResourceType =
    "grpc.xds.resource_type";
constexpr absl::string_view kMetricLabelXdsCacheState = "grpc.xds.cache_state";

// The instruments below are registered with the global registry during dynamic
// initialization of this translation unit. In each registration the first
// label list is filled in by the reporting code in this file (the "target"
// label receives the XdsClient's key); the trailing `{}` and `false` are
// passed through as-is (presumably "no optional labels" and "not enabled by
// default" — confirm against GlobalInstrumentsRegistry).

// Incremented by MetricsReporter::ReportResourceUpdates() with the number of
// valid resources in each received update.
const auto kMetricResourceUpdatesValid =
    GlobalInstrumentsRegistry::RegisterUInt64Counter(
        "grpc.xds_client.resource_updates_valid",
        "EXPERIMENTAL.  A counter of resources received that were considered "
        "valid.  The counter will be incremented even for resources that "
        "have not changed.",
        "{resource}",
        {kMetricLabelTarget, kMetricLabelXdsServer,
         kMetricLabelXdsResourceType},
        {}, false);

// Incremented by MetricsReporter::ReportResourceUpdates() with the number of
// invalid resources in each received update.
const auto kMetricResourceUpdatesInvalid =
    GlobalInstrumentsRegistry::RegisterUInt64Counter(
        "grpc.xds_client.resource_updates_invalid",
        "EXPERIMENTAL.  A counter of resources received that were considered "
        "invalid.",
        "{resource}",
        {kMetricLabelTarget, kMetricLabelXdsServer,
         kMetricLabelXdsResourceType},
        {}, false);

// Callback gauge; values are supplied by
// GrpcXdsClient::ReportCallbackMetrics() (one value per xDS server).
const auto kMetricConnected =
    GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge(
        "grpc.xds_client.connected",
        "EXPERIMENTAL.  Whether or not the xDS client currently has a "
        "working ADS stream to the xDS server. For a given server, this "
        "will be set to 0 when we have a connectivity failure or when the "
        "ADS stream fails without seeing a response message, as per gRFC "
        "A57.  It will be set to 1 when we receive the first response on "
        "an ADS stream.",
        "{bool}", {kMetricLabelTarget, kMetricLabelXdsServer}, {}, false);

// Callback gauge; values are supplied by
// GrpcXdsClient::ReportCallbackMetrics() from the client's resource counts.
const auto kMetricResources =
    GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge(
        "grpc.xds_client.resources", "EXPERIMENTAL.  Number of xDS resources.",
        "{resource}",
        {kMetricLabelTarget, kMetricLabelXdsAuthority,
         kMetricLabelXdsResourceType, kMetricLabelXdsCacheState},
        {}, false);

}  // namespace
138 
139 //
140 // GrpcXdsClient::MetricsReporter
141 //
142 
143 class GrpcXdsClient::MetricsReporter final : public XdsMetricsReporter {
144  public:
MetricsReporter(GrpcXdsClient & xds_client)145   explicit MetricsReporter(GrpcXdsClient& xds_client)
146       : xds_client_(xds_client) {}
147 
ReportResourceUpdates(absl::string_view xds_server,absl::string_view resource_type,uint64_t num_valid_resources,uint64_t num_invalid_resources)148   void ReportResourceUpdates(absl::string_view xds_server,
149                              absl::string_view resource_type,
150                              uint64_t num_valid_resources,
151                              uint64_t num_invalid_resources) override {
152     xds_client_.stats_plugin_group_.AddCounter(
153         kMetricResourceUpdatesValid, num_valid_resources,
154         {xds_client_.key_, xds_server, resource_type}, {});
155     xds_client_.stats_plugin_group_.AddCounter(
156         kMetricResourceUpdatesInvalid, num_invalid_resources,
157         {xds_client_.key_, xds_server, resource_type}, {});
158   }
159 
160  private:
161   GrpcXdsClient& xds_client_;
162 };
163 
164 //
165 // GrpcXdsClient
166 //
167 
// Out-of-line definition of the static constexpr data member. Required for
// ODR-use before C++17; redundant but harmless from C++17 on, where static
// constexpr data members are implicitly inline.
constexpr absl::string_view GrpcXdsClient::kServerKey;
169 
170 namespace {
171 
// Guards all of the globals below. Heap-allocated and never freed
// (presumably to avoid static-destruction-order problems at shutdown).
Mutex* g_mu = new Mutex;
// Channel args used when creating global XdsClients and their transport
// factories. Set by SetXdsChannelArgsForTest(); the pointer is stored as-is
// and is not owned here.
const grpc_channel_args* g_channel_args ABSL_GUARDED_BY(*g_mu) = nullptr;
// Registry of global XdsClients by key. Values are non-owning pointers; each
// client removes its own entry in Orphaned().
// Key bytes live in clients so they outlive the entries in this map.
NoDestruct<std::map<absl::string_view, GrpcXdsClient*>> g_xds_client_map
    ABSL_GUARDED_BY(*g_mu);
// Owned (gpr_strdup'd) fallback bootstrap JSON, used only when neither
// bootstrap env var is set. Replaced by SetXdsFallbackBootstrapConfig().
char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr;
178 
GetBootstrapContents(const char * fallback_config)179 absl::StatusOr<std::string> GetBootstrapContents(const char* fallback_config) {
180   // First, try GRPC_XDS_BOOTSTRAP env var.
181   auto path = GetEnv("GRPC_XDS_BOOTSTRAP");
182   if (path.has_value()) {
183     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
184       gpr_log(GPR_INFO,
185               "Got bootstrap file location from GRPC_XDS_BOOTSTRAP "
186               "environment variable: %s",
187               path->c_str());
188     }
189     auto contents = LoadFile(*path, /*add_null_terminator=*/true);
190     if (!contents.ok()) return contents.status();
191     return std::string(contents->as_string_view());
192   }
193   // Next, try GRPC_XDS_BOOTSTRAP_CONFIG env var.
194   auto env_config = GetEnv("GRPC_XDS_BOOTSTRAP_CONFIG");
195   if (env_config.has_value()) {
196     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
197       gpr_log(GPR_INFO,
198               "Got bootstrap contents from GRPC_XDS_BOOTSTRAP_CONFIG "
199               "environment variable");
200     }
201     return std::move(*env_config);
202   }
203   // Finally, try fallback config.
204   if (fallback_config != nullptr) {
205     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
206       gpr_log(GPR_INFO, "Got bootstrap contents from fallback config");
207     }
208     return fallback_config;
209   }
210   // No bootstrap config found.
211   return absl::FailedPreconditionError(
212       "Environment variables GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG "
213       "not defined");
214 }
215 
216 }  // namespace
217 
GetOrCreate(absl::string_view key,const ChannelArgs & args,const char * reason)218 absl::StatusOr<RefCountedPtr<GrpcXdsClient>> GrpcXdsClient::GetOrCreate(
219     absl::string_view key, const ChannelArgs& args, const char* reason) {
220   // If getting bootstrap from channel args, create a local XdsClient
221   // instance for the channel or server instead of using the global instance.
222   absl::optional<absl::string_view> bootstrap_config = args.GetString(
223       GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG);
224   if (bootstrap_config.has_value()) {
225     auto bootstrap = GrpcXdsBootstrap::Create(*bootstrap_config);
226     if (!bootstrap.ok()) return bootstrap.status();
227     grpc_channel_args* xds_channel_args = args.GetPointer<grpc_channel_args>(
228         GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS);
229     auto channel_args = ChannelArgs::FromC(xds_channel_args);
230     return MakeRefCounted<GrpcXdsClient>(
231         key, std::move(*bootstrap), channel_args,
232         MakeOrphanable<GrpcXdsTransportFactory>(channel_args));
233   }
234   // Otherwise, use the global instance.
235   MutexLock lock(g_mu);
236   auto it = g_xds_client_map->find(key);
237   if (it != g_xds_client_map->end()) {
238     auto xds_client = it->second->RefIfNonZero(DEBUG_LOCATION, reason);
239     if (xds_client != nullptr) {
240       return xds_client.TakeAsSubclass<GrpcXdsClient>();
241     }
242   }
243   // Find bootstrap contents.
244   auto bootstrap_contents = GetBootstrapContents(g_fallback_bootstrap_config);
245   if (!bootstrap_contents.ok()) return bootstrap_contents.status();
246   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
247     gpr_log(GPR_INFO, "xDS bootstrap contents: %s",
248             bootstrap_contents->c_str());
249   }
250   // Parse bootstrap.
251   auto bootstrap = GrpcXdsBootstrap::Create(*bootstrap_contents);
252   if (!bootstrap.ok()) return bootstrap.status();
253   // Instantiate XdsClient.
254   auto channel_args = ChannelArgs::FromC(g_channel_args);
255   auto xds_client = MakeRefCounted<GrpcXdsClient>(
256       key, std::move(*bootstrap), channel_args,
257       MakeOrphanable<GrpcXdsTransportFactory>(channel_args));
258   g_xds_client_map->emplace(xds_client->key(), xds_client.get());
259   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
260     gpr_log(GPR_INFO, "[xds_client %p] Created xDS client for key %s",
261             xds_client.get(), std::string(key).c_str());
262   }
263   return xds_client;
264 }
265 
266 namespace {
267 
GetStatsPluginGroupForKey(absl::string_view key)268 GlobalStatsPluginRegistry::StatsPluginGroup GetStatsPluginGroupForKey(
269     absl::string_view key) {
270   if (key == GrpcXdsClient::kServerKey) {
271     return GlobalStatsPluginRegistry::GetStatsPluginsForServer(ChannelArgs{});
272   }
273   // TODO(roth): How do we set the authority here?
274   experimental::StatsPluginChannelScope scope(key, "");
275   return GlobalStatsPluginRegistry::GetStatsPluginsForChannel(scope);
276 }
277 
278 }  // namespace
279 
// Creates the gRPC XdsClient for `key` (used as the channel target for stats,
// or GrpcXdsClient::kServerKey for the server-side client).
//
// NOTE(review): several initializers depend on earlier ones:
// certificate_provider_store_ reads this->bootstrap() from the already-built
// XdsClient base, stats_plugin_group_ reads key_, and
// registered_metric_callback_ registers on stats_plugin_group_. C++
// initializes members in declaration order, so this relies on key_ and
// stats_plugin_group_ being declared (in the header) before the members that
// use them — confirm there before reordering anything.
GrpcXdsClient::GrpcXdsClient(
    absl::string_view key, std::unique_ptr<GrpcXdsBootstrap> bootstrap,
    const ChannelArgs& args,
    OrphanablePtr<XdsTransportFactory> transport_factory)
    : XdsClient(
          std::move(bootstrap), std::move(transport_factory),
          grpc_event_engine::experimental::GetDefaultEventEngine(),
          // MetricsReporter's constructor only stores this reference; it is
          // not used until metrics are reported.
          std::make_unique<MetricsReporter>(*this),
          // Presumably the user agent name (see the *_NAME_SUFFIX macro).
          absl::StrCat("gRPC C-core ", GPR_PLATFORM_STRING,
                       GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING),
          // Presumably the user agent version; note that it also includes
          // the *name* suffix.
          absl::StrCat("C-core ", grpc_version_string(),
                       GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING,
                       GRPC_XDS_USER_AGENT_VERSION_SUFFIX_STRING),
          // Resource-does-not-exist timeout: channel arg in milliseconds,
          // defaulting to 15s, clamped so it is never negative.
          std::max(Duration::Zero(),
                   args.GetDurationFromIntMillis(
                           GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS)
                       .value_or(Duration::Seconds(15)))),
      key_(key),
      certificate_provider_store_(MakeOrphanable<CertificateProviderStore>(
          static_cast<const GrpcXdsBootstrap&>(this->bootstrap())
              .certificate_providers())),
      stats_plugin_group_(GetStatsPluginGroupForKey(key_)),
      // Callback gauges are computed on demand by ReportCallbackMetrics();
      // the registration handle is reset in Orphaned().
      registered_metric_callback_(stats_plugin_group_.RegisterCallback(
          [this](CallbackMetricReporter& reporter) {
            ReportCallbackMetrics(reporter);
          },
          {kMetricConnected, kMetricResources})) {}
307 
Orphaned()308 void GrpcXdsClient::Orphaned() {
309   registered_metric_callback_.reset();
310   XdsClient::Orphaned();
311   MutexLock lock(g_mu);
312   auto it = g_xds_client_map->find(key_);
313   if (it != g_xds_client_map->end() && it->second == this) {
314     g_xds_client_map->erase(it);
315   }
316 }
317 
interested_parties() const318 grpc_pollset_set* GrpcXdsClient::interested_parties() const {
319   return reinterpret_cast<GrpcXdsTransportFactory*>(transport_factory())
320       ->interested_parties();
321 }
322 
323 namespace {
324 
GetAllXdsClients()325 std::vector<RefCountedPtr<GrpcXdsClient>> GetAllXdsClients() {
326   MutexLock lock(g_mu);
327   std::vector<RefCountedPtr<GrpcXdsClient>> xds_clients;
328   for (const auto& key_client : *g_xds_client_map) {
329     auto xds_client =
330         key_client.second->RefIfNonZero(DEBUG_LOCATION, "DumpAllClientConfigs");
331     if (xds_client != nullptr) {
332       xds_clients.emplace_back(xds_client.TakeAsSubclass<GrpcXdsClient>());
333     }
334   }
335   return xds_clients;
336 }
337 
338 }  // namespace
339 
340 // ABSL_NO_THREAD_SAFETY_ANALYSIS because we have to manually manage locks for
341 // individual XdsClients and compiler struggles with checking the validity
DumpAllClientConfigs()342 grpc_slice GrpcXdsClient::DumpAllClientConfigs()
343     ABSL_NO_THREAD_SAFETY_ANALYSIS {
344   auto xds_clients = GetAllXdsClients();
345   upb::Arena arena;
346   // Contains strings that should survive till serialization
347   std::set<std::string> string_pool;
348   auto response = envoy_service_status_v3_ClientStatusResponse_new(arena.ptr());
349   // We lock each XdsClient mutex till we are done with the serialization to
350   // ensure that all data referenced from the UPB proto message stays alive.
351   for (const auto& xds_client : xds_clients) {
352     auto client_config =
353         envoy_service_status_v3_ClientStatusResponse_add_config(response,
354                                                                 arena.ptr());
355     xds_client->mu()->Lock();
356     xds_client->DumpClientConfig(&string_pool, arena.ptr(), client_config);
357     envoy_service_status_v3_ClientConfig_set_client_scope(
358         client_config, StdStringToUpbString(xds_client->key()));
359   }
360   // Serialize the upb message to bytes
361   size_t output_length;
362   char* output = envoy_service_status_v3_ClientStatusResponse_serialize(
363       response, arena.ptr(), &output_length);
364   for (const auto& xds_client : xds_clients) {
365     xds_client->mu()->Unlock();
366   }
367   return grpc_slice_from_cpp_string(std::string(output, output_length));
368 }
369 
ReportCallbackMetrics(CallbackMetricReporter & reporter)370 void GrpcXdsClient::ReportCallbackMetrics(CallbackMetricReporter& reporter) {
371   MutexLock lock(mu());
372   ReportResourceCounts([&](const ResourceCountLabels& labels, uint64_t count) {
373     reporter.Report(
374         kMetricResources, count,
375         {key_, labels.xds_authority, labels.resource_type, labels.cache_state},
376         {});
377   });
378   ReportServerConnections([&](absl::string_view xds_server, bool connected) {
379     reporter.Report(kMetricConnected, connected, {key_, xds_server}, {});
380   });
381 }
382 
383 namespace internal {
384 
SetXdsChannelArgsForTest(grpc_channel_args * args)385 void SetXdsChannelArgsForTest(grpc_channel_args* args) {
386   MutexLock lock(g_mu);
387   g_channel_args = args;
388 }
389 
UnsetGlobalXdsClientsForTest()390 void UnsetGlobalXdsClientsForTest() {
391   MutexLock lock(g_mu);
392   g_xds_client_map->clear();
393 }
394 
SetXdsFallbackBootstrapConfig(const char * config)395 void SetXdsFallbackBootstrapConfig(const char* config) {
396   MutexLock lock(g_mu);
397   gpr_free(g_fallback_bootstrap_config);
398   g_fallback_bootstrap_config = gpr_strdup(config);
399 }
400 
401 }  // namespace internal
402 
403 }  // namespace grpc_core
404 
// Public C API: returns the serialized CSDS ClientStatusResponse produced by
// GrpcXdsClient::DumpAllClientConfigs() for all global XdsClients.
// The returned bytes may contain NUL (0) bytes, so they are returned as a
// slice rather than as a C string.
grpc_slice grpc_dump_xds_configs(void) {
  // Exec contexts set up on entry from the public API; declaration order
  // determines destruction order, so it is kept as-is.
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  return grpc_core::GrpcXdsClient::DumpAllClientConfigs();
}
411