// // // Copyright 2023 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // #ifndef GRPC_SRC_CPP_EXT_OTEL_OTEL_PLUGIN_H #define GRPC_SRC_CPP_EXT_OTEL_OTEL_PLUGIN_H #include #include #include #include #include #include #include #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" #include "absl/functional/any_invocable.h" #include "absl/strings/string_view.h" #include "absl/types/optional.h" #include "opentelemetry/metrics/async_instruments.h" #include "opentelemetry/metrics/meter_provider.h" #include "opentelemetry/metrics/observer_result.h" #include "opentelemetry/metrics/sync_instruments.h" #include "opentelemetry/nostd/shared_ptr.h" #include #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/metrics.h" #include "src/core/lib/transport/metadata_batch.h" namespace grpc { namespace internal { // An iterable container interface that can be used as a return type for the // OpenTelemetry plugin's label injector. class LabelsIterable { public: virtual ~LabelsIterable() = default; // Returns the key-value label at the current position or absl::nullopt if the // iterator has reached the end. virtual absl::optional> Next() = 0; virtual size_t Size() const = 0; // Resets position of iterator to the start. virtual void ResetIteratorPosition() = 0; }; // An interface that allows you to add additional labels on the calls traced // through the OpenTelemetry plugin. 
class LabelsInjector {
 public:
  // NOTE(review): throughout the remainder of this header, most `<...>`
  // template argument lists are missing in this copy (e.g. `std::unique_ptr`
  // and `absl::Span` with no element type, stray `>` tokens). The declarations
  // are reproduced as found; restore the arguments from the upstream header
  // before compiling.
  virtual ~LabelsInjector() {}
  // Read the incoming initial metadata to get the set of labels to be added to
  // metrics.
  virtual std::unique_ptr GetLabels(
      grpc_metadata_batch* incoming_initial_metadata) const = 0;

  // Modify the outgoing initial metadata with metadata information to be sent
  // to the peer. On the server side, \a labels_from_incoming_metadata returned
  // from `GetLabels` should be provided as input here. On the client side, this
  // should be nullptr.
  virtual void AddLabels(
      grpc_metadata_batch* outgoing_initial_metadata,
      LabelsIterable* labels_from_incoming_metadata) const = 0;

  // Adds optional labels to the traced calls. Each entry in the span
  // corresponds to the CallAttemptTracer::OptionalLabelComponent enum. Returns
  // false when callback returns false.
  // NOTE(review): this comment names `OptionalLabelComponent`, while the
  // helpers declared in OpenTelemetryPlugin further down use
  // `CallAttemptTracer::OptionalLabelKey` - confirm which enum is current and
  // update the wording.
  virtual bool AddOptionalLabels(
      bool is_client, absl::Span optional_labels,
      opentelemetry::nostd::function_ref<
          bool(opentelemetry::nostd::string_view,
               opentelemetry::common::AttributeValue)>
          callback) const = 0;

  // Gets the actual size of the optional labels that the Plugin is going to
  // produce through the AddOptionalLabels method.
  virtual size_t GetOptionalLabelsSize(
      bool is_client, absl::Span optional_labels) const = 0;
};

// Internal extension of the public grpc::OpenTelemetryPluginOption that lets
// the plugin ask, per client channel target or per server, whether the option
// applies, and lets it obtain the option's LabelsInjector.
class InternalOpenTelemetryPluginOption
    : public grpc::OpenTelemetryPluginOption {
 public:
  ~InternalOpenTelemetryPluginOption() override = default;
  // Determines whether a plugin option is active on a given channel target
  virtual bool IsActiveOnClientChannel(absl::string_view target) const = 0;
  // Determines whether a plugin option is active on a given server
  virtual bool IsActiveOnServer(const grpc_core::ChannelArgs& args) const = 0;
  // Returns the LabelsInjector used by this plugin option, nullptr if none.
  virtual const grpc::internal::LabelsInjector* labels_injector() const = 0;
};

// Tags
// Accessors for the attribute keys used for the method, status and target
// tags. Only declared here; the returned strings are defined elsewhere.
absl::string_view OpenTelemetryMethodKey();
absl::string_view OpenTelemetryStatusKey();
absl::string_view OpenTelemetryTargetKey();

