xref: /aosp_15_r20/external/grpc-grpc/src/cpp/ext/gcp/observability.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <stdint.h>
20 
21 #include <algorithm>
22 #include <map>
23 #include <memory>
24 #include <string>
25 #include <utility>
26 #include <vector>
27 
28 #include "absl/status/status.h"
29 #include "absl/status/statusor.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/time/clock.h"
32 #include "absl/time/time.h"
33 #include "absl/types/optional.h"
34 #include "google/api/monitored_resource.pb.h"
35 #include "google/devtools/cloudtrace/v2/tracing.grpc.pb.h"
36 #include "google/monitoring/v3/metric_service.grpc.pb.h"
37 #include "opencensus/exporters/stats/stackdriver/stackdriver_exporter.h"
38 #include "opencensus/exporters/trace/stackdriver/stackdriver_exporter.h"
39 #include "opencensus/stats/stats.h"
40 #include "opencensus/trace/sampler.h"
41 #include "opencensus/trace/trace_config.h"
42 
43 #include <grpc/grpc.h>
44 #include <grpcpp/channel.h>
45 #include <grpcpp/ext/gcp_observability.h>
46 #include <grpcpp/opencensus.h>
47 #include <grpcpp/security/credentials.h>
48 #include <grpcpp/support/channel_arguments.h>
49 
50 #include "src/core/ext/filters/logging/logging_filter.h"
51 #include "src/core/lib/gprpp/crash.h"
52 #include "src/core/lib/gprpp/notification.h"
53 #include "src/cpp/client/client_stats_interceptor.h"
54 #include "src/cpp/ext/filters/census/client_filter.h"
55 #include "src/cpp/ext/filters/census/grpc_plugin.h"
56 #include "src/cpp/ext/filters/census/open_census_call_tracer.h"
57 #include "src/cpp/ext/gcp/environment_autodetect.h"
58 #include "src/cpp/ext/gcp/observability_config.h"
59 #include "src/cpp/ext/gcp/observability_logging_sink.h"
60 
61 namespace grpc {
62 
63 namespace internal {
64 namespace {
65 
66 grpc::internal::ObservabilityLoggingSink* g_logging_sink = nullptr;
67 
68 bool g_gcp_observability_initialized = false;
69 
70 // TODO(yashykt): These constants are currently derived from the example at
71 // https://cloud.google.com/traffic-director/docs/observability-proxyless#c++.
72 // We might want these to be configurable.
73 constexpr uint32_t kMaxAttributes = 128;
74 constexpr uint32_t kMaxAnnotations = 128;
75 constexpr uint32_t kMaxMessageEvents = 128;
76 constexpr uint32_t kMaxLinks = 128;
77 
78 constexpr char kGoogleStackdriverTraceAddress[] = "cloudtrace.googleapis.com";
79 constexpr char kGoogleStackdriverStatsAddress[] = "monitoring.googleapis.com";
80 
RegisterOpenCensusViewsForGcpObservability()81 void RegisterOpenCensusViewsForGcpObservability() {
82   // Register client default views for GCP observability
83   experimental::ClientStartedRpcs().RegisterForExport();
84   experimental::ClientCompletedRpcs().RegisterForExport();
85   experimental::ClientRoundtripLatency().RegisterForExport();
86   internal::ClientApiLatency().RegisterForExport();
87   experimental::ClientSentCompressedMessageBytesPerRpc().RegisterForExport();
88   experimental::ClientReceivedCompressedMessageBytesPerRpc()
89       .RegisterForExport();
90   // Register server default views for GCP observability
91   experimental::ServerStartedRpcs().RegisterForExport();
92   experimental::ServerCompletedRpcs().RegisterForExport();
93   experimental::ServerSentCompressedMessageBytesPerRpc().RegisterForExport();
94   experimental::ServerReceivedCompressedMessageBytesPerRpc()
95       .RegisterForExport();
96   experimental::ServerServerLatency().RegisterForExport();
97 }
98 
99 }  // namespace
100 
GcpObservabilityInit()101 absl::Status GcpObservabilityInit() {
102   auto config = grpc::internal::GcpObservabilityConfig::ReadFromEnv();
103   if (!config.ok()) {
104     return config.status();
105   }
106   if (!config->cloud_trace.has_value() &&
107       !config->cloud_monitoring.has_value() &&
108       !config->cloud_logging.has_value()) {
109     return absl::OkStatus();
110   }
111   if (g_gcp_observability_initialized) {
112     grpc_core::Crash("GCP Observability for gRPC was already initialized.");
113   }
114   g_gcp_observability_initialized = true;
115   grpc::internal::EnvironmentAutoDetect::Create(config->project_id);
116   if (!config->cloud_trace.has_value()) {
117     // Disable OpenCensus tracing
118     grpc::internal::EnableOpenCensusTracing(false);
119   }
120   if (!config->cloud_monitoring.has_value()) {
121     // Disable OpenCensus stats
122     grpc::internal::EnableOpenCensusStats(false);
123   } else {
124     // Register the OpenCensus client stats interceptor factory if stats are
125     // enabled. Note that this is currently separate from the OpenCensus Plugin
126     // to avoid changing the behavior of the currently available OpenCensus
127     // plugin.
128     grpc::internal::RegisterGlobalClientStatsInterceptorFactory(
129         new grpc::internal::OpenCensusClientInterceptorFactory);
130   }
131   if (config->cloud_logging.has_value()) {
132     g_logging_sink = new grpc::internal::ObservabilityLoggingSink(
133         config->cloud_logging.value(), config->project_id, config->labels);
134     grpc_core::RegisterLoggingFilter(g_logging_sink);
135   }
136   // If tracing or monitoring is enabled, we need to register the OpenCensus
137   // plugin as well.
138   if (config->cloud_trace.has_value() || config->cloud_monitoring.has_value()) {
139     grpc::RegisterOpenCensusPlugin();
140   }
141   // If tracing or monitoring is enabled, we need to detect the environment for
142   // OpenCensus, set the labels and attributes and prepare the StackDriver
143   // exporter.
144   // Note that this should be the last step of GcpObservabilityInit() since we
145   // can't register any more filters after grpc_init.
146   if (config->cloud_trace.has_value() || config->cloud_monitoring.has_value()) {
147     grpc_init();
148     grpc_core::Notification notification;
149     grpc::internal::EnvironmentAutoDetect::Get().NotifyOnDone(
150         [&]() { notification.Notify(); });
151     notification.WaitForNotification();
152     auto* resource = grpc::internal::EnvironmentAutoDetect::Get().resource();
153     if (config->cloud_trace.has_value()) {
154       // Set up attributes for constant tracing
155       std::vector<internal::OpenCensusRegistry::Attribute> attributes;
156       attributes.reserve(resource->labels.size() + config->labels.size());
157       // First insert in environment labels
158       for (const auto& resource_label : resource->labels) {
159         attributes.push_back(internal::OpenCensusRegistry::Attribute{
160             absl::StrCat(resource->resource_type, ".", resource_label.first),
161             resource_label.second});
162       }
163       // Then insert in labels from the GCP Observability config.
164       for (const auto& constant_label : config->labels) {
165         attributes.push_back(internal::OpenCensusRegistry::Attribute{
166             constant_label.first, constant_label.second});
167       }
168       grpc::internal::OpenCensusRegistry::Get().RegisterConstantAttributes(
169           std::move(attributes));
170     }
171     if (config->cloud_monitoring.has_value()) {
172       grpc::internal::OpenCensusRegistry::Get().RegisterConstantLabels(
173           config->labels);
174       RegisterOpenCensusViewsForGcpObservability();
175     }
176     // Note that we are setting up the exporters after registering the
177     // attributes and labels to avoid a case where the exporters start an RPC
178     // before we are ready.
179     if (config->cloud_trace.has_value()) {
180       // Set up the StackDriver Exporter for tracing.
181       opencensus::trace::TraceConfig::SetCurrentTraceParams(
182           {kMaxAttributes, kMaxAnnotations, kMaxMessageEvents, kMaxLinks,
183            opencensus::trace::ProbabilitySampler(
184                config->cloud_trace->sampling_rate)});
185       opencensus::exporters::trace::StackdriverOptions trace_opts;
186       trace_opts.project_id = config->project_id;
187       ChannelArguments args;
188       args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0);
189       trace_opts.trace_service_stub =
190           ::google::devtools::cloudtrace::v2::TraceService::NewStub(
191               CreateCustomChannel(kGoogleStackdriverTraceAddress,
192                                   GoogleDefaultCredentials(), args));
193       opencensus::exporters::trace::StackdriverExporter::Register(
194           std::move(trace_opts));
195     }
196     if (config->cloud_monitoring.has_value()) {
197       // Set up the StackDriver Exporter for monitoring.
198       opencensus::exporters::stats::StackdriverOptions stats_opts;
199       stats_opts.project_id = config->project_id;
200       stats_opts.monitored_resource.set_type(resource->resource_type);
201       stats_opts.monitored_resource.mutable_labels()->insert(
202           resource->labels.begin(), resource->labels.end());
203       ChannelArguments args;
204       args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0);
205       stats_opts.metric_service_stub =
206           google::monitoring::v3::MetricService::NewStub(
207               CreateCustomChannel(kGoogleStackdriverStatsAddress,
208                                   GoogleDefaultCredentials(), args));
209       opencensus::exporters::stats::StackdriverExporter::Register(
210           std::move(stats_opts));
211     }
212     grpc_shutdown();
213   }
214   return absl::OkStatus();
215 }
216 
GcpObservabilityClose()217 void GcpObservabilityClose() {
218   if (g_logging_sink != nullptr) {
219     g_logging_sink->FlushAndClose();
220   }
221   // Currently, GcpObservabilityClose() only supports flushing logs. Stats and
222   // tracing get automatically flushed at a regular interval, so sleep for an
223   // interval to make sure that those are flushed too.
224   absl::SleepFor(absl::Seconds(25));
225 }
226 
227 }  // namespace internal
228 
229 namespace experimental {
230 
GcpObservabilityInit()231 absl::Status GcpObservabilityInit() {
232   return grpc::internal::GcpObservabilityInit();
233 }
234 
GcpObservabilityClose()235 void GcpObservabilityClose() { return grpc::internal::GcpObservabilityClose(); }
236 
237 }  // namespace experimental
238 
239 //
240 // GcpObservability
241 //
242 
Init()243 absl::StatusOr<GcpObservability> GcpObservability::Init() {
244   absl::Status status = grpc::internal::GcpObservabilityInit();
245   if (!status.ok()) {
246     return status;
247   }
248   GcpObservability obj;
249   obj.impl_ = std::make_unique<GcpObservabilityImpl>();
250   return obj;
251 }
252 
GcpObservability(GcpObservability && other)253 GcpObservability::GcpObservability(GcpObservability&& other) noexcept
254     : impl_(std::move(other.impl_)) {}
255 
operator =(GcpObservability && other)256 GcpObservability& GcpObservability::operator=(
257     GcpObservability&& other) noexcept {
258   if (this != &other) {
259     impl_ = std::move(other.impl_);
260   }
261   return *this;
262 }
263 
264 //
265 // GcpObservability::GcpObservabilityImpl
266 //
267 
~GcpObservabilityImpl()268 GcpObservability::GcpObservabilityImpl::~GcpObservabilityImpl() {
269   grpc::internal::GcpObservabilityClose();
270 }
271 
272 }  // namespace grpc
273