1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <functional>
16 #include <memory>
17 #include <string>
18 #include <utility>
19
20 #include "absl/base/thread_annotations.h"
21 #include "absl/status/status.h"
22 #include "absl/status/statusor.h"
23 #include "absl/strings/string_view.h"
24 #include "absl/time/time.h"
25 #include "gtest/gtest.h"
26
27 #include <grpc/status.h>
28 #include <grpc/support/log.h>
29 #include <grpc/support/time.h>
30
31 #include "src/core/lib/channel/call_tracer.h"
32 #include "src/core/lib/channel/channel_args.h"
33 #include "src/core/lib/channel/channel_fwd.h"
34 #include "src/core/lib/channel/channel_stack.h"
35 #include "src/core/lib/channel/context.h"
36 #include "src/core/lib/channel/metrics.h"
37 #include "src/core/lib/channel/promise_based_filter.h"
38 #include "src/core/lib/channel/tcp_tracer.h"
39 #include "src/core/lib/config/core_configuration.h"
40 #include "src/core/lib/experiments/experiments.h"
41 #include "src/core/lib/gprpp/notification.h"
42 #include "src/core/lib/gprpp/sync.h"
43 #include "src/core/lib/gprpp/time.h"
44 #include "src/core/lib/iomgr/error.h"
45 #include "src/core/lib/promise/arena_promise.h"
46 #include "src/core/lib/promise/context.h"
47 #include "src/core/lib/resource_quota/arena.h"
48 #include "src/core/lib/slice/slice.h"
49 #include "src/core/lib/slice/slice_buffer.h"
50 #include "src/core/lib/surface/channel_stack_type.h"
51 #include "src/core/lib/transport/metadata_batch.h"
52 #include "src/core/lib/transport/transport.h"
53 #include "test/core/end2end/end2end_tests.h"
54 #include "test/core/util/fake_stats_plugin.h"
55
56 namespace grpc_core {
57 namespace {
58
59 Mutex* g_mu;
60 Notification* g_client_call_ended_notify;
61 Notification* g_server_call_ended_notify;
62
63 class FakeCallTracer : public ClientCallTracer {
64 public:
65 class FakeCallAttemptTracer : public CallAttemptTracer {
66 public:
TraceId()67 std::string TraceId() override { return ""; }
SpanId()68 std::string SpanId() override { return ""; }
IsSampled()69 bool IsSampled() override { return false; }
RecordSendInitialMetadata(grpc_metadata_batch *)70 void RecordSendInitialMetadata(
71 grpc_metadata_batch* /*send_initial_metadata*/) override {}
RecordSendTrailingMetadata(grpc_metadata_batch *)72 void RecordSendTrailingMetadata(
73 grpc_metadata_batch* /*send_trailing_metadata*/) override {}
RecordSendMessage(const SliceBuffer &)74 void RecordSendMessage(const SliceBuffer& /*send_message*/) override {}
RecordSendCompressedMessage(const SliceBuffer &)75 void RecordSendCompressedMessage(
76 const SliceBuffer& /*send_compressed_message*/) override {}
RecordReceivedInitialMetadata(grpc_metadata_batch *)77 void RecordReceivedInitialMetadata(
78 grpc_metadata_batch* /*recv_initial_metadata*/) override {}
RecordReceivedMessage(const SliceBuffer &)79 void RecordReceivedMessage(const SliceBuffer& /*recv_message*/) override {}
RecordReceivedDecompressedMessage(const SliceBuffer &)80 void RecordReceivedDecompressedMessage(
81 const SliceBuffer& /*recv_decompressed_message*/) override {}
82
RecordReceivedTrailingMetadata(absl::Status,grpc_metadata_batch *,const grpc_transport_stream_stats * transport_stream_stats)83 void RecordReceivedTrailingMetadata(
84 absl::Status /*status*/,
85 grpc_metadata_batch* /*recv_trailing_metadata*/,
86 const grpc_transport_stream_stats* transport_stream_stats) override {
87 MutexLock lock(g_mu);
88 transport_stream_stats_ = *transport_stream_stats;
89 }
90
RecordCancel(grpc_error_handle)91 void RecordCancel(grpc_error_handle /*cancel_error*/) override {}
StartNewTcpTrace()92 std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
93 return nullptr;
94 }
RecordEnd(const gpr_timespec &)95 void RecordEnd(const gpr_timespec& /*latency*/) override {
96 g_client_call_ended_notify->Notify();
97 delete this;
98 }
RecordAnnotation(absl::string_view)99 void RecordAnnotation(absl::string_view /*annotation*/) override {}
RecordAnnotation(const Annotation &)100 void RecordAnnotation(const Annotation& /*annotation*/) override {}
101
SetOptionalLabel(OptionalLabelKey,RefCountedStringValue)102 void SetOptionalLabel(OptionalLabelKey /*key*/,
103 RefCountedStringValue /*value*/) override {}
104
transport_stream_stats()105 static grpc_transport_stream_stats transport_stream_stats() {
106 MutexLock lock(g_mu);
107 return transport_stream_stats_;
108 }
109
110 private:
111 static grpc_transport_stream_stats transport_stream_stats_
112 ABSL_GUARDED_BY(g_mu);
113 };
114
FakeCallTracer()115 explicit FakeCallTracer() {}
~FakeCallTracer()116 ~FakeCallTracer() override {}
TraceId()117 std::string TraceId() override { return ""; }
SpanId()118 std::string SpanId() override { return ""; }
IsSampled()119 bool IsSampled() override { return false; }
120
StartNewAttempt(bool)121 FakeCallAttemptTracer* StartNewAttempt(
122 bool /*is_transparent_retry*/) override {
123 return new FakeCallAttemptTracer;
124 }
125
RecordAnnotation(absl::string_view)126 void RecordAnnotation(absl::string_view /*annotation*/) override {}
RecordAnnotation(const Annotation &)127 void RecordAnnotation(const Annotation& /*annotation*/) override {}
128 };
129
130 grpc_transport_stream_stats
131 FakeCallTracer::FakeCallAttemptTracer::transport_stream_stats_;
132
133 class FakeServerCallTracer : public ServerCallTracer {
134 public:
~FakeServerCallTracer()135 ~FakeServerCallTracer() override {}
RecordSendInitialMetadata(grpc_metadata_batch *)136 void RecordSendInitialMetadata(
137 grpc_metadata_batch* /*send_initial_metadata*/) override {}
RecordSendTrailingMetadata(grpc_metadata_batch *)138 void RecordSendTrailingMetadata(
139 grpc_metadata_batch* /*send_trailing_metadata*/) override {}
RecordSendMessage(const SliceBuffer &)140 void RecordSendMessage(const SliceBuffer& /*send_message*/) override {}
RecordSendCompressedMessage(const SliceBuffer &)141 void RecordSendCompressedMessage(
142 const SliceBuffer& /*send_compressed_message*/) override {}
RecordReceivedInitialMetadata(grpc_metadata_batch *)143 void RecordReceivedInitialMetadata(
144 grpc_metadata_batch* /*recv_initial_metadata*/) override {}
RecordReceivedMessage(const SliceBuffer &)145 void RecordReceivedMessage(const SliceBuffer& /*recv_message*/) override {}
RecordReceivedDecompressedMessage(const SliceBuffer &)146 void RecordReceivedDecompressedMessage(
147 const SliceBuffer& /*recv_decompressed_message*/) override {}
RecordCancel(grpc_error_handle)148 void RecordCancel(grpc_error_handle /*cancel_error*/) override {}
StartNewTcpTrace()149 std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
150 return nullptr;
151 }
RecordReceivedTrailingMetadata(grpc_metadata_batch *)152 void RecordReceivedTrailingMetadata(
153 grpc_metadata_batch* /*recv_trailing_metadata*/) override {}
154
RecordEnd(const grpc_call_final_info * final_info)155 void RecordEnd(const grpc_call_final_info* final_info) override {
156 MutexLock lock(g_mu);
157 transport_stream_stats_ = final_info->stats.transport_stream_stats;
158 g_server_call_ended_notify->Notify();
159 }
160
RecordAnnotation(absl::string_view)161 void RecordAnnotation(absl::string_view /*annotation*/) override {}
RecordAnnotation(const Annotation &)162 void RecordAnnotation(const Annotation& /*annotation*/) override {}
TraceId()163 std::string TraceId() override { return ""; }
SpanId()164 std::string SpanId() override { return ""; }
IsSampled()165 bool IsSampled() override { return false; }
166
transport_stream_stats()167 static grpc_transport_stream_stats transport_stream_stats() {
168 MutexLock lock(g_mu);
169 return transport_stream_stats_;
170 }
171
172 private:
173 static grpc_transport_stream_stats transport_stream_stats_
174 ABSL_GUARDED_BY(g_mu);
175 };
176
177 grpc_transport_stream_stats FakeServerCallTracer::transport_stream_stats_;
178
179 // TODO(yijiem): figure out how to reuse FakeStatsPlugin instead of
180 // inheriting and overriding it here.
181 class NewFakeStatsPlugin : public FakeStatsPlugin {
182 public:
GetClientCallTracer(const Slice &,bool,std::shared_ptr<StatsPlugin::ScopeConfig>)183 ClientCallTracer* GetClientCallTracer(
184 const Slice& /*path*/, bool /*registered_method*/,
185 std::shared_ptr<StatsPlugin::ScopeConfig> /*scope_config*/) override {
186 return GetContext<Arena>()->ManagedNew<FakeCallTracer>();
187 }
GetServerCallTracer(std::shared_ptr<StatsPlugin::ScopeConfig>)188 ServerCallTracer* GetServerCallTracer(
189 std::shared_ptr<StatsPlugin::ScopeConfig> /*scope_config*/) override {
190 return GetContext<Arena>()->ManagedNew<FakeServerCallTracer>();
191 }
192 };
193
194 // This test verifies the HTTP2 stats on a stream
// Runs a single unary-style exchange (10-byte request, 20-byte response,
// UNIMPLEMENTED status) and checks the data/framing byte counts that the fake
// client and server tracers captured from the transport stream stats.
CORE_END2END_TEST(Http2FullstackSingleHopTest, StreamStats) {
  if (!IsHttp2StatsFixEnabled()) {
    GTEST_SKIP() << "Test needs http2_stats_fix experiment to be enabled";
  }
  // Shared state used by the fake tracers; released at the end of the test.
  g_mu = new Mutex();
  g_client_call_ended_notify = new Notification();
  g_server_call_ended_notify = new Notification();
  GlobalStatsPluginRegistry::RegisterStatsPlugin(
      std::make_shared<NewFakeStatsPlugin>());
  auto send_from_client = RandomSlice(10);
  auto send_from_server = RandomSlice(20);
  CoreEnd2endTest::IncomingStatusOnClient server_status;
  CoreEnd2endTest::IncomingMetadata server_initial_metadata;
  CoreEnd2endTest::IncomingMessage server_message;
  CoreEnd2endTest::IncomingMessage client_message;
  CoreEnd2endTest::IncomingCloseOnServer client_close;
  {
    // The call objects live only inside this scope so that both calls are
    // destroyed before the stats are inspected below.
    auto c = NewClientCall("/foo").Timeout(Duration::Minutes(5)).Create();
    c.NewBatch(1)
        .SendInitialMetadata({})
        .SendMessage(send_from_client.Ref())
        .SendCloseFromClient()
        .RecvInitialMetadata(server_initial_metadata)
        .RecvMessage(server_message)
        .RecvStatusOnClient(server_status);
    auto s = RequestCall(101);
    Expect(101, true);
    Step(Duration::Minutes(1));
    s.NewBatch(102).SendInitialMetadata({}).RecvMessage(client_message);
    Expect(102, true);
    Step(Duration::Minutes(1));
    s.NewBatch(103)
        .SendStatusFromServer(GRPC_STATUS_UNIMPLEMENTED, "xyz", {})
        .SendMessage(send_from_server.Ref())
        .RecvCloseOnServer(client_close);
    Expect(103, true);
    Expect(1, true);
    Step(Duration::Minutes(1));
    EXPECT_EQ(s.method(), "/foo");
  }
  EXPECT_EQ(server_status.status(), GRPC_STATUS_UNIMPLEMENTED);
  EXPECT_EQ(server_status.message(), "xyz");
  EXPECT_FALSE(client_close.was_cancelled());
  EXPECT_EQ(client_message.payload(), send_from_client);
  EXPECT_EQ(server_message.payload(), send_from_server);
  // Make sure that the calls have ended for the stats to have been collected.
  // The wait results are checked (non-fatally, so the cleanup below still
  // runs) because a timeout means the stats assertions that follow would be
  // evaluated against data that was never recorded for this call.
  EXPECT_TRUE(g_client_call_ended_notify->WaitForNotificationWithTimeout(
      absl::Seconds(5)))
      << "timed out waiting for the client call to end";
  EXPECT_TRUE(g_server_call_ended_notify->WaitForNotificationWithTimeout(
      absl::Seconds(5)))
      << "timed out waiting for the server call to end";

  auto client_transport_stats =
      FakeCallTracer::FakeCallAttemptTracer::transport_stream_stats();
  auto server_transport_stats = FakeServerCallTracer::transport_stream_stats();
  EXPECT_EQ(client_transport_stats.outgoing.data_bytes,
            send_from_client.size());
  EXPECT_EQ(client_transport_stats.incoming.data_bytes,
            send_from_server.size());
  EXPECT_EQ(server_transport_stats.outgoing.data_bytes,
            send_from_server.size());
  EXPECT_EQ(server_transport_stats.incoming.data_bytes,
            send_from_client.size());
  // At the very minimum, we should have 9 bytes from initial header frame, 9
  // bytes from data header frame, 5 bytes from the grpc header on data and 9
  // bytes from the trailing header frame. The actual number might be more due
  // to RST_STREAM (13 bytes) and WINDOW_UPDATE (13 bytes) frames.
  constexpr int kMinFramingBytes = 32;  // 9 + 9 + 5 + 9
  constexpr int kMaxFramingBytes = 58;  // minimum + 13 + 13
  EXPECT_GE(client_transport_stats.outgoing.framing_bytes, kMinFramingBytes);
  EXPECT_LE(client_transport_stats.outgoing.framing_bytes, kMaxFramingBytes);
  EXPECT_GE(client_transport_stats.incoming.framing_bytes, kMinFramingBytes);
  EXPECT_LE(client_transport_stats.incoming.framing_bytes, kMaxFramingBytes);
  EXPECT_GE(server_transport_stats.outgoing.framing_bytes, kMinFramingBytes);
  EXPECT_LE(server_transport_stats.outgoing.framing_bytes, kMaxFramingBytes);
  EXPECT_GE(server_transport_stats.incoming.framing_bytes, kMinFramingBytes);
  EXPECT_LE(server_transport_stats.incoming.framing_bytes, kMaxFramingBytes);

  // Tear down global state so nothing leaks into other tests.
  delete ServerCallTracerFactory::Get(ChannelArgs());
  ServerCallTracerFactory::RegisterGlobal(nullptr);
  delete g_client_call_ended_notify;
  g_client_call_ended_notify = nullptr;
  delete g_server_call_ended_notify;
  g_server_call_ended_notify = nullptr;
  delete g_mu;
  g_mu = nullptr;
}
277
278 } // namespace
279 } // namespace grpc_core
280