1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 // Benchmark gRPC end2end in various configurations
20
21 #ifndef GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
22 #define GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
23
24 #include <sstream>
25
26 #include <benchmark/benchmark.h>
27
28 #include "src/proto/grpc/testing/echo.grpc.pb.h"
29 #include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
30 #include "test/cpp/microbenchmarks/fullstack_fixtures.h"
31
32 namespace grpc {
33 namespace testing {
34
35 //******************************************************************************
36 // BENCHMARKING KERNELS
37 //
38
// Packs a small integer into the opaque void* "tag" slot used to label
// completion-queue operations. Callers recover the integer by casting the
// pointer back to intptr_t.
static void* tag(intptr_t x) {
  void* const as_pointer = reinterpret_cast<void*>(x);
  return as_pointer;
}
40
41 template <class Fixture>
BM_PumpStreamClientToServer(benchmark::State & state)42 static void BM_PumpStreamClientToServer(benchmark::State& state) {
43 EchoTestService::AsyncService service;
44 std::unique_ptr<Fixture> fixture(new Fixture(&service));
45 {
46 EchoRequest send_request;
47 EchoRequest recv_request;
48 if (state.range(0) > 0) {
49 send_request.set_message(std::string(state.range(0), 'a'));
50 }
51 Status recv_status;
52 ServerContext svr_ctx;
53 ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
54 service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
55 fixture->cq(), tag(0));
56 std::unique_ptr<EchoTestService::Stub> stub(
57 EchoTestService::NewStub(fixture->channel()));
58 ClientContext cli_ctx;
59 auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
60 int need_tags = (1 << 0) | (1 << 1);
61 void* t;
62 bool ok;
63 while (need_tags) {
64 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
65 GPR_ASSERT(ok);
66 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
67 GPR_ASSERT(need_tags & (1 << i));
68 need_tags &= ~(1 << i);
69 }
70 response_rw.Read(&recv_request, tag(0));
71 for (auto _ : state) {
72 request_rw->Write(send_request, tag(1));
73 while (true) {
74 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
75 if (t == tag(0)) {
76 response_rw.Read(&recv_request, tag(0));
77 } else if (t == tag(1)) {
78 break;
79 } else {
80 grpc_core::Crash("unreachable");
81 }
82 }
83 }
84 request_rw->WritesDone(tag(1));
85 need_tags = (1 << 0) | (1 << 1);
86 while (need_tags) {
87 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
88 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
89 GPR_ASSERT(need_tags & (1 << i));
90 need_tags &= ~(1 << i);
91 }
92 response_rw.Finish(Status::OK, tag(0));
93 Status final_status;
94 request_rw->Finish(&final_status, tag(1));
95 need_tags = (1 << 0) | (1 << 1);
96 while (need_tags) {
97 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
98 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
99 GPR_ASSERT(need_tags & (1 << i));
100 need_tags &= ~(1 << i);
101 }
102 GPR_ASSERT(final_status.ok());
103 }
104 fixture.reset();
105 state.SetBytesProcessed(state.range(0) * state.iterations());
106 }
107
108 template <class Fixture>
BM_PumpStreamServerToClient(benchmark::State & state)109 static void BM_PumpStreamServerToClient(benchmark::State& state) {
110 EchoTestService::AsyncService service;
111 std::unique_ptr<Fixture> fixture(new Fixture(&service));
112 {
113 EchoResponse send_response;
114 EchoResponse recv_response;
115 if (state.range(0) > 0) {
116 send_response.set_message(std::string(state.range(0), 'a'));
117 }
118 Status recv_status;
119 ServerContext svr_ctx;
120 ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
121 service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
122 fixture->cq(), tag(0));
123 std::unique_ptr<EchoTestService::Stub> stub(
124 EchoTestService::NewStub(fixture->channel()));
125 ClientContext cli_ctx;
126 auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
127 int need_tags = (1 << 0) | (1 << 1);
128 void* t;
129 bool ok;
130 while (need_tags) {
131 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
132 GPR_ASSERT(ok);
133 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
134 GPR_ASSERT(need_tags & (1 << i));
135 need_tags &= ~(1 << i);
136 }
137 request_rw->Read(&recv_response, tag(0));
138 for (auto _ : state) {
139 response_rw.Write(send_response, tag(1));
140 while (true) {
141 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
142 if (t == tag(0)) {
143 request_rw->Read(&recv_response, tag(0));
144 } else if (t == tag(1)) {
145 break;
146 } else {
147 grpc_core::Crash("unreachable");
148 }
149 }
150 }
151 response_rw.Finish(Status::OK, tag(1));
152 need_tags = (1 << 0) | (1 << 1);
153 while (need_tags) {
154 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
155 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
156 GPR_ASSERT(need_tags & (1 << i));
157 need_tags &= ~(1 << i);
158 }
159 }
160 fixture.reset();
161 state.SetBytesProcessed(state.range(0) * state.iterations());
162 }
163 } // namespace testing
164 } // namespace grpc
165
166 #endif // GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
167