xref: /aosp_15_r20/external/grpc-grpc/test/cpp/ext/otel/otel_test_library.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "test/cpp/ext/otel/otel_test_library.h"
20 
21 #include <atomic>
22 
23 #include "absl/functional/any_invocable.h"
24 #include "gmock/gmock.h"
25 #include "gtest/gtest.h"
26 #include "opentelemetry/metrics/provider.h"
27 #include "opentelemetry/sdk/metrics/export/metric_producer.h"
28 #include "opentelemetry/sdk/metrics/meter_provider.h"
29 #include "opentelemetry/sdk/metrics/metric_reader.h"
30 
31 #include <grpcpp/grpcpp.h>
32 
33 #include "src/core/lib/channel/call_tracer.h"
34 #include "src/core/lib/channel/promise_based_filter.h"
35 #include "src/core/lib/config/core_configuration.h"
36 #include "src/core/lib/gprpp/notification.h"
37 #include "test/core/util/fake_stats_plugin.h"
38 #include "test/core/util/test_config.h"
39 #include "test/cpp/end2end/test_service_impl.h"
40 #include "test/cpp/util/byte_buffer_proto_helper.h"
41 
42 namespace grpc {
43 namespace testing {
44 
45 #define GRPC_ARG_LABELS_TO_INJECT "grpc.testing.labels_to_inject"
46 
47 // A subchannel filter that adds the service labels for test to the
48 // CallAttemptTracer in a call.
49 class AddLabelsFilter : public grpc_core::ChannelFilter {
50  public:
51   static const grpc_channel_filter kFilter;
52 
Create(const grpc_core::ChannelArgs & args,ChannelFilter::Args)53   static absl::StatusOr<AddLabelsFilter> Create(
54       const grpc_core::ChannelArgs& args, ChannelFilter::Args /*filter_args*/) {
55     return AddLabelsFilter(
56         *args.GetPointer<std::map<
57              grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,
58              grpc_core::RefCountedStringValue>>(GRPC_ARG_LABELS_TO_INJECT));
59   }
60 
MakeCallPromise(grpc_core::CallArgs call_args,grpc_core::NextPromiseFactory next_promise_factory)61   grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> MakeCallPromise(
62       grpc_core::CallArgs call_args,
63       grpc_core::NextPromiseFactory next_promise_factory) override {
64     using CallAttemptTracer = grpc_core::ClientCallTracer::CallAttemptTracer;
65     auto* call_context = grpc_core::GetContext<grpc_call_context_element>();
66     auto* call_tracer = static_cast<CallAttemptTracer*>(
67         call_context[GRPC_CONTEXT_CALL_TRACER].value);
68     EXPECT_NE(call_tracer, nullptr);
69     for (const auto& pair : labels_to_inject_) {
70       call_tracer->SetOptionalLabel(pair.first, pair.second);
71     }
72     return next_promise_factory(std::move(call_args));
73   }
74 
75  private:
AddLabelsFilter(std::map<grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,grpc_core::RefCountedStringValue> labels_to_inject)76   explicit AddLabelsFilter(
77       std::map<grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,
78                grpc_core::RefCountedStringValue>
79           labels_to_inject)
80       : labels_to_inject_(std::move(labels_to_inject)) {}
81 
82   const std::map<
83       grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,
84       grpc_core::RefCountedStringValue>
85       labels_to_inject_;
86 };
87 
88 const grpc_channel_filter AddLabelsFilter::kFilter =
89     grpc_core::MakePromiseBasedFilter<AddLabelsFilter,
90                                       grpc_core::FilterEndpoint::kClient>(
91         "add_service_labels_filter");
92 
MetricsCollectorThread(OpenTelemetryPluginEnd2EndTest * test,grpc_core::Duration interval,int iterations,std::function<bool (const absl::flat_hash_map<std::string,std::vector<opentelemetry::sdk::metrics::PointDataAttributes>> &)> predicate)93 OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::MetricsCollectorThread(
94     OpenTelemetryPluginEnd2EndTest* test, grpc_core::Duration interval,
95     int iterations,
96     std::function<
97         bool(const absl::flat_hash_map<
98              std::string,
99              std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&)>
100         predicate)
101     : test_(test),
102       interval_(interval),
103       iterations_(iterations),
104       predicate_(std::move(predicate)),
105       thread_(&MetricsCollectorThread::Run, this) {}
106 
107 OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::
~MetricsCollectorThread()108     ~MetricsCollectorThread() {
109   if (!finished_) {
110     thread_.join();
111   }
112 }
113 
Run()114 void OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::Run() {
115   int i = 0;
116   while (i++ < iterations_ || (iterations_ == -1 && !finished_)) {
117     auto data_points = test_->ReadCurrentMetricsData(predicate_);
118     for (auto data : data_points) {
119       auto iter = data_points_.find(data.first);
120       if (iter == data_points_.end()) {
121         data_points_[data.first] = std::move(data.second);
122       } else {
123         for (auto point : data.second) {
124           iter->second.push_back(std::move(point));
125         }
126       }
127     }
128     absl::SleepFor(absl::Milliseconds(interval_.millis()));
129   }
130 }
131 
132 const OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::ResultType&
Stop()133 OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::Stop() {
134   finished_ = true;
135   thread_.join();
136   return data_points_;
137 }
138 
Init(Options config)139 void OpenTelemetryPluginEnd2EndTest::Init(Options config) {
140   grpc_core::CoreConfiguration::Reset();
141   ChannelArguments channel_args;
142   if (!config.labels_to_inject.empty()) {
143     labels_to_inject_ = std::move(config.labels_to_inject);
144     grpc_core::CoreConfiguration::RegisterBuilder(
145         [](grpc_core::CoreConfiguration::Builder* builder) mutable {
146           builder->channel_init()->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
147                                                   &AddLabelsFilter::kFilter);
148         });
149     channel_args.SetPointer(GRPC_ARG_LABELS_TO_INJECT, &labels_to_inject_);
150   }
151   reader_ = BuildAndRegisterOpenTelemetryPlugin(std::move(config));
152   grpc_init();
153   grpc::ServerBuilder builder;
154   int port;
155   // Use IPv4 here because it's less flaky than IPv6 ("[::]:0") on Travis.
156   builder.AddListeningPort("0.0.0.0:0", grpc::InsecureServerCredentials(),
157                            &port);
158   builder.RegisterService(&service_);
159   server_ = builder.BuildAndStart();
160   ASSERT_NE(nullptr, server_);
161   ASSERT_NE(0, port);
162   server_address_ = absl::StrCat("localhost:", port);
163   canonical_server_address_ = absl::StrCat("dns:///", server_address_);
164 
165   auto channel = grpc::CreateCustomChannel(
166       server_address_, grpc::InsecureChannelCredentials(), channel_args);
167   stub_ = EchoTestService::NewStub(channel);
168   generic_stub_ = std::make_unique<GenericStub>(std::move(channel));
169 }
170 
TearDown()171 void OpenTelemetryPluginEnd2EndTest::TearDown() {
172   server_->Shutdown();
173   grpc_shutdown_blocking();
174   grpc_core::ServerCallTracerFactory::TestOnlyReset();
175   grpc_core::GlobalStatsPluginRegistryTestPeer::
176       ResetGlobalStatsPluginRegistry();
177 }
178 
ResetStub(std::shared_ptr<Channel> channel)179 void OpenTelemetryPluginEnd2EndTest::ResetStub(
180     std::shared_ptr<Channel> channel) {
181   stub_ = EchoTestService::NewStub(channel);
182   generic_stub_ = std::make_unique<GenericStub>(std::move(channel));
183 }
184 
SendRPC()185 void OpenTelemetryPluginEnd2EndTest::SendRPC() {
186   EchoRequest request;
187   request.set_message("foo");
188   EchoResponse response;
189   grpc::ClientContext context;
190   grpc::Status status = stub_->Echo(&context, request, &response);
191 }
192 
SendGenericRPC()193 void OpenTelemetryPluginEnd2EndTest::SendGenericRPC() {
194   grpc::ClientContext context;
195   EchoRequest request;
196   std::unique_ptr<ByteBuffer> send_buf = SerializeToByteBuffer(&request);
197   ByteBuffer recv_buf;
198   grpc_core::Notification notify;
199   generic_stub_->UnaryCall(&context, absl::StrCat("/", kGenericMethodName),
200                            StubOptions(), send_buf.get(), &recv_buf,
201                            [&](grpc::Status /*s*/) { notify.Notify(); });
202   notify.WaitForNotificationWithTimeout(absl::Seconds(5));
203 }
204 
205 absl::flat_hash_map<
206     std::string, std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>
ReadCurrentMetricsData(absl::AnyInvocable<bool (const absl::flat_hash_map<std::string,std::vector<opentelemetry::sdk::metrics::PointDataAttributes>> &)> continue_predicate,opentelemetry::sdk::metrics::MetricReader * reader)207 OpenTelemetryPluginEnd2EndTest::ReadCurrentMetricsData(
208     absl::AnyInvocable<
209         bool(const absl::flat_hash_map<
210              std::string,
211              std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&)>
212         continue_predicate,
213     opentelemetry::sdk::metrics::MetricReader* reader) {
214   if (reader == nullptr) {
215     reader = reader_.get();
216   }
217   absl::flat_hash_map<
218       std::string,
219       std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>
220       data;
221   auto deadline = absl::Now() + absl::Seconds(5);
222   do {
223     reader->Collect([&](opentelemetry::sdk::metrics::ResourceMetrics& rm) {
224       for (const opentelemetry::sdk::metrics::ScopeMetrics& smd :
225            rm.scope_metric_data_) {
226         for (const opentelemetry::sdk::metrics::MetricData& md :
227              smd.metric_data_) {
228           for (const opentelemetry::sdk::metrics::PointDataAttributes& dp :
229                md.point_data_attr_) {
230             data[md.instrument_descriptor.name_].push_back(dp);
231           }
232         }
233       }
234       return true;
235     });
236   } while (continue_predicate(data) && deadline > absl::Now());
237   return data;
238 }
239 
240 std::shared_ptr<opentelemetry::sdk::metrics::MetricReader>
BuildAndRegisterOpenTelemetryPlugin(OpenTelemetryPluginEnd2EndTest::Options options)241 OpenTelemetryPluginEnd2EndTest::BuildAndRegisterOpenTelemetryPlugin(
242     OpenTelemetryPluginEnd2EndTest::Options options) {
243   grpc::internal::OpenTelemetryPluginBuilderImpl ot_builder;
244   // We are resetting the MeterProvider and OpenTelemetry plugin at the start
245   // of each test to avoid test results from one test carrying over to another
246   // test. (Some measurements can get arbitrarily delayed.)
247   auto meter_provider =
248       std::make_shared<opentelemetry::sdk::metrics::MeterProvider>(
249           std::make_unique<opentelemetry::sdk::metrics::ViewRegistry>(),
250           *options.resource);
251   std::shared_ptr<opentelemetry::sdk::metrics::MetricReader> reader =
252       std::make_shared<grpc::testing::MockMetricReader>();
253   meter_provider->AddMetricReader(reader);
254   ot_builder.DisableAllMetrics();
255   ot_builder.EnableMetrics(options.metric_names);
256   if (options.use_meter_provider) {
257     auto meter_provider =
258         std::make_shared<opentelemetry::sdk::metrics::MeterProvider>();
259     reader.reset(new grpc::testing::MockMetricReader);
260     meter_provider->AddMetricReader(reader);
261     ot_builder.SetMeterProvider(std::move(meter_provider));
262   }
263   ot_builder.SetChannelScopeFilter(std::move(options.channel_scope_filter));
264   ot_builder.SetServerSelector(std::move(options.server_selector));
265   ot_builder.SetTargetAttributeFilter(
266       std::move(options.target_attribute_filter));
267   ot_builder.SetGenericMethodAttributeFilter(
268       std::move(options.generic_method_attribute_filter));
269   for (auto& option : options.plugin_options) {
270     ot_builder.AddPluginOption(std::move(option));
271   }
272   for (auto& optional_label_key : options.optional_label_keys) {
273     ot_builder.AddOptionalLabel(optional_label_key);
274   }
275   EXPECT_EQ(ot_builder.BuildAndRegisterGlobal(), absl::OkStatus());
276   return reader;
277 }
278 
279 }  // namespace testing
280 }  // namespace grpc
281