1 // 2 // 3 // Copyright 2023 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPC_TEST_CPP_EXT_OTEL_OTEL_TEST_LIBRARY_H 20 #define GRPC_TEST_CPP_EXT_OTEL_OTEL_TEST_LIBRARY_H 21 22 #include <atomic> 23 #include <thread> 24 25 #include "absl/functional/any_invocable.h" 26 #include "gmock/gmock.h" 27 #include "gtest/gtest.h" 28 #include "opentelemetry/metrics/provider.h" 29 #include "opentelemetry/sdk/metrics/meter_provider.h" 30 #include "opentelemetry/sdk/metrics/metric_reader.h" 31 32 #include <grpc/support/port_platform.h> 33 #include <grpcpp/generic/generic_stub.h> 34 #include <grpcpp/grpcpp.h> 35 36 #include "src/core/lib/channel/call_tracer.h" 37 #include "src/core/lib/config/core_configuration.h" 38 #include "src/cpp/ext/otel/otel_plugin.h" 39 #include "test/core/util/test_config.h" 40 #include "test/cpp/end2end/test_service_impl.h" 41 42 namespace grpc { 43 namespace testing { 44 45 class MockMetricReader : public opentelemetry::sdk::metrics::MetricReader { 46 public: GetAggregationTemporality(opentelemetry::sdk::metrics::InstrumentType)47 opentelemetry::sdk::metrics::AggregationTemporality GetAggregationTemporality( 48 opentelemetry::sdk::metrics::InstrumentType) const noexcept override { 49 return opentelemetry::sdk::metrics::AggregationTemporality::kDelta; 50 } 51 OnForceFlush(std::chrono::microseconds)52 bool OnForceFlush(std::chrono::microseconds) noexcept override { 53 return true; 54 } 55 OnShutDown(std::chrono::microseconds)56 bool OnShutDown(std::chrono::microseconds) noexcept override { return true; } 57 OnInitialized()58 void OnInitialized() noexcept override {} 59 }; 60 61 class OpenTelemetryPluginEnd2EndTest : public ::testing::Test { 62 protected: 63 struct Options { 64 public: set_metric_namesOptions65 Options& set_metric_names(std::vector<absl::string_view> names) { 66 metric_names = std::move(names); 67 return *this; 68 } 69 set_resourceOptions70 Options& set_resource(const opentelemetry::sdk::resource::Resource& res) { 71 resource = std::make_unique<opentelemetry::sdk::resource::Resource>(res); 72 return *this; 73 } 74 set_use_meter_providerOptions75 Options& set_use_meter_provider(bool flag) { 76 use_meter_provider = flag; 77 return *this; 78 } 79 set_labels_to_injectOptions80 Options& set_labels_to_inject( 81 std::map< 82 grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey, 83 grpc_core::RefCountedStringValue> 84 labels) { 85 labels_to_inject = std::move(labels); 86 return *this; 87 } 88 set_channel_scope_filterOptions89 Options& set_channel_scope_filter( 90 absl::AnyInvocable<bool( 91 const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const> 92 func) { 93 channel_scope_filter = std::move(func); 94 return *this; 95 } 96 set_server_selectorOptions97 Options& set_server_selector( 98 absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*channel_args*/) 99 const> 100 func) { 101 server_selector = std::move(func); 102 return *this; 103 } 104 set_target_attribute_filterOptions105 Options& set_target_attribute_filter( 106 absl::AnyInvocable<bool(absl::string_view /*target*/) const> func) { 107 target_attribute_filter = std::move(func); 108 return *this; 109 } 110 set_generic_method_attribute_filterOptions111 Options& set_generic_method_attribute_filter( 112 absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const> 113 func) { 114 generic_method_attribute_filter = std::move(func); 115 return *this; 116 } 117 add_plugin_optionOptions118 Options& add_plugin_option( 119 std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption> 120 option) { 121 plugin_options.push_back(std::move(option)); 122 return *this; 123 } 124 add_optional_labelOptions125 Options& add_optional_label(absl::string_view optional_label_key) { 126 optional_label_keys.emplace(optional_label_key); 127 return *this; 128 } 129 130 std::vector<absl::string_view> metric_names; 131 // TODO(yashykt): opentelemetry::sdk::resource::Resource doesn't have a copy 132 // assignment operator so wrapping it in a unique_ptr till it is fixed. 133 std::unique_ptr<opentelemetry::sdk::resource::Resource> resource = 134 std::make_unique<opentelemetry::sdk::resource::Resource>( 135 opentelemetry::sdk::resource::Resource::Create({})); 136 std::unique_ptr<grpc::internal::LabelsInjector> labels_injector; 137 bool use_meter_provider = true; 138 std::map<grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey, 139 grpc_core::RefCountedStringValue> 140 labels_to_inject; 141 absl::AnyInvocable<bool( 142 const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const> 143 channel_scope_filter; 144 absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*channel_args*/) 145 const> 146 server_selector; 147 absl::AnyInvocable<bool(absl::string_view /*target*/) const> 148 target_attribute_filter; 149 absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const> 150 generic_method_attribute_filter; 151 std::vector< 152 std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>> 153 plugin_options; 154 absl::flat_hash_set<absl::string_view> optional_label_keys; 155 }; 156 157 class MetricsCollectorThread { 158 public: 159 using ResultType = absl::flat_hash_map< 160 std::string, 161 std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>; 162 MetricsCollectorThread(OpenTelemetryPluginEnd2EndTest* test, 163 grpc_core::Duration interval, int iterations, 164 std::function<bool(const ResultType&)> predicate); 165 ~MetricsCollectorThread(); 166 const ResultType& Stop(); 167 168 private: 169 void Run(); 170 171 OpenTelemetryPluginEnd2EndTest* test_; 172 grpc_core::Duration interval_; 173 int iterations_; 174 std::function<bool(const ResultType&)> predicate_; 175 ResultType data_points_; 176 std::atomic_bool finished_{false}; 177 std::thread thread_; 178 }; 179 180 // Note that we can't use SetUp() here since we want to send in parameters. 181 void Init(Options config); 182 183 void TearDown() override; 184 185 void ResetStub(std::shared_ptr<Channel> channel); 186 187 void SendRPC(); 188 void SendGenericRPC(); 189 190 std::shared_ptr<opentelemetry::sdk::metrics::MetricReader> 191 BuildAndRegisterOpenTelemetryPlugin( 192 OpenTelemetryPluginEnd2EndTest::Options options); 193 194 absl::flat_hash_map< 195 std::string, 196 std::vector<opentelemetry::sdk::metrics::PointDataAttributes>> 197 ReadCurrentMetricsData( 198 absl::AnyInvocable< 199 bool(const absl::flat_hash_map< 200 std::string, 201 std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&)> 202 continue_predicate, 203 opentelemetry::sdk::metrics::MetricReader* reader = nullptr); 204 205 const absl::string_view kMethodName = "grpc.testing.EchoTestService/Echo"; 206 const absl::string_view kGenericMethodName = "foo/bar"; 207 std::map<grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey, 208 grpc_core::RefCountedStringValue> 209 labels_to_inject_; 210 std::shared_ptr<opentelemetry::sdk::metrics::MetricReader> reader_; 211 std::string server_address_; 212 std::string canonical_server_address_; 213 CallbackTestServiceImpl service_; 214 std::unique_ptr<grpc::Server> server_; 215 std::unique_ptr<EchoTestService::Stub> stub_; 216 std::unique_ptr<grpc::GenericStub> generic_stub_; 217 }; 218 219 } // namespace testing 220 } // namespace grpc 221 222 #endif // GRPC_TEST_CPP_EXT_OTEL_OTEL_TEST_LIBRARY_H 223