//
//
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

// Benchmark gRPC end2end in various configurations

#ifndef GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
#define GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H

#include <sstream>

#include <benchmark/benchmark.h>

#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
#include "test/cpp/microbenchmarks/fullstack_fixtures.h"

namespace grpc {
namespace testing {

//******************************************************************************
// BENCHMARKING KERNELS
//

static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }

// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
// messages in each call) in a loop on a single channel.
//
//  First parameter (i.e. state.range(0)):  Message size (in bytes) to use
//  Second parameter (i.e. state.range(1)): Number of ping-pong messages.
//      Note: One ping-pong means two messages (one from client to server and
//      the other from server to client).
template <class Fixture, class ClientContextMutator, class ServerContextMutator>
static void BM_StreamingPingPong(benchmark::State& state) {
  const int msg_size = state.range(0);
  const int max_ping_pongs = state.range(1);

  EchoTestService::AsyncService service;
  std::unique_ptr<Fixture> fixture(new Fixture(&service));
  {
    EchoResponse send_response;
    EchoResponse recv_response;
    EchoRequest send_request;
    EchoRequest recv_request;

    if (msg_size > 0) {
      send_request.set_message(std::string(msg_size, 'a'));
      send_response.set_message(std::string(msg_size, 'b'));
    }

    std::unique_ptr<EchoTestService::Stub> stub(
        EchoTestService::NewStub(fixture->channel()));

    for (auto _ : state) {
      ServerContext svr_ctx;
      ServerContextMutator svr_ctx_mut(&svr_ctx);
      ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
      service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
                                fixture->cq(), tag(0));

      ClientContext cli_ctx;
      ClientContextMutator cli_ctx_mut(&cli_ctx);
      auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));

      // Establish async stream between client side and server side
      void* t;
      bool ok;
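      // need_tags is a bitmask of completion-queue tags still outstanding;
      // each bit is cleared below as cq->Next() reports the corresponding op
      // as complete.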
      int need_tags = (1 << 0) | (1 << 1);
      while (need_tags) {
        GPR_ASSERT(fixture->cq()->Next(&t, &ok));
        GPR_ASSERT(ok);
        int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
        GPR_ASSERT(need_tags & (1 << i));
        need_tags &= ~(1 << i);
      }

      // Send 'max_ping_pongs' number of ping pong messages
      int ping_pong_cnt = 0;
      while (ping_pong_cnt < max_ping_pongs) {
        request_rw->Write(send_request, tag(0));   // Start client send
        response_rw.Read(&recv_request, tag(1));   // Start server recv
        request_rw->Read(&recv_response, tag(2));  // Start client recv

        need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
        while (need_tags) {
          GPR_ASSERT(fixture->cq()->Next(&t, &ok));
          GPR_ASSERT(ok);
          int i = static_cast<int>(reinterpret_cast<intptr_t>(t));

          // If server recv is complete, start the server send operation
          if (i == 1) {
            response_rw.Write(send_response, tag(3));
          }

          GPR_ASSERT(need_tags & (1 << i));
          need_tags &= ~(1 << i);
        }

        ping_pong_cnt++;
      }

      request_rw->WritesDone(tag(0));
      response_rw.Finish(Status::OK, tag(1));

      Status recv_status;
      request_rw->Finish(&recv_status, tag(2));

      need_tags = (1 << 0) | (1 << 1) | (1 << 2);
      while (need_tags) {
        GPR_ASSERT(fixture->cq()->Next(&t, &ok));
        int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
        GPR_ASSERT(need_tags & (1 << i));
        need_tags &= ~(1 << i);
      }

      GPR_ASSERT(recv_status.ok());
    }
  }

  fixture.reset();
  state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
}
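
// Illustrative only: the kernels in this header are typically registered from
// a benchmark .cc file. A minimal sketch, assuming a fixture such as TCP and
// the NoOpMutator from the headers included above, might look like:
//
//   BENCHMARK_TEMPLATE(BM_StreamingPingPong, TCP, NoOpMutator, NoOpMutator)
//       ->Args({0, 1})             // 0-byte messages, 1 ping-pong per call
//       ->Args({1024 * 1024, 2});  // 1 MiB messages, 2 ping-pongs per call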

// Repeatedly sends ping-pong messages in a single streaming Bidi call in a loop
//     First parameter (i.e. state.range(0)):  Message size (in bytes) to use
template <class Fixture, class ClientContextMutator, class ServerContextMutator>
static void BM_StreamingPingPongMsgs(benchmark::State& state) {
  const int msg_size = state.range(0);

  EchoTestService::AsyncService service;
  std::unique_ptr<Fixture> fixture(new Fixture(&service));
  {
    EchoResponse send_response;
    EchoResponse recv_response;
    EchoRequest send_request;
    EchoRequest recv_request;

    if (msg_size > 0) {
      send_request.set_message(std::string(msg_size, 'a'));
      send_response.set_message(std::string(msg_size, 'b'));
    }

    std::unique_ptr<EchoTestService::Stub> stub(
        EchoTestService::NewStub(fixture->channel()));

    ServerContext svr_ctx;
    ServerContextMutator svr_ctx_mut(&svr_ctx);
    ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
    service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
                              fixture->cq(), tag(0));

    ClientContext cli_ctx;
    ClientContextMutator cli_ctx_mut(&cli_ctx);
    auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));

    // Establish async stream between client side and server side
    void* t;
    bool ok;
    int need_tags = (1 << 0) | (1 << 1);
    while (need_tags) {
      GPR_ASSERT(fixture->cq()->Next(&t, &ok));
      GPR_ASSERT(ok);
      int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
      GPR_ASSERT(need_tags & (1 << i));
      need_tags &= ~(1 << i);
    }

    for (auto _ : state) {
      request_rw->Write(send_request, tag(0));   // Start client send
      response_rw.Read(&recv_request, tag(1));   // Start server recv
      request_rw->Read(&recv_response, tag(2));  // Start client recv

      need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
      while (need_tags) {
        GPR_ASSERT(fixture->cq()->Next(&t, &ok));
        GPR_ASSERT(ok);
        int i = static_cast<int>(reinterpret_cast<intptr_t>(t));

        // If server recv is complete, start the server send operation
        if (i == 1) {
          response_rw.Write(send_response, tag(3));
        }

        GPR_ASSERT(need_tags & (1 << i));
        need_tags &= ~(1 << i);
      }
    }

    request_rw->WritesDone(tag(0));
    response_rw.Finish(Status::OK, tag(1));
    Status recv_status;
    request_rw->Finish(&recv_status, tag(2));

    need_tags = (1 << 0) | (1 << 1) | (1 << 2);
    while (need_tags) {
      GPR_ASSERT(fixture->cq()->Next(&t, &ok));
      int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
      GPR_ASSERT(need_tags & (1 << i));
      need_tags &= ~(1 << i);
    }

    GPR_ASSERT(recv_status.ok());
  }

  fixture.reset();
  state.SetBytesProcessed(msg_size * state.iterations() * 2);
}
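
// Illustrative only (same assumptions as above): this kernel takes a single
// message-size argument, e.g.
//
//   BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, TCP, NoOpMutator, NoOpMutator)
//       ->Range(0, 128 * 1024 * 1024);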

// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
// messages in each call) in a loop on a single channel. Unlike
// BM_StreamingPingPong, this uses the stream coalescing API, e.g. WriteLast,
// WriteAndFinish, and set_initial_metadata_corked. These APIs aim at saving
// sendmsg syscalls for streaming by coalescing 1. initial metadata with the
// first message; 2. the final streaming message with trailing metadata.
//
//  First parameter (i.e. state.range(0)):  Message size (in bytes) to use
//  Second parameter (i.e. state.range(1)): Number of ping-pong messages.
//      Note: One ping-pong means two messages (one from client to server and
//      the other from server to client).
//  Third parameter (i.e. state.range(2)): Switch between using the
//  WriteAndFinish API and the WriteLast API on the server.
template <class Fixture, class ClientContextMutator, class ServerContextMutator>
static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) {
  const int msg_size = state.range(0);
  const int max_ping_pongs = state.range(1);
  // This option selects which server API to exercise, WriteAndFinish or
  // WriteLast, since we cannot use both of them on the server side at the
  // same time. Value 1 means we are testing the WriteAndFinish API; any other
  // value means we are testing the WriteLast API.
  const int write_and_finish = state.range(2);

  EchoTestService::AsyncService service;
  std::unique_ptr<Fixture> fixture(new Fixture(&service));
  {
    EchoResponse send_response;
    EchoResponse recv_response;
    EchoRequest send_request;
    EchoRequest recv_request;

    if (msg_size > 0) {
      send_request.set_message(std::string(msg_size, 'a'));
      send_response.set_message(std::string(msg_size, 'b'));
    }

    std::unique_ptr<EchoTestService::Stub> stub(
        EchoTestService::NewStub(fixture->channel()));

    for (auto _ : state) {
      ServerContext svr_ctx;
      ServerContextMutator svr_ctx_mut(&svr_ctx);
      ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
      service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
                                fixture->cq(), tag(0));

      ClientContext cli_ctx;
      ClientContextMutator cli_ctx_mut(&cli_ctx);
      cli_ctx.set_initial_metadata_corked(true);
      // tag:1 here will never come up, since we are not performing any op due
      // to initial metadata coalescing.
      auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));

      void* t;
      bool ok;
      int expect_tags = 0;
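      // await_tags: completions that must be observed before this ping-pong
      // iteration can end. expect_tags: every tag that may legally come out of
      // the completion queue; ops that may complete later (e.g. buffered
      // writes) stay in expect_tags and are drained at the end of the call.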

      // Send 'max_ping_pongs' number of ping pong messages
      int ping_pong_cnt = 0;
      while (ping_pong_cnt < max_ping_pongs) {
        if (ping_pong_cnt == max_ping_pongs - 1) {
          request_rw->WriteLast(send_request, WriteOptions(), tag(2));
        } else {
          request_rw->Write(send_request, tag(2));  // Start client send
        }

        int await_tags = (1 << 2);

        if (ping_pong_cnt == 0) {
          // Wait for the server call structure (call_hook, etc.) to be
          // initialized (async stream between client side and server side
          // established). This is necessary when client initial metadata is
          // coalesced.
          GPR_ASSERT(fixture->cq()->Next(&t, &ok));
          while (static_cast<int>(reinterpret_cast<intptr_t>(t)) != 0) {
            // In some cases tag:2 comes before tag:0 (the write tag comes out
            // first); this loop makes sure we get tag:0.
            int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
            GPR_ASSERT(await_tags & (1 << i));
            await_tags &= ~(1 << i);
            GPR_ASSERT(fixture->cq()->Next(&t, &ok));
          }
        }

        response_rw.Read(&recv_request, tag(3));   // Start server recv
        request_rw->Read(&recv_response, tag(4));  // Start client recv

        await_tags |= (1 << 3) | (1 << 4);
        expect_tags = await_tags;
        await_tags |= (1 << 5);

        while (await_tags != 0) {
          GPR_ASSERT(fixture->cq()->Next(&t, &ok));
          GPR_ASSERT(ok);
          int i = static_cast<int>(reinterpret_cast<intptr_t>(t));

          // If server recv is complete, start the server send operation
          if (i == 3) {
            if (ping_pong_cnt == max_ping_pongs - 1) {
              if (write_and_finish == 1) {
                response_rw.WriteAndFinish(send_response, WriteOptions(),
                                           Status::OK, tag(5));
                expect_tags |= (1 << 5);
              } else {
                response_rw.WriteLast(send_response, WriteOptions(), tag(5));
                // WriteLast buffers the write, so it's possible neither the
                // server write op nor the client read op will finish inside
                // the while loop.
                await_tags &= ~(1 << 4);
                await_tags &= ~(1 << 5);
                expect_tags |= (1 << 5);
              }
            } else {
              response_rw.Write(send_response, tag(5));
              expect_tags |= (1 << 5);
            }
          }

          GPR_ASSERT(expect_tags & (1 << i));
          expect_tags &= ~(1 << i);
          await_tags &= ~(1 << i);
        }

        ping_pong_cnt++;
      }

      if (max_ping_pongs == 0) {
        expect_tags |= (1 << 6) | (1 << 7) | (1 << 8);
      } else {
        if (write_and_finish == 1) {
          expect_tags |= (1 << 8);
        } else {
          // The tags for the server's buffered write and the client's read of
          // that buffered write should come up.
          expect_tags |= (1 << 7) | (1 << 8);
        }
      }

      // No message write or initial metadata write happened yet.
      if (max_ping_pongs == 0) {
        request_rw->WritesDone(tag(6));
        // Wait for the server call data structure (call_hook, etc.) to be
        // initialized, since initial metadata is corked.
        GPR_ASSERT(fixture->cq()->Next(&t, &ok));
        while (static_cast<int>(reinterpret_cast<intptr_t>(t)) != 0) {
          int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
          GPR_ASSERT(expect_tags & (1 << i));
          expect_tags &= ~(1 << i);
          GPR_ASSERT(fixture->cq()->Next(&t, &ok));
        }
        response_rw.Finish(Status::OK, tag(7));
      } else {
        if (write_and_finish != 1) {
          response_rw.Finish(Status::OK, tag(7));
        }
      }

      Status recv_status;
      request_rw->Finish(&recv_status, tag(8));

      while (expect_tags) {
        GPR_ASSERT(fixture->cq()->Next(&t, &ok));
        int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
        GPR_ASSERT(expect_tags & (1 << i));
        expect_tags &= ~(1 << i);
      }

      GPR_ASSERT(recv_status.ok());
    }
  }

  fixture.reset();
  state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
}
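
// Illustrative only (same assumptions as above): the third argument selects
// the server API, WriteAndFinish (1) vs. WriteLast (any other value), e.g.
//
//   BENCHMARK_TEMPLATE(BM_StreamingPingPongWithCoalescingApi, TCP, NoOpMutator,
//                      NoOpMutator)
//       ->Args({1024, 1, 0})   // 1 KiB messages, 1 ping-pong, WriteLast
//       ->Args({1024, 1, 1});  // 1 KiB messages, 1 ping-pong, WriteAndFinish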
}  // namespace testing
}  // namespace grpc

#endif  // GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H