xref: /aosp_15_r20/external/grpc-grpc/test/core/end2end/tests/http2_stats.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <functional>
16 #include <memory>
17 #include <string>
18 #include <utility>
19 
20 #include "absl/base/thread_annotations.h"
21 #include "absl/status/status.h"
22 #include "absl/status/statusor.h"
23 #include "absl/strings/string_view.h"
24 #include "absl/time/time.h"
25 #include "gtest/gtest.h"
26 
27 #include <grpc/status.h>
28 #include <grpc/support/log.h>
29 #include <grpc/support/time.h>
30 
31 #include "src/core/lib/channel/call_tracer.h"
32 #include "src/core/lib/channel/channel_args.h"
33 #include "src/core/lib/channel/channel_fwd.h"
34 #include "src/core/lib/channel/channel_stack.h"
35 #include "src/core/lib/channel/context.h"
36 #include "src/core/lib/channel/metrics.h"
37 #include "src/core/lib/channel/promise_based_filter.h"
38 #include "src/core/lib/channel/tcp_tracer.h"
39 #include "src/core/lib/config/core_configuration.h"
40 #include "src/core/lib/experiments/experiments.h"
41 #include "src/core/lib/gprpp/notification.h"
42 #include "src/core/lib/gprpp/sync.h"
43 #include "src/core/lib/gprpp/time.h"
44 #include "src/core/lib/iomgr/error.h"
45 #include "src/core/lib/promise/arena_promise.h"
46 #include "src/core/lib/promise/context.h"
47 #include "src/core/lib/resource_quota/arena.h"
48 #include "src/core/lib/slice/slice.h"
49 #include "src/core/lib/slice/slice_buffer.h"
50 #include "src/core/lib/surface/channel_stack_type.h"
51 #include "src/core/lib/transport/metadata_batch.h"
52 #include "src/core/lib/transport/transport.h"
53 #include "test/core/end2end/end2end_tests.h"
54 #include "test/core/util/fake_stats_plugin.h"
55 
56 namespace grpc_core {
57 namespace {
58 
59 Mutex* g_mu;
60 Notification* g_client_call_ended_notify;
61 Notification* g_server_call_ended_notify;
62 
63 class FakeCallTracer : public ClientCallTracer {
64  public:
65   class FakeCallAttemptTracer : public CallAttemptTracer {
66    public:
TraceId()67     std::string TraceId() override { return ""; }
SpanId()68     std::string SpanId() override { return ""; }
IsSampled()69     bool IsSampled() override { return false; }
RecordSendInitialMetadata(grpc_metadata_batch *)70     void RecordSendInitialMetadata(
71         grpc_metadata_batch* /*send_initial_metadata*/) override {}
RecordSendTrailingMetadata(grpc_metadata_batch *)72     void RecordSendTrailingMetadata(
73         grpc_metadata_batch* /*send_trailing_metadata*/) override {}
RecordSendMessage(const SliceBuffer &)74     void RecordSendMessage(const SliceBuffer& /*send_message*/) override {}
RecordSendCompressedMessage(const SliceBuffer &)75     void RecordSendCompressedMessage(
76         const SliceBuffer& /*send_compressed_message*/) override {}
RecordReceivedInitialMetadata(grpc_metadata_batch *)77     void RecordReceivedInitialMetadata(
78         grpc_metadata_batch* /*recv_initial_metadata*/) override {}
RecordReceivedMessage(const SliceBuffer &)79     void RecordReceivedMessage(const SliceBuffer& /*recv_message*/) override {}
RecordReceivedDecompressedMessage(const SliceBuffer &)80     void RecordReceivedDecompressedMessage(
81         const SliceBuffer& /*recv_decompressed_message*/) override {}
82 
RecordReceivedTrailingMetadata(absl::Status,grpc_metadata_batch *,const grpc_transport_stream_stats * transport_stream_stats)83     void RecordReceivedTrailingMetadata(
84         absl::Status /*status*/,
85         grpc_metadata_batch* /*recv_trailing_metadata*/,
86         const grpc_transport_stream_stats* transport_stream_stats) override {
87       MutexLock lock(g_mu);
88       transport_stream_stats_ = *transport_stream_stats;
89     }
90 
RecordCancel(grpc_error_handle)91     void RecordCancel(grpc_error_handle /*cancel_error*/) override {}
StartNewTcpTrace()92     std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
93       return nullptr;
94     }
RecordEnd(const gpr_timespec &)95     void RecordEnd(const gpr_timespec& /*latency*/) override {
96       g_client_call_ended_notify->Notify();
97       delete this;
98     }
RecordAnnotation(absl::string_view)99     void RecordAnnotation(absl::string_view /*annotation*/) override {}
RecordAnnotation(const Annotation &)100     void RecordAnnotation(const Annotation& /*annotation*/) override {}
101 
SetOptionalLabel(OptionalLabelKey,RefCountedStringValue)102     void SetOptionalLabel(OptionalLabelKey /*key*/,
103                           RefCountedStringValue /*value*/) override {}
104 
transport_stream_stats()105     static grpc_transport_stream_stats transport_stream_stats() {
106       MutexLock lock(g_mu);
107       return transport_stream_stats_;
108     }
109 
110    private:
111     static grpc_transport_stream_stats transport_stream_stats_
112         ABSL_GUARDED_BY(g_mu);
113   };
114 
FakeCallTracer()115   explicit FakeCallTracer() {}
~FakeCallTracer()116   ~FakeCallTracer() override {}
TraceId()117   std::string TraceId() override { return ""; }
SpanId()118   std::string SpanId() override { return ""; }
IsSampled()119   bool IsSampled() override { return false; }
120 
StartNewAttempt(bool)121   FakeCallAttemptTracer* StartNewAttempt(
122       bool /*is_transparent_retry*/) override {
123     return new FakeCallAttemptTracer;
124   }
125 
RecordAnnotation(absl::string_view)126   void RecordAnnotation(absl::string_view /*annotation*/) override {}
RecordAnnotation(const Annotation &)127   void RecordAnnotation(const Annotation& /*annotation*/) override {}
128 };
129 
130 grpc_transport_stream_stats
131     FakeCallTracer::FakeCallAttemptTracer::transport_stream_stats_;
132 
133 class FakeServerCallTracer : public ServerCallTracer {
134  public:
~FakeServerCallTracer()135   ~FakeServerCallTracer() override {}
RecordSendInitialMetadata(grpc_metadata_batch *)136   void RecordSendInitialMetadata(
137       grpc_metadata_batch* /*send_initial_metadata*/) override {}
RecordSendTrailingMetadata(grpc_metadata_batch *)138   void RecordSendTrailingMetadata(
139       grpc_metadata_batch* /*send_trailing_metadata*/) override {}
RecordSendMessage(const SliceBuffer &)140   void RecordSendMessage(const SliceBuffer& /*send_message*/) override {}
RecordSendCompressedMessage(const SliceBuffer &)141   void RecordSendCompressedMessage(
142       const SliceBuffer& /*send_compressed_message*/) override {}
RecordReceivedInitialMetadata(grpc_metadata_batch *)143   void RecordReceivedInitialMetadata(
144       grpc_metadata_batch* /*recv_initial_metadata*/) override {}
RecordReceivedMessage(const SliceBuffer &)145   void RecordReceivedMessage(const SliceBuffer& /*recv_message*/) override {}
RecordReceivedDecompressedMessage(const SliceBuffer &)146   void RecordReceivedDecompressedMessage(
147       const SliceBuffer& /*recv_decompressed_message*/) override {}
RecordCancel(grpc_error_handle)148   void RecordCancel(grpc_error_handle /*cancel_error*/) override {}
StartNewTcpTrace()149   std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
150     return nullptr;
151   }
RecordReceivedTrailingMetadata(grpc_metadata_batch *)152   void RecordReceivedTrailingMetadata(
153       grpc_metadata_batch* /*recv_trailing_metadata*/) override {}
154 
RecordEnd(const grpc_call_final_info * final_info)155   void RecordEnd(const grpc_call_final_info* final_info) override {
156     MutexLock lock(g_mu);
157     transport_stream_stats_ = final_info->stats.transport_stream_stats;
158     g_server_call_ended_notify->Notify();
159   }
160 
RecordAnnotation(absl::string_view)161   void RecordAnnotation(absl::string_view /*annotation*/) override {}
RecordAnnotation(const Annotation &)162   void RecordAnnotation(const Annotation& /*annotation*/) override {}
TraceId()163   std::string TraceId() override { return ""; }
SpanId()164   std::string SpanId() override { return ""; }
IsSampled()165   bool IsSampled() override { return false; }
166 
transport_stream_stats()167   static grpc_transport_stream_stats transport_stream_stats() {
168     MutexLock lock(g_mu);
169     return transport_stream_stats_;
170   }
171 
172  private:
173   static grpc_transport_stream_stats transport_stream_stats_
174       ABSL_GUARDED_BY(g_mu);
175 };
176 
177 grpc_transport_stream_stats FakeServerCallTracer::transport_stream_stats_;
178 
179 // TODO(yijiem): figure out how to reuse FakeStatsPlugin instead of
180 // inheriting and overriding it here.
181 class NewFakeStatsPlugin : public FakeStatsPlugin {
182  public:
GetClientCallTracer(const Slice &,bool,std::shared_ptr<StatsPlugin::ScopeConfig>)183   ClientCallTracer* GetClientCallTracer(
184       const Slice& /*path*/, bool /*registered_method*/,
185       std::shared_ptr<StatsPlugin::ScopeConfig> /*scope_config*/) override {
186     return GetContext<Arena>()->ManagedNew<FakeCallTracer>();
187   }
GetServerCallTracer(std::shared_ptr<StatsPlugin::ScopeConfig>)188   ServerCallTracer* GetServerCallTracer(
189       std::shared_ptr<StatsPlugin::ScopeConfig> /*scope_config*/) override {
190     return GetContext<Arena>()->ManagedNew<FakeServerCallTracer>();
191   }
192 };
193 
194 // This test verifies the HTTP2 stats on a stream
CORE_END2END_TEST(Http2FullstackSingleHopTest,StreamStats)195 CORE_END2END_TEST(Http2FullstackSingleHopTest, StreamStats) {
196   if (!IsHttp2StatsFixEnabled()) {
197     GTEST_SKIP() << "Test needs http2_stats_fix experiment to be enabled";
198   }
199   g_mu = new Mutex();
200   g_client_call_ended_notify = new Notification();
201   g_server_call_ended_notify = new Notification();
202   GlobalStatsPluginRegistry::RegisterStatsPlugin(
203       std::make_shared<NewFakeStatsPlugin>());
204   auto send_from_client = RandomSlice(10);
205   auto send_from_server = RandomSlice(20);
206   CoreEnd2endTest::IncomingStatusOnClient server_status;
207   CoreEnd2endTest::IncomingMetadata server_initial_metadata;
208   CoreEnd2endTest::IncomingMessage server_message;
209   CoreEnd2endTest::IncomingMessage client_message;
210   CoreEnd2endTest::IncomingCloseOnServer client_close;
211   {
212     auto c = NewClientCall("/foo").Timeout(Duration::Minutes(5)).Create();
213     c.NewBatch(1)
214         .SendInitialMetadata({})
215         .SendMessage(send_from_client.Ref())
216         .SendCloseFromClient()
217         .RecvInitialMetadata(server_initial_metadata)
218         .RecvMessage(server_message)
219         .RecvStatusOnClient(server_status);
220     auto s = RequestCall(101);
221     Expect(101, true);
222     Step(Duration::Minutes(1));
223     s.NewBatch(102).SendInitialMetadata({}).RecvMessage(client_message);
224     Expect(102, true);
225     Step(Duration::Minutes(1));
226     s.NewBatch(103)
227         .SendStatusFromServer(GRPC_STATUS_UNIMPLEMENTED, "xyz", {})
228         .SendMessage(send_from_server.Ref())
229         .RecvCloseOnServer(client_close);
230     Expect(103, true);
231     Expect(1, true);
232     Step(Duration::Minutes(1));
233     EXPECT_EQ(s.method(), "/foo");
234   }
235   EXPECT_EQ(server_status.status(), GRPC_STATUS_UNIMPLEMENTED);
236   EXPECT_EQ(server_status.message(), "xyz");
237   EXPECT_FALSE(client_close.was_cancelled());
238   EXPECT_EQ(client_message.payload(), send_from_client);
239   EXPECT_EQ(server_message.payload(), send_from_server);
240   // Make sure that the calls have ended for the stats to have been collected
241   g_client_call_ended_notify->WaitForNotificationWithTimeout(absl::Seconds(5));
242   g_server_call_ended_notify->WaitForNotificationWithTimeout(absl::Seconds(5));
243 
244   auto client_transport_stats =
245       FakeCallTracer::FakeCallAttemptTracer::transport_stream_stats();
246   auto server_transport_stats = FakeServerCallTracer::transport_stream_stats();
247   EXPECT_EQ(client_transport_stats.outgoing.data_bytes,
248             send_from_client.size());
249   EXPECT_EQ(client_transport_stats.incoming.data_bytes,
250             send_from_server.size());
251   EXPECT_EQ(server_transport_stats.outgoing.data_bytes,
252             send_from_server.size());
253   EXPECT_EQ(server_transport_stats.incoming.data_bytes,
254             send_from_client.size());
255   // At the very minimum, we should have 9 bytes from initial header frame, 9
256   // bytes from data header frame, 5 bytes from the grpc header on data and 9
257   // bytes from the trailing header frame. The actual number might be more due
258   // to RST_STREAM (13 bytes) and WINDOW_UPDATE (13 bytes) frames.
259   EXPECT_GE(client_transport_stats.outgoing.framing_bytes, 32);
260   EXPECT_LE(client_transport_stats.outgoing.framing_bytes, 58);
261   EXPECT_GE(client_transport_stats.incoming.framing_bytes, 32);
262   EXPECT_LE(client_transport_stats.incoming.framing_bytes, 58);
263   EXPECT_GE(server_transport_stats.outgoing.framing_bytes, 32);
264   EXPECT_LE(server_transport_stats.outgoing.framing_bytes, 58);
265   EXPECT_GE(server_transport_stats.incoming.framing_bytes, 32);
266   EXPECT_LE(server_transport_stats.incoming.framing_bytes, 58);
267 
268   delete ServerCallTracerFactory::Get(ChannelArgs());
269   ServerCallTracerFactory::RegisterGlobal(nullptr);
270   delete g_client_call_ended_notify;
271   g_client_call_ended_notify = nullptr;
272   delete g_server_call_ended_notify;
273   g_server_call_ended_notify = nullptr;
274   delete g_mu;
275   g_mu = nullptr;
276 }
277 
278 }  // namespace
279 }  // namespace grpc_core
280