xref: /aosp_15_r20/external/grpc-grpc/test/cpp/microbenchmarks/fullstack_streaming_pump.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 // Benchmark gRPC end2end in various configurations
20 
21 #ifndef GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
22 #define GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
23 
24 #include <sstream>
25 
26 #include <benchmark/benchmark.h>
27 
28 #include "src/proto/grpc/testing/echo.grpc.pb.h"
29 #include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
30 #include "test/cpp/microbenchmarks/fullstack_fixtures.h"
31 
32 namespace grpc {
33 namespace testing {
34 
35 //******************************************************************************
36 // BENCHMARKING KERNELS
37 //
38 
tag(intptr_t x)39 static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
40 
41 template <class Fixture>
BM_PumpStreamClientToServer(benchmark::State & state)42 static void BM_PumpStreamClientToServer(benchmark::State& state) {
43   EchoTestService::AsyncService service;
44   std::unique_ptr<Fixture> fixture(new Fixture(&service));
45   {
46     EchoRequest send_request;
47     EchoRequest recv_request;
48     if (state.range(0) > 0) {
49       send_request.set_message(std::string(state.range(0), 'a'));
50     }
51     Status recv_status;
52     ServerContext svr_ctx;
53     ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
54     service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
55                               fixture->cq(), tag(0));
56     std::unique_ptr<EchoTestService::Stub> stub(
57         EchoTestService::NewStub(fixture->channel()));
58     ClientContext cli_ctx;
59     auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
60     int need_tags = (1 << 0) | (1 << 1);
61     void* t;
62     bool ok;
63     while (need_tags) {
64       GPR_ASSERT(fixture->cq()->Next(&t, &ok));
65       GPR_ASSERT(ok);
66       int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
67       GPR_ASSERT(need_tags & (1 << i));
68       need_tags &= ~(1 << i);
69     }
70     response_rw.Read(&recv_request, tag(0));
71     for (auto _ : state) {
72       request_rw->Write(send_request, tag(1));
73       while (true) {
74         GPR_ASSERT(fixture->cq()->Next(&t, &ok));
75         if (t == tag(0)) {
76           response_rw.Read(&recv_request, tag(0));
77         } else if (t == tag(1)) {
78           break;
79         } else {
80           grpc_core::Crash("unreachable");
81         }
82       }
83     }
84     request_rw->WritesDone(tag(1));
85     need_tags = (1 << 0) | (1 << 1);
86     while (need_tags) {
87       GPR_ASSERT(fixture->cq()->Next(&t, &ok));
88       int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
89       GPR_ASSERT(need_tags & (1 << i));
90       need_tags &= ~(1 << i);
91     }
92     response_rw.Finish(Status::OK, tag(0));
93     Status final_status;
94     request_rw->Finish(&final_status, tag(1));
95     need_tags = (1 << 0) | (1 << 1);
96     while (need_tags) {
97       GPR_ASSERT(fixture->cq()->Next(&t, &ok));
98       int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
99       GPR_ASSERT(need_tags & (1 << i));
100       need_tags &= ~(1 << i);
101     }
102     GPR_ASSERT(final_status.ok());
103   }
104   fixture.reset();
105   state.SetBytesProcessed(state.range(0) * state.iterations());
106 }
107 
108 template <class Fixture>
BM_PumpStreamServerToClient(benchmark::State & state)109 static void BM_PumpStreamServerToClient(benchmark::State& state) {
110   EchoTestService::AsyncService service;
111   std::unique_ptr<Fixture> fixture(new Fixture(&service));
112   {
113     EchoResponse send_response;
114     EchoResponse recv_response;
115     if (state.range(0) > 0) {
116       send_response.set_message(std::string(state.range(0), 'a'));
117     }
118     Status recv_status;
119     ServerContext svr_ctx;
120     ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
121     service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
122                               fixture->cq(), tag(0));
123     std::unique_ptr<EchoTestService::Stub> stub(
124         EchoTestService::NewStub(fixture->channel()));
125     ClientContext cli_ctx;
126     auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
127     int need_tags = (1 << 0) | (1 << 1);
128     void* t;
129     bool ok;
130     while (need_tags) {
131       GPR_ASSERT(fixture->cq()->Next(&t, &ok));
132       GPR_ASSERT(ok);
133       int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
134       GPR_ASSERT(need_tags & (1 << i));
135       need_tags &= ~(1 << i);
136     }
137     request_rw->Read(&recv_response, tag(0));
138     for (auto _ : state) {
139       response_rw.Write(send_response, tag(1));
140       while (true) {
141         GPR_ASSERT(fixture->cq()->Next(&t, &ok));
142         if (t == tag(0)) {
143           request_rw->Read(&recv_response, tag(0));
144         } else if (t == tag(1)) {
145           break;
146         } else {
147           grpc_core::Crash("unreachable");
148         }
149       }
150     }
151     response_rw.Finish(Status::OK, tag(1));
152     need_tags = (1 << 0) | (1 << 1);
153     while (need_tags) {
154       GPR_ASSERT(fixture->cq()->Next(&t, &ok));
155       int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
156       GPR_ASSERT(need_tags & (1 << i));
157       need_tags &= ~(1 << i);
158     }
159   }
160   fixture.reset();
161   state.SetBytesProcessed(state.range(0) * state.iterations());
162 }
163 }  // namespace testing
164 }  // namespace grpc
165 
166 #endif  // GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
167