// Builder that accumulates the configuration of an OpenTelemetryPlugin.
// Every setter returns a reference to the builder so calls can be chained;
// BuildAndRegisterGlobal() finishes the process and reports an absl::Status.
class OpenTelemetryPluginBuilderImpl {
 public:
  OpenTelemetryPluginBuilderImpl();
  ~OpenTelemetryPluginBuilderImpl();
  // If `SetMeterProvider()` is not called, no metrics are collected.
  OpenTelemetryPluginBuilderImpl& SetMeterProvider(
      std::shared_ptr meter_provider);
  // Methods to manipulate which instruments are enabled in the OpenTelemetry
  // Stats Plugin. The default set of instruments are -
  // grpc.client.attempt.started
  // grpc.client.attempt.duration
  // grpc.client.attempt.sent_total_compressed_message_size
  // grpc.client.attempt.rcvd_total_compressed_message_size
  // grpc.server.call.started
  // grpc.server.call.duration
  // grpc.server.call.sent_total_compressed_message_size
  // grpc.server.call.rcvd_total_compressed_message_size
  OpenTelemetryPluginBuilderImpl& EnableMetrics(
      absl::Span metric_names);
  OpenTelemetryPluginBuilderImpl& DisableMetrics(
      absl::Span metric_names);
  OpenTelemetryPluginBuilderImpl& DisableAllMetrics();
  // If set, \a server_selector is called per incoming call on the server
  // to decide whether to collect metrics on that call or not.
  // TODO(yashkt): We should only need to do this per server connection or even
  // per server. Change this when we have a ServerTracer.
  OpenTelemetryPluginBuilderImpl& SetServerSelector(
      absl::AnyInvocable server_selector);
  // If set, \a target_attribute_filter is called per channel to decide whether
  // to record the target attribute on client or to replace it with "other".
  // This helps reduce the cardinality on metrics in cases where many channels
  // are created with different targets in the same binary (which might happen
  // for example, if the channel target string uses IP addresses directly).
  OpenTelemetryPluginBuilderImpl& SetTargetAttributeFilter(
      absl::AnyInvocable target_attribute_filter);
  // If set, \a generic_method_attribute_filter is called per call with a
  // generic method type to decide whether to record the method name or to
  // replace it with "other". Non-generic or pre-registered methods remain
  // unaffected. If not set, by default, generic method names are replaced with
  // "other" when recording metrics.
  OpenTelemetryPluginBuilderImpl& SetGenericMethodAttributeFilter(
      absl::AnyInvocable generic_method_attribute_filter);
  // Adds \a option to the builder, taking ownership of it (it is passed as a
  // std::unique_ptr by value).
  // NOTE(review): OpenTelemetryPlugin::ActivePluginOptionsView tracks options
  // in a 64-bit mask; confirm the implementation rejects more than 64 options.
  OpenTelemetryPluginBuilderImpl& AddPluginOption(
      std::unique_ptr option);
  // Records \a optional_label_key on all metrics that provide it.
  OpenTelemetryPluginBuilderImpl& AddOptionalLabel(
      absl::string_view optional_label_key);
  // Set scope filter to choose which channels are recorded by this plugin.
  // Server-side recording remains unaffected.
  OpenTelemetryPluginBuilderImpl& SetChannelScopeFilter(
      absl::AnyInvocable<
          bool(const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const>
          channel_scope_filter);
  // Finalizes the configuration. The name indicates the plugin is built and
  // registered globally; the definition is not in this header.
  absl::Status BuildAndRegisterGlobal();
  // Test-only accessor: returns a reference to the builder's current set of
  // metric names (metrics_).
  const absl::flat_hash_set& TestOnlyEnabledMetrics() { return metrics_; }

 private:
  // Configuration accumulated by the setters above.
  std::shared_ptr meter_provider_;
  std::unique_ptr labels_injector_;
  absl::AnyInvocable target_attribute_filter_;
  absl::flat_hash_set metrics_;
  absl::AnyInvocable generic_method_attribute_filter_;
  absl::AnyInvocable server_selector_;
  std::vector> plugin_options_;
  std::set optional_label_keys_;
  absl::AnyInvocable channel_scope_filter_;
};

// grpc_core::StatsPlugin implementation for OpenTelemetry metrics.
class OpenTelemetryPlugin : public grpc_core::StatsPlugin {
 public:
  // Parameter names correspond to the setters of
  // OpenTelemetryPluginBuilderImpl. The filters, selector and plugin options
  // are taken by value; the metric-name set and optional label keys are taken
  // by const reference. The definition is not in this header.
  OpenTelemetryPlugin(
      const absl::flat_hash_set& metrics,
      opentelemetry::nostd::shared_ptr meter_provider,
      absl::AnyInvocable target_attribute_filter,
      absl::AnyInvocable generic_method_attribute_filter,
      absl::AnyInvocable server_selector,
      std::vector> plugin_options,
      const std::set& optional_label_keys,
      absl::AnyInvocable<
          bool(const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const>
          channel_scope_filter);

 private:
  // Nested types defined outside this header.
  class ClientCallTracer;
  class KeyValueIterable;
  class NPCMetricsKeyValueIterable;
  class ServerCallTracer;

  // Creates a convenience wrapper to help iterate over only those plugin
  // options that are active over a given channel/server.
  //
  // The view stores only a bit mask, not the options themselves: bit i is set
  // when otel_plugin->plugin_options()[i] is non-null and the activity
  // predicate returned true for it at construction time.
  class ActivePluginOptionsView {
   public:
    // Builds a view of the options for which IsActiveOnClientChannel(target)
    // is true. The lambda copies `target` (a string_view) and is only invoked
    // inside the constructor call below.
    static ActivePluginOptionsView MakeForClient(
        absl::string_view target, const OpenTelemetryPlugin* otel_plugin) {
      return ActivePluginOptionsView(
          [target](const InternalOpenTelemetryPluginOption& plugin_option) {
            return plugin_option.IsActiveOnClientChannel(target);
          },
          otel_plugin);
    }

    // Builds a view of the options for which IsActiveOnServer(args) is true.
    // `args` is captured by reference; the lambda is only invoked inside the
    // constructor call below, while `args` is still alive.
    static ActivePluginOptionsView MakeForServer(
        const grpc_core::ChannelArgs& args,
        const OpenTelemetryPlugin* otel_plugin) {
      return ActivePluginOptionsView(
          [&args](const InternalOpenTelemetryPluginOption& plugin_option) {
            return plugin_option.IsActiveOnServer(args);
          },
          otel_plugin);
    }

    // Calls \a func with each active plugin option and its index within
    // otel_plugin->plugin_options(), in index order. Stops at the first call
    // that returns false and returns false; returns true if every call
    // returned true (including when no option is active).
    // NOTE(review): \a otel_plugin is presumably expected to be the same
    // plugin the view was created from, since the mask is indexed by position
    // in its plugin_options() - confirm at call sites.
    bool ForEach(absl::FunctionRef<
                     bool(const InternalOpenTelemetryPluginOption&, size_t)>
                     func,
                 const OpenTelemetryPlugin* otel_plugin) const {
      for (size_t i = 0; i < otel_plugin->plugin_options().size(); ++i) {
        const auto& plugin_option = otel_plugin->plugin_options()[i];
        // A set bit implies the option was non-null at construction, so the
        // dereference is only reached for options that passed that check.
        if (active_mask_[i] && !func(*plugin_option, i)) {
          return false;
        }
      }
      return true;
    }

   private:
    // Evaluates \a func once per non-null plugin option and records the
    // result in active_mask_. Null entries are skipped and stay inactive.
    explicit ActivePluginOptionsView(
        absl::FunctionRef func,
        const OpenTelemetryPlugin* otel_plugin) {
      for (size_t i = 0; i < otel_plugin->plugin_options().size(); ++i) {
        const auto& plugin_option = otel_plugin->plugin_options()[i];
        if (plugin_option != nullptr && func(*plugin_option)) {
          active_mask_.set(i);
        }
      }
    }

    // One bit per plugin option index. The mask holds 64 bits, and nothing in
    // this class bounds the loops above to 64.
    // NOTE(review): confirm that the number of plugin options is capped at 64
    // where they are added.
    std::bitset<64> active_mask_;
  };

  // Per-channel scope configuration: the set of plugin options active for the
  // channel's target plus the target string to report on metrics.
  class ClientScopeConfig : public grpc_core::StatsPlugin::ScopeConfig {
   public:
    ClientScopeConfig(const OpenTelemetryPlugin* otel_plugin,
                      const OpenTelemetryPluginBuilder::ChannelScope& scope)
        : active_plugin_options_view_(ActivePluginOptionsView::MakeForClient(
              scope.target(), otel_plugin)),
          filtered_target_(
              // Use the original target string only if a filter on the
              // attribute is not registered or if the filter returns true,
              // otherwise use "other".
              otel_plugin->target_attribute_filter() == nullptr ||
                      otel_plugin->target_attribute_filter()(scope.target())
                  ? scope.target()
                  : "other") {}

    const ActivePluginOptionsView& active_plugin_options_view() const {
      return active_plugin_options_view_;
    }

    // Returns a view into filtered_target_, which this object owns as a
    // std::string; the view is valid only while this config is alive.
    absl::string_view filtered_target() const { return filtered_target_; }

   private:
    ActivePluginOptionsView active_plugin_options_view_;
    std::string filtered_target_;
  };

  // Per-server scope configuration: the set of plugin options active for the
  // server's channel args.
  class ServerScopeConfig : public grpc_core::StatsPlugin::ScopeConfig {
   public:
    ServerScopeConfig(const OpenTelemetryPlugin* otel_plugin,
                      const grpc_core::ChannelArgs& args)
        : active_plugin_options_view_(
              ActivePluginOptionsView::MakeForServer(args, otel_plugin)) {}

    const ActivePluginOptionsView& active_plugin_options_view() const {
      return active_plugin_options_view_;
    }

   private:
    ActivePluginOptionsView active_plugin_options_view_;
  };

  // Owning handles for the client per-attempt instruments. Field names match
  // the grpc.client.attempt.* metrics listed on the builder.
  // NOTE(review): the instrument types (template arguments) are missing in
  // this copy.
  struct ClientMetrics {
    struct Attempt {
      std::unique_ptr> started;
      std::unique_ptr> duration;
      std::unique_ptr> sent_total_compressed_message_size;
      std::unique_ptr> rcvd_total_compressed_message_size;
    } attempt;
  };
  // Owning handles for the server per-call instruments. Field names match the
  // grpc.server.call.* metrics listed on the builder.
  struct ServerMetrics {
    struct Call {
      std::unique_ptr> started;
      std::unique_ptr> duration;
      std::unique_ptr> sent_total_compressed_message_size;
      std::unique_ptr> rcvd_total_compressed_message_size;
    } call;
  };

  // This object should be used inline.
  // Both the constructor and the Report() overloads are annotated as requiring
  // the plugin's mu_ to be held exclusively by the caller.
  class CallbackMetricReporter : public grpc_core::CallbackMetricReporter {
   public:
    CallbackMetricReporter(OpenTelemetryPlugin* ot_plugin,
                           grpc_core::RegisteredMetricCallback* key)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(ot_plugin->mu_);

    // Reports an int64 callback-gauge value for the given label values.
    void Report(
        grpc_core::GlobalInstrumentsRegistry::GlobalCallbackInt64GaugeHandle
            handle,
        int64_t value, absl::Span label_values,
        absl::Span optional_values)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(
            CallbackGaugeState::ot_plugin->mu_) override;
    // Reports a double callback-gauge value for the given label values.
    void Report(
        grpc_core::GlobalInstrumentsRegistry::GlobalCallbackDoubleGaugeHandle
            handle,
        double value, absl::Span label_values,
        absl::Span optional_values)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(
            CallbackGaugeState::ot_plugin->mu_) override;

   private:
    OpenTelemetryPlugin* ot_plugin_;
    grpc_core::RegisteredMetricCallback* key_;
  };

  // Returns the string form of \a key
  static absl::string_view OptionalLabelKeyToString(
      grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey key);

  // Returns the OptionalLabelKey form of \a key if \a key is recognized and
  // is public, absl::nullopt otherwise.
  static absl::optional<
      grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey>
  OptionalLabelStringToKey(absl::string_view key);

  // StatsPlugin:
  std::pair> IsEnabledForChannel(
      const OpenTelemetryPluginBuilder::ChannelScope& scope) const override;
  std::pair> IsEnabledForServer(const grpc_core::ChannelArgs& args) const override;
  void AddCounter(
      grpc_core::GlobalInstrumentsRegistry::GlobalUInt64CounterHandle handle,
      uint64_t value, absl::Span label_values,
      absl::Span optional_values) override;
  void AddCounter(
      grpc_core::GlobalInstrumentsRegistry::GlobalDoubleCounterHandle handle,
      double value, absl::Span label_values,
      absl::Span optional_values) override;
  void RecordHistogram(
      grpc_core::GlobalInstrumentsRegistry::GlobalUInt64HistogramHandle handle,
      uint64_t value, absl::Span label_values,
      absl::Span optional_values) override;
  void RecordHistogram(
      grpc_core::GlobalInstrumentsRegistry::GlobalDoubleHistogramHandle handle,
      double value, absl::Span label_values,
      absl::Span optional_values) override;
  // The two SetGauge overrides have empty bodies: values passed to them are
  // not recorded by this plugin.
  void SetGauge(
      grpc_core::GlobalInstrumentsRegistry::GlobalInt64GaugeHandle /*handle*/,
      int64_t /*value*/, absl::Span /*label_values*/,
      absl::Span /*optional_values*/) override {}
  void SetGauge(
      grpc_core::GlobalInstrumentsRegistry::GlobalDoubleGaugeHandle /*handle*/,
      double /*value*/, absl::Span /*label_values*/,
      absl::Span /*optional_values*/) override {}
  // AddCallback/RemoveCallback are annotated as excluding mu_: the caller
  // must not hold the plugin's mutex when calling them.
  void AddCallback(grpc_core::RegisteredMetricCallback* callback)
      ABSL_LOCKS_EXCLUDED(mu_) override;
  void RemoveCallback(grpc_core::RegisteredMetricCallback* callback)
      ABSL_LOCKS_EXCLUDED(mu_) override;
  grpc_core::ClientCallTracer* GetClientCallTracer(
      const grpc_core::Slice& path, bool registered_method,
      std::shared_ptr scope_config) override;
  grpc_core::ServerCallTracer* GetServerCallTracer(
      std::shared_ptr scope_config) override;

  // Read-only accessors; each returns a const reference to the corresponding
  // member.
  const absl::AnyInvocable& server_selector() const { return server_selector_; }
  const absl::AnyInvocable& target_attribute_filter() const {
    return target_attribute_filter_;
  }
  const absl::AnyInvocable& generic_method_attribute_filter() const {
    return generic_method_attribute_filter_;
  }
  const std::vector>& plugin_options() const { return plugin_options_; }

  // State for one callback gauge instrument, parameterized on the gauge's
  // value type (`ValueType` is used by `Cache` below).
  // NOTE(review): the template parameter list after `template` is missing in
  // this copy.
  template
  struct CallbackGaugeState {
    // It's possible to set values for multiple sets of labels at the same time
    // in a single callback. Key is a vector of label values and enabled
    // optional label values.
    using Cache = absl::flat_hash_map, ValueType>;
    grpc_core::GlobalInstrumentsRegistry::InstrumentID id;
    opentelemetry::nostd::shared_ptr<
        opentelemetry::metrics::ObservableInstrument>
        instrument;
    // Guarded by the owning plugin's mu_.
    bool ot_callback_registered ABSL_GUARDED_BY(ot_plugin->mu_);
    // instrument1 ----- RegisteredMetricCallback1
    //               x
    // instrument2 ----- RegisteredMetricCallback2
    // One instrument can be registered by multiple callbacks.
    absl::flat_hash_map caches ABSL_GUARDED_BY(ot_plugin->mu_);
    // Non-owning back-pointer to the plugin whose mu_ guards the fields above.
    OpenTelemetryPlugin* ot_plugin;

    // Annotated as excluding ot_plugin->mu_: must be entered without the
    // plugin mutex held.
    // NOTE(review): presumably the function registered with the OpenTelemetry
    // observable instrument, with \a arg pointing at this state - confirm in
    // the definition.
    static void CallbackGaugeCallback(
        opentelemetry::metrics::ObserverResult result, void* arg)
        ABSL_LOCKS_EXCLUDED(ot_plugin->mu_);

    // Annotated as requiring ot_plugin->mu_ to be held exclusively.
    void Observe(opentelemetry::metrics::ObserverResult& result,
                 const Cache& cache)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(ot_plugin->mu_);
  };

  // Instruments for per-call metrics.
  ClientMetrics client_;
  ServerMetrics server_;
  // NOTE(review): presumably the width of OptionalLabelsBitSet; the bitset's
  // size argument is missing in this copy - confirm.
  static constexpr int kOptionalLabelsSizeLimit = 64;
  using OptionalLabelsBitSet = std::bitset;
  OptionalLabelsBitSet per_call_optional_label_bits_;
  // Instruments for non-per-call metrics.
  // `Disabled` is the first alternative of the variant below and therefore
  // the state of a default-constructed Instrument.
  struct Disabled {};
  using Instrument = absl::variant<
      Disabled, std::unique_ptr>,
      std::unique_ptr>,
      std::unique_ptr>,
      std::unique_ptr>,
      std::unique_ptr>,
      std::unique_ptr>>;
  // An instrument together with the bit set of optional labels for it.
  struct InstrumentData {
    Instrument instrument;
    OptionalLabelsBitSet optional_labels_bits;
  };
  std::vector instruments_data_;
  // Guards callback_timestamps_ and, via the annotations above, the
  // callback-gauge state and reporter operations.
  grpc_core::Mutex mu_;
  absl::flat_hash_map callback_timestamps_ ABSL_GUARDED_BY(mu_);
  opentelemetry::nostd::shared_ptr meter_provider_;
  absl::AnyInvocable server_selector_;
  absl::AnyInvocable target_attribute_filter_;
  absl::AnyInvocable generic_method_attribute_filter_;
  std::vector> plugin_options_;
  absl::AnyInvocable channel_scope_filter_;
};

}  // namespace internal
}  // namespace grpc

#endif  // GRPC_SRC_CPP_EXT_OTEL_OTEL_PLUGIN_H