1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "test/cpp/ext/otel/otel_test_library.h"
20
21 #include <atomic>
22
23 #include "absl/functional/any_invocable.h"
24 #include "gmock/gmock.h"
25 #include "gtest/gtest.h"
26 #include "opentelemetry/metrics/provider.h"
27 #include "opentelemetry/sdk/metrics/export/metric_producer.h"
28 #include "opentelemetry/sdk/metrics/meter_provider.h"
29 #include "opentelemetry/sdk/metrics/metric_reader.h"
30
31 #include <grpcpp/grpcpp.h>
32
33 #include "src/core/lib/channel/call_tracer.h"
34 #include "src/core/lib/channel/promise_based_filter.h"
35 #include "src/core/lib/config/core_configuration.h"
36 #include "src/core/lib/gprpp/notification.h"
37 #include "test/core/util/fake_stats_plugin.h"
38 #include "test/core/util/test_config.h"
39 #include "test/cpp/end2end/test_service_impl.h"
40 #include "test/cpp/util/byte_buffer_proto_helper.h"
41
42 namespace grpc {
43 namespace testing {
44
45 #define GRPC_ARG_LABELS_TO_INJECT "grpc.testing.labels_to_inject"
46
47 // A subchannel filter that adds the service labels for test to the
48 // CallAttemptTracer in a call.
49 class AddLabelsFilter : public grpc_core::ChannelFilter {
50 public:
51 static const grpc_channel_filter kFilter;
52
Create(const grpc_core::ChannelArgs & args,ChannelFilter::Args)53 static absl::StatusOr<AddLabelsFilter> Create(
54 const grpc_core::ChannelArgs& args, ChannelFilter::Args /*filter_args*/) {
55 return AddLabelsFilter(
56 *args.GetPointer<std::map<
57 grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,
58 grpc_core::RefCountedStringValue>>(GRPC_ARG_LABELS_TO_INJECT));
59 }
60
MakeCallPromise(grpc_core::CallArgs call_args,grpc_core::NextPromiseFactory next_promise_factory)61 grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> MakeCallPromise(
62 grpc_core::CallArgs call_args,
63 grpc_core::NextPromiseFactory next_promise_factory) override {
64 using CallAttemptTracer = grpc_core::ClientCallTracer::CallAttemptTracer;
65 auto* call_context = grpc_core::GetContext<grpc_call_context_element>();
66 auto* call_tracer = static_cast<CallAttemptTracer*>(
67 call_context[GRPC_CONTEXT_CALL_TRACER].value);
68 EXPECT_NE(call_tracer, nullptr);
69 for (const auto& pair : labels_to_inject_) {
70 call_tracer->SetOptionalLabel(pair.first, pair.second);
71 }
72 return next_promise_factory(std::move(call_args));
73 }
74
75 private:
AddLabelsFilter(std::map<grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,grpc_core::RefCountedStringValue> labels_to_inject)76 explicit AddLabelsFilter(
77 std::map<grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,
78 grpc_core::RefCountedStringValue>
79 labels_to_inject)
80 : labels_to_inject_(std::move(labels_to_inject)) {}
81
82 const std::map<
83 grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,
84 grpc_core::RefCountedStringValue>
85 labels_to_inject_;
86 };
87
88 const grpc_channel_filter AddLabelsFilter::kFilter =
89 grpc_core::MakePromiseBasedFilter<AddLabelsFilter,
90 grpc_core::FilterEndpoint::kClient>(
91 "add_service_labels_filter");
92
MetricsCollectorThread(OpenTelemetryPluginEnd2EndTest * test,grpc_core::Duration interval,int iterations,std::function<bool (const absl::flat_hash_map<std::string,std::vector<opentelemetry::sdk::metrics::PointDataAttributes>> &)> predicate)93 OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::MetricsCollectorThread(
94 OpenTelemetryPluginEnd2EndTest* test, grpc_core::Duration interval,
95 int iterations,
96 std::function<
97 bool(const absl::flat_hash_map<
98 std::string,
99 std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&)>
100 predicate)
101 : test_(test),
102 interval_(interval),
103 iterations_(iterations),
104 predicate_(std::move(predicate)),
105 thread_(&MetricsCollectorThread::Run, this) {}
106
107 OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::
~MetricsCollectorThread()108 ~MetricsCollectorThread() {
109 if (!finished_) {
110 thread_.join();
111 }
112 }
113
Run()114 void OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::Run() {
115 int i = 0;
116 while (i++ < iterations_ || (iterations_ == -1 && !finished_)) {
117 auto data_points = test_->ReadCurrentMetricsData(predicate_);
118 for (auto data : data_points) {
119 auto iter = data_points_.find(data.first);
120 if (iter == data_points_.end()) {
121 data_points_[data.first] = std::move(data.second);
122 } else {
123 for (auto point : data.second) {
124 iter->second.push_back(std::move(point));
125 }
126 }
127 }
128 absl::SleepFor(absl::Milliseconds(interval_.millis()));
129 }
130 }
131
132 const OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::ResultType&
Stop()133 OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::Stop() {
134 finished_ = true;
135 thread_.join();
136 return data_points_;
137 }
138
Init(Options config)139 void OpenTelemetryPluginEnd2EndTest::Init(Options config) {
140 grpc_core::CoreConfiguration::Reset();
141 ChannelArguments channel_args;
142 if (!config.labels_to_inject.empty()) {
143 labels_to_inject_ = std::move(config.labels_to_inject);
144 grpc_core::CoreConfiguration::RegisterBuilder(
145 [](grpc_core::CoreConfiguration::Builder* builder) mutable {
146 builder->channel_init()->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
147 &AddLabelsFilter::kFilter);
148 });
149 channel_args.SetPointer(GRPC_ARG_LABELS_TO_INJECT, &labels_to_inject_);
150 }
151 reader_ = BuildAndRegisterOpenTelemetryPlugin(std::move(config));
152 grpc_init();
153 grpc::ServerBuilder builder;
154 int port;
155 // Use IPv4 here because it's less flaky than IPv6 ("[::]:0") on Travis.
156 builder.AddListeningPort("0.0.0.0:0", grpc::InsecureServerCredentials(),
157 &port);
158 builder.RegisterService(&service_);
159 server_ = builder.BuildAndStart();
160 ASSERT_NE(nullptr, server_);
161 ASSERT_NE(0, port);
162 server_address_ = absl::StrCat("localhost:", port);
163 canonical_server_address_ = absl::StrCat("dns:///", server_address_);
164
165 auto channel = grpc::CreateCustomChannel(
166 server_address_, grpc::InsecureChannelCredentials(), channel_args);
167 stub_ = EchoTestService::NewStub(channel);
168 generic_stub_ = std::make_unique<GenericStub>(std::move(channel));
169 }
170
TearDown()171 void OpenTelemetryPluginEnd2EndTest::TearDown() {
172 server_->Shutdown();
173 grpc_shutdown_blocking();
174 grpc_core::ServerCallTracerFactory::TestOnlyReset();
175 grpc_core::GlobalStatsPluginRegistryTestPeer::
176 ResetGlobalStatsPluginRegistry();
177 }
178
ResetStub(std::shared_ptr<Channel> channel)179 void OpenTelemetryPluginEnd2EndTest::ResetStub(
180 std::shared_ptr<Channel> channel) {
181 stub_ = EchoTestService::NewStub(channel);
182 generic_stub_ = std::make_unique<GenericStub>(std::move(channel));
183 }
184
SendRPC()185 void OpenTelemetryPluginEnd2EndTest::SendRPC() {
186 EchoRequest request;
187 request.set_message("foo");
188 EchoResponse response;
189 grpc::ClientContext context;
190 grpc::Status status = stub_->Echo(&context, request, &response);
191 }
192
SendGenericRPC()193 void OpenTelemetryPluginEnd2EndTest::SendGenericRPC() {
194 grpc::ClientContext context;
195 EchoRequest request;
196 std::unique_ptr<ByteBuffer> send_buf = SerializeToByteBuffer(&request);
197 ByteBuffer recv_buf;
198 grpc_core::Notification notify;
199 generic_stub_->UnaryCall(&context, absl::StrCat("/", kGenericMethodName),
200 StubOptions(), send_buf.get(), &recv_buf,
201 [&](grpc::Status /*s*/) { notify.Notify(); });
202 notify.WaitForNotificationWithTimeout(absl::Seconds(5));
203 }
204
205 absl::flat_hash_map<
206 std::string, std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>
ReadCurrentMetricsData(absl::AnyInvocable<bool (const absl::flat_hash_map<std::string,std::vector<opentelemetry::sdk::metrics::PointDataAttributes>> &)> continue_predicate,opentelemetry::sdk::metrics::MetricReader * reader)207 OpenTelemetryPluginEnd2EndTest::ReadCurrentMetricsData(
208 absl::AnyInvocable<
209 bool(const absl::flat_hash_map<
210 std::string,
211 std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&)>
212 continue_predicate,
213 opentelemetry::sdk::metrics::MetricReader* reader) {
214 if (reader == nullptr) {
215 reader = reader_.get();
216 }
217 absl::flat_hash_map<
218 std::string,
219 std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>
220 data;
221 auto deadline = absl::Now() + absl::Seconds(5);
222 do {
223 reader->Collect([&](opentelemetry::sdk::metrics::ResourceMetrics& rm) {
224 for (const opentelemetry::sdk::metrics::ScopeMetrics& smd :
225 rm.scope_metric_data_) {
226 for (const opentelemetry::sdk::metrics::MetricData& md :
227 smd.metric_data_) {
228 for (const opentelemetry::sdk::metrics::PointDataAttributes& dp :
229 md.point_data_attr_) {
230 data[md.instrument_descriptor.name_].push_back(dp);
231 }
232 }
233 }
234 return true;
235 });
236 } while (continue_predicate(data) && deadline > absl::Now());
237 return data;
238 }
239
240 std::shared_ptr<opentelemetry::sdk::metrics::MetricReader>
BuildAndRegisterOpenTelemetryPlugin(OpenTelemetryPluginEnd2EndTest::Options options)241 OpenTelemetryPluginEnd2EndTest::BuildAndRegisterOpenTelemetryPlugin(
242 OpenTelemetryPluginEnd2EndTest::Options options) {
243 grpc::internal::OpenTelemetryPluginBuilderImpl ot_builder;
244 // We are resetting the MeterProvider and OpenTelemetry plugin at the start
245 // of each test to avoid test results from one test carrying over to another
246 // test. (Some measurements can get arbitrarily delayed.)
247 auto meter_provider =
248 std::make_shared<opentelemetry::sdk::metrics::MeterProvider>(
249 std::make_unique<opentelemetry::sdk::metrics::ViewRegistry>(),
250 *options.resource);
251 std::shared_ptr<opentelemetry::sdk::metrics::MetricReader> reader =
252 std::make_shared<grpc::testing::MockMetricReader>();
253 meter_provider->AddMetricReader(reader);
254 ot_builder.DisableAllMetrics();
255 ot_builder.EnableMetrics(options.metric_names);
256 if (options.use_meter_provider) {
257 auto meter_provider =
258 std::make_shared<opentelemetry::sdk::metrics::MeterProvider>();
259 reader.reset(new grpc::testing::MockMetricReader);
260 meter_provider->AddMetricReader(reader);
261 ot_builder.SetMeterProvider(std::move(meter_provider));
262 }
263 ot_builder.SetChannelScopeFilter(std::move(options.channel_scope_filter));
264 ot_builder.SetServerSelector(std::move(options.server_selector));
265 ot_builder.SetTargetAttributeFilter(
266 std::move(options.target_attribute_filter));
267 ot_builder.SetGenericMethodAttributeFilter(
268 std::move(options.generic_method_attribute_filter));
269 for (auto& option : options.plugin_options) {
270 ot_builder.AddPluginOption(std::move(option));
271 }
272 for (auto& optional_label_key : options.optional_label_keys) {
273 ot_builder.AddOptionalLabel(optional_label_key);
274 }
275 EXPECT_EQ(ot_builder.BuildAndRegisterGlobal(), absl::OkStatus());
276 return reader;
277 }
278
279 } // namespace testing
280 } // namespace grpc
281