1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 // Benchmark gRPC end2end in various configurations
20
21 #ifndef GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
22 #define GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
23
24 #include <sstream>
25
26 #include <benchmark/benchmark.h>
27
28 #include "src/proto/grpc/testing/echo.grpc.pb.h"
29 #include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
30 #include "test/cpp/microbenchmarks/fullstack_fixtures.h"
31
32 namespace grpc {
33 namespace testing {
34
35 //******************************************************************************
36 // BENCHMARKING KERNELS
37 //
38
// Wraps a small integer id in the opaque void* form that the async calls in
// this file pass as their completion tag and later read back from the
// completion queue.
static void* tag(intptr_t x) {
  void* const as_pointer = reinterpret_cast<void*>(x);
  return as_pointer;
}
40
41 // Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
42 // messages in each call) in a loop on a single channel
43 //
44 // First parmeter (i.e state.range(0)): Message size (in bytes) to use
45 // Second parameter (i.e state.range(1)): Number of ping pong messages.
46 // Note: One ping-pong means two messages (one from client to server and
47 // the other from server to client):
48 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
BM_StreamingPingPong(benchmark::State & state)49 static void BM_StreamingPingPong(benchmark::State& state) {
50 const int msg_size = state.range(0);
51 const int max_ping_pongs = state.range(1);
52
53 EchoTestService::AsyncService service;
54 std::unique_ptr<Fixture> fixture(new Fixture(&service));
55 {
56 EchoResponse send_response;
57 EchoResponse recv_response;
58 EchoRequest send_request;
59 EchoRequest recv_request;
60
61 if (msg_size > 0) {
62 send_request.set_message(std::string(msg_size, 'a'));
63 send_response.set_message(std::string(msg_size, 'b'));
64 }
65
66 std::unique_ptr<EchoTestService::Stub> stub(
67 EchoTestService::NewStub(fixture->channel()));
68
69 for (auto _ : state) {
70 ServerContext svr_ctx;
71 ServerContextMutator svr_ctx_mut(&svr_ctx);
72 ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
73 service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
74 fixture->cq(), tag(0));
75
76 ClientContext cli_ctx;
77 ClientContextMutator cli_ctx_mut(&cli_ctx);
78 auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
79
80 // Establish async stream between client side and server side
81 void* t;
82 bool ok;
83 int need_tags = (1 << 0) | (1 << 1);
84 while (need_tags) {
85 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
86 GPR_ASSERT(ok);
87 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
88 GPR_ASSERT(need_tags & (1 << i));
89 need_tags &= ~(1 << i);
90 }
91
92 // Send 'max_ping_pongs' number of ping pong messages
93 int ping_pong_cnt = 0;
94 while (ping_pong_cnt < max_ping_pongs) {
95 request_rw->Write(send_request, tag(0)); // Start client send
96 response_rw.Read(&recv_request, tag(1)); // Start server recv
97 request_rw->Read(&recv_response, tag(2)); // Start client recv
98
99 need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
100 while (need_tags) {
101 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
102 GPR_ASSERT(ok);
103 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
104
105 // If server recv is complete, start the server send operation
106 if (i == 1) {
107 response_rw.Write(send_response, tag(3));
108 }
109
110 GPR_ASSERT(need_tags & (1 << i));
111 need_tags &= ~(1 << i);
112 }
113
114 ping_pong_cnt++;
115 }
116
117 request_rw->WritesDone(tag(0));
118 response_rw.Finish(Status::OK, tag(1));
119
120 Status recv_status;
121 request_rw->Finish(&recv_status, tag(2));
122
123 need_tags = (1 << 0) | (1 << 1) | (1 << 2);
124 while (need_tags) {
125 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
126 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
127 GPR_ASSERT(need_tags & (1 << i));
128 need_tags &= ~(1 << i);
129 }
130
131 GPR_ASSERT(recv_status.ok());
132 }
133 }
134
135 fixture.reset();
136 state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
137 }
138
139 // Repeatedly sends ping pong messages in a single streaming Bidi call in a loop
140 // First parmeter (i.e state.range(0)): Message size (in bytes) to use
141 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
BM_StreamingPingPongMsgs(benchmark::State & state)142 static void BM_StreamingPingPongMsgs(benchmark::State& state) {
143 const int msg_size = state.range(0);
144
145 EchoTestService::AsyncService service;
146 std::unique_ptr<Fixture> fixture(new Fixture(&service));
147 {
148 EchoResponse send_response;
149 EchoResponse recv_response;
150 EchoRequest send_request;
151 EchoRequest recv_request;
152
153 if (msg_size > 0) {
154 send_request.set_message(std::string(msg_size, 'a'));
155 send_response.set_message(std::string(msg_size, 'b'));
156 }
157
158 std::unique_ptr<EchoTestService::Stub> stub(
159 EchoTestService::NewStub(fixture->channel()));
160
161 ServerContext svr_ctx;
162 ServerContextMutator svr_ctx_mut(&svr_ctx);
163 ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
164 service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
165 fixture->cq(), tag(0));
166
167 ClientContext cli_ctx;
168 ClientContextMutator cli_ctx_mut(&cli_ctx);
169 auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
170
171 // Establish async stream between client side and server side
172 void* t;
173 bool ok;
174 int need_tags = (1 << 0) | (1 << 1);
175 while (need_tags) {
176 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
177 GPR_ASSERT(ok);
178 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
179 GPR_ASSERT(need_tags & (1 << i));
180 need_tags &= ~(1 << i);
181 }
182
183 for (auto _ : state) {
184 request_rw->Write(send_request, tag(0)); // Start client send
185 response_rw.Read(&recv_request, tag(1)); // Start server recv
186 request_rw->Read(&recv_response, tag(2)); // Start client recv
187
188 need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
189 while (need_tags) {
190 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
191 GPR_ASSERT(ok);
192 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
193
194 // If server recv is complete, start the server send operation
195 if (i == 1) {
196 response_rw.Write(send_response, tag(3));
197 }
198
199 GPR_ASSERT(need_tags & (1 << i));
200 need_tags &= ~(1 << i);
201 }
202 }
203
204 request_rw->WritesDone(tag(0));
205 response_rw.Finish(Status::OK, tag(1));
206 Status recv_status;
207 request_rw->Finish(&recv_status, tag(2));
208
209 need_tags = (1 << 0) | (1 << 1) | (1 << 2);
210 while (need_tags) {
211 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
212 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
213 GPR_ASSERT(need_tags & (1 << i));
214 need_tags &= ~(1 << i);
215 }
216
217 GPR_ASSERT(recv_status.ok());
218 }
219
220 fixture.reset();
221 state.SetBytesProcessed(msg_size * state.iterations() * 2);
222 }
223
224 // Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
225 // messages in each call) in a loop on a single channel. Different from
226 // BM_StreamingPingPong we are using stream coalescing api, e.g. WriteLast,
227 // WriteAndFinish, set_initial_metadata_corked. These apis aim at saving
228 // sendmsg syscalls for streaming by coalescing 1. initial metadata with first
229 // message; 2. final streaming message with trailing metadata.
230 //
231 // First parmeter (i.e state.range(0)): Message size (in bytes) to use
232 // Second parameter (i.e state.range(1)): Number of ping pong messages.
233 // Note: One ping-pong means two messages (one from client to server and
234 // the other from server to client):
235 // Third parameter (i.e state.range(2)): Switch between using WriteAndFinish
236 // API and WriteLast API for server.
237 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
BM_StreamingPingPongWithCoalescingApi(benchmark::State & state)238 static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) {
239 const int msg_size = state.range(0);
240 const int max_ping_pongs = state.range(1);
241 // This options is used to test out server API: WriteLast and WriteAndFinish
242 // respectively, since we can not use both of them on server side at the same
243 // time. Value 1 means we are testing out the WriteAndFinish API, and
244 // otherwise we are testing out the WriteLast API.
245 const int write_and_finish = state.range(2);
246
247 EchoTestService::AsyncService service;
248 std::unique_ptr<Fixture> fixture(new Fixture(&service));
249 {
250 EchoResponse send_response;
251 EchoResponse recv_response;
252 EchoRequest send_request;
253 EchoRequest recv_request;
254
255 if (msg_size > 0) {
256 send_request.set_message(std::string(msg_size, 'a'));
257 send_response.set_message(std::string(msg_size, 'b'));
258 }
259
260 std::unique_ptr<EchoTestService::Stub> stub(
261 EchoTestService::NewStub(fixture->channel()));
262
263 for (auto _ : state) {
264 ServerContext svr_ctx;
265 ServerContextMutator svr_ctx_mut(&svr_ctx);
266 ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
267 service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
268 fixture->cq(), tag(0));
269
270 ClientContext cli_ctx;
271 ClientContextMutator cli_ctx_mut(&cli_ctx);
272 cli_ctx.set_initial_metadata_corked(true);
273 // tag:1 here will never comes up, since we are not performing any op due
274 // to initial metadata coalescing.
275 auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
276
277 void* t;
278 bool ok;
279 int expect_tags = 0;
280
281 // Send 'max_ping_pongs' number of ping pong messages
282 int ping_pong_cnt = 0;
283 while (ping_pong_cnt < max_ping_pongs) {
284 if (ping_pong_cnt == max_ping_pongs - 1) {
285 request_rw->WriteLast(send_request, WriteOptions(), tag(2));
286 } else {
287 request_rw->Write(send_request, tag(2)); // Start client send
288 }
289
290 int await_tags = (1 << 2);
291
292 if (ping_pong_cnt == 0) {
293 // wait for the server call structure (call_hook, etc.) to be
294 // initialized (async stream between client side and server side
295 // established). It is necessary when client init metadata is
296 // coalesced
297 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
298 while (static_cast<int>(reinterpret_cast<intptr_t>(t)) != 0) {
299 // In some cases tag:2 comes before tag:0 (write tag comes out
300 // first), this while loop is to make sure get tag:0.
301 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
302 GPR_ASSERT(await_tags & (1 << i));
303 await_tags &= ~(1 << i);
304 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
305 }
306 }
307
308 response_rw.Read(&recv_request, tag(3)); // Start server recv
309 request_rw->Read(&recv_response, tag(4)); // Start client recv
310
311 await_tags |= (1 << 3) | (1 << 4);
312 expect_tags = await_tags;
313 await_tags |= (1 << 5);
314
315 while (await_tags != 0) {
316 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
317 GPR_ASSERT(ok);
318 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
319
320 // If server recv is complete, start the server send operation
321 if (i == 3) {
322 if (ping_pong_cnt == max_ping_pongs - 1) {
323 if (write_and_finish == 1) {
324 response_rw.WriteAndFinish(send_response, WriteOptions(),
325 Status::OK, tag(5));
326 expect_tags |= (1 << 5);
327 } else {
328 response_rw.WriteLast(send_response, WriteOptions(), tag(5));
329 // WriteLast buffers the write, so it's possible neither server
330 // write op nor client read op will finish inside the while
331 // loop.
332 await_tags &= ~(1 << 4);
333 await_tags &= ~(1 << 5);
334 expect_tags |= (1 << 5);
335 }
336 } else {
337 response_rw.Write(send_response, tag(5));
338 expect_tags |= (1 << 5);
339 }
340 }
341
342 GPR_ASSERT(expect_tags & (1 << i));
343 expect_tags &= ~(1 << i);
344 await_tags &= ~(1 << i);
345 }
346
347 ping_pong_cnt++;
348 }
349
350 if (max_ping_pongs == 0) {
351 expect_tags |= (1 << 6) | (1 << 7) | (1 << 8);
352 } else {
353 if (write_and_finish == 1) {
354 expect_tags |= (1 << 8);
355 } else {
356 // server's buffered write and the client's read of the buffered write
357 // tags should come up.
358 expect_tags |= (1 << 7) | (1 << 8);
359 }
360 }
361
362 // No message write or initial metadata write happened yet.
363 if (max_ping_pongs == 0) {
364 request_rw->WritesDone(tag(6));
365 // wait for server call data structure(call_hook, etc.) to be
366 // initialized, since initial metadata is corked.
367 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
368 while (static_cast<int>(reinterpret_cast<intptr_t>(t)) != 0) {
369 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
370 GPR_ASSERT(expect_tags & (1 << i));
371 expect_tags &= ~(1 << i);
372 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
373 }
374 response_rw.Finish(Status::OK, tag(7));
375 } else {
376 if (write_and_finish != 1) {
377 response_rw.Finish(Status::OK, tag(7));
378 }
379 }
380
381 Status recv_status;
382 request_rw->Finish(&recv_status, tag(8));
383
384 while (expect_tags) {
385 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
386 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
387 GPR_ASSERT(expect_tags & (1 << i));
388 expect_tags &= ~(1 << i);
389 }
390
391 GPR_ASSERT(recv_status.ok());
392 }
393 }
394
395 fixture.reset();
396 state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
397 }
398 } // namespace testing
399 } // namespace grpc
400
401 #endif // GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
402