1*cc02d7e2SAndroid Build Coastguard Worker //
2*cc02d7e2SAndroid Build Coastguard Worker //
3*cc02d7e2SAndroid Build Coastguard Worker // Copyright 2015 gRPC authors.
4*cc02d7e2SAndroid Build Coastguard Worker //
5*cc02d7e2SAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License");
6*cc02d7e2SAndroid Build Coastguard Worker // you may not use this file except in compliance with the License.
7*cc02d7e2SAndroid Build Coastguard Worker // You may obtain a copy of the License at
8*cc02d7e2SAndroid Build Coastguard Worker //
9*cc02d7e2SAndroid Build Coastguard Worker // http://www.apache.org/licenses/LICENSE-2.0
10*cc02d7e2SAndroid Build Coastguard Worker //
11*cc02d7e2SAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
12*cc02d7e2SAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS,
13*cc02d7e2SAndroid Build Coastguard Worker // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14*cc02d7e2SAndroid Build Coastguard Worker // See the License for the specific language governing permissions and
15*cc02d7e2SAndroid Build Coastguard Worker // limitations under the License.
16*cc02d7e2SAndroid Build Coastguard Worker //
17*cc02d7e2SAndroid Build Coastguard Worker //
18*cc02d7e2SAndroid Build Coastguard Worker
19*cc02d7e2SAndroid Build Coastguard Worker #include <forward_list>
20*cc02d7e2SAndroid Build Coastguard Worker #include <functional>
21*cc02d7e2SAndroid Build Coastguard Worker #include <list>
22*cc02d7e2SAndroid Build Coastguard Worker #include <memory>
23*cc02d7e2SAndroid Build Coastguard Worker #include <mutex>
24*cc02d7e2SAndroid Build Coastguard Worker #include <sstream>
25*cc02d7e2SAndroid Build Coastguard Worker #include <string>
26*cc02d7e2SAndroid Build Coastguard Worker #include <thread>
27*cc02d7e2SAndroid Build Coastguard Worker #include <utility>
28*cc02d7e2SAndroid Build Coastguard Worker #include <vector>
29*cc02d7e2SAndroid Build Coastguard Worker
30*cc02d7e2SAndroid Build Coastguard Worker #include "absl/memory/memory.h"
31*cc02d7e2SAndroid Build Coastguard Worker
32*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/grpc.h>
33*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/cpu.h>
34*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/log.h>
35*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/alarm.h>
36*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/channel.h>
37*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/client_context.h>
38*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/generic/generic_stub.h>
39*cc02d7e2SAndroid Build Coastguard Worker
40*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gprpp/crash.h"
41*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/surface/completion_queue.h"
42*cc02d7e2SAndroid Build Coastguard Worker #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
43*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/client.h"
44*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/usage_timer.h"
45*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/util/create_test_channel.h"
46*cc02d7e2SAndroid Build Coastguard Worker
47*cc02d7e2SAndroid Build Coastguard Worker namespace grpc {
48*cc02d7e2SAndroid Build Coastguard Worker namespace testing {
49*cc02d7e2SAndroid Build Coastguard Worker
50*cc02d7e2SAndroid Build Coastguard Worker class ClientRpcContext {
51*cc02d7e2SAndroid Build Coastguard Worker public:
ClientRpcContext()52*cc02d7e2SAndroid Build Coastguard Worker ClientRpcContext() {}
~ClientRpcContext()53*cc02d7e2SAndroid Build Coastguard Worker virtual ~ClientRpcContext() {}
54*cc02d7e2SAndroid Build Coastguard Worker // next state, return false if done. Collect stats when appropriate
55*cc02d7e2SAndroid Build Coastguard Worker virtual bool RunNextState(bool, HistogramEntry* entry) = 0;
56*cc02d7e2SAndroid Build Coastguard Worker virtual void StartNewClone(CompletionQueue* cq) = 0;
tag(ClientRpcContext * c)57*cc02d7e2SAndroid Build Coastguard Worker static void* tag(ClientRpcContext* c) { return static_cast<void*>(c); }
detag(void * t)58*cc02d7e2SAndroid Build Coastguard Worker static ClientRpcContext* detag(void* t) {
59*cc02d7e2SAndroid Build Coastguard Worker return static_cast<ClientRpcContext*>(t);
60*cc02d7e2SAndroid Build Coastguard Worker }
61*cc02d7e2SAndroid Build Coastguard Worker
62*cc02d7e2SAndroid Build Coastguard Worker virtual void Start(CompletionQueue* cq, const ClientConfig& config) = 0;
63*cc02d7e2SAndroid Build Coastguard Worker virtual void TryCancel() = 0;
64*cc02d7e2SAndroid Build Coastguard Worker };
65*cc02d7e2SAndroid Build Coastguard Worker
66*cc02d7e2SAndroid Build Coastguard Worker template <class RequestType, class ResponseType>
67*cc02d7e2SAndroid Build Coastguard Worker class ClientRpcContextUnaryImpl : public ClientRpcContext {
68*cc02d7e2SAndroid Build Coastguard Worker public:
ClientRpcContextUnaryImpl(BenchmarkService::Stub * stub,const RequestType & req,std::function<gpr_timespec ()> next_issue,std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>> (BenchmarkService::Stub *,grpc::ClientContext *,const RequestType &,CompletionQueue *)> prepare_req,std::function<void (grpc::Status,ResponseType *,HistogramEntry *)> on_done)69*cc02d7e2SAndroid Build Coastguard Worker ClientRpcContextUnaryImpl(
70*cc02d7e2SAndroid Build Coastguard Worker BenchmarkService::Stub* stub, const RequestType& req,
71*cc02d7e2SAndroid Build Coastguard Worker std::function<gpr_timespec()> next_issue,
72*cc02d7e2SAndroid Build Coastguard Worker std::function<
73*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
74*cc02d7e2SAndroid Build Coastguard Worker BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
75*cc02d7e2SAndroid Build Coastguard Worker CompletionQueue*)>
76*cc02d7e2SAndroid Build Coastguard Worker prepare_req,
77*cc02d7e2SAndroid Build Coastguard Worker std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> on_done)
78*cc02d7e2SAndroid Build Coastguard Worker : context_(),
79*cc02d7e2SAndroid Build Coastguard Worker stub_(stub),
80*cc02d7e2SAndroid Build Coastguard Worker cq_(nullptr),
81*cc02d7e2SAndroid Build Coastguard Worker req_(req),
82*cc02d7e2SAndroid Build Coastguard Worker response_(),
83*cc02d7e2SAndroid Build Coastguard Worker next_state_(State::READY),
84*cc02d7e2SAndroid Build Coastguard Worker callback_(on_done),
85*cc02d7e2SAndroid Build Coastguard Worker next_issue_(std::move(next_issue)),
86*cc02d7e2SAndroid Build Coastguard Worker prepare_req_(prepare_req) {}
~ClientRpcContextUnaryImpl()87*cc02d7e2SAndroid Build Coastguard Worker ~ClientRpcContextUnaryImpl() override {}
Start(CompletionQueue * cq,const ClientConfig & config)88*cc02d7e2SAndroid Build Coastguard Worker void Start(CompletionQueue* cq, const ClientConfig& config) override {
89*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(!config.use_coalesce_api()); // not supported.
90*cc02d7e2SAndroid Build Coastguard Worker StartInternal(cq);
91*cc02d7e2SAndroid Build Coastguard Worker }
RunNextState(bool,HistogramEntry * entry)92*cc02d7e2SAndroid Build Coastguard Worker bool RunNextState(bool /*ok*/, HistogramEntry* entry) override {
93*cc02d7e2SAndroid Build Coastguard Worker switch (next_state_) {
94*cc02d7e2SAndroid Build Coastguard Worker case State::READY:
95*cc02d7e2SAndroid Build Coastguard Worker start_ = UsageTimer::Now();
96*cc02d7e2SAndroid Build Coastguard Worker response_reader_ = prepare_req_(stub_, &context_, req_, cq_);
97*cc02d7e2SAndroid Build Coastguard Worker response_reader_->StartCall();
98*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::RESP_DONE;
99*cc02d7e2SAndroid Build Coastguard Worker response_reader_->Finish(&response_, &status_,
100*cc02d7e2SAndroid Build Coastguard Worker ClientRpcContext::tag(this));
101*cc02d7e2SAndroid Build Coastguard Worker return true;
102*cc02d7e2SAndroid Build Coastguard Worker case State::RESP_DONE:
103*cc02d7e2SAndroid Build Coastguard Worker if (status_.ok()) {
104*cc02d7e2SAndroid Build Coastguard Worker entry->set_value((UsageTimer::Now() - start_) * 1e9);
105*cc02d7e2SAndroid Build Coastguard Worker }
106*cc02d7e2SAndroid Build Coastguard Worker callback_(status_, &response_, entry);
107*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::INVALID;
108*cc02d7e2SAndroid Build Coastguard Worker return false;
109*cc02d7e2SAndroid Build Coastguard Worker default:
110*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash("unreachable");
111*cc02d7e2SAndroid Build Coastguard Worker return false;
112*cc02d7e2SAndroid Build Coastguard Worker }
113*cc02d7e2SAndroid Build Coastguard Worker }
StartNewClone(CompletionQueue * cq)114*cc02d7e2SAndroid Build Coastguard Worker void StartNewClone(CompletionQueue* cq) override {
115*cc02d7e2SAndroid Build Coastguard Worker auto* clone = new ClientRpcContextUnaryImpl(stub_, req_, next_issue_,
116*cc02d7e2SAndroid Build Coastguard Worker prepare_req_, callback_);
117*cc02d7e2SAndroid Build Coastguard Worker clone->StartInternal(cq);
118*cc02d7e2SAndroid Build Coastguard Worker }
TryCancel()119*cc02d7e2SAndroid Build Coastguard Worker void TryCancel() override { context_.TryCancel(); }
120*cc02d7e2SAndroid Build Coastguard Worker
121*cc02d7e2SAndroid Build Coastguard Worker private:
122*cc02d7e2SAndroid Build Coastguard Worker grpc::ClientContext context_;
123*cc02d7e2SAndroid Build Coastguard Worker BenchmarkService::Stub* stub_;
124*cc02d7e2SAndroid Build Coastguard Worker CompletionQueue* cq_;
125*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<Alarm> alarm_;
126*cc02d7e2SAndroid Build Coastguard Worker const RequestType& req_;
127*cc02d7e2SAndroid Build Coastguard Worker ResponseType response_;
128*cc02d7e2SAndroid Build Coastguard Worker enum State { INVALID, READY, RESP_DONE };
129*cc02d7e2SAndroid Build Coastguard Worker State next_state_;
130*cc02d7e2SAndroid Build Coastguard Worker std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> callback_;
131*cc02d7e2SAndroid Build Coastguard Worker std::function<gpr_timespec()> next_issue_;
132*cc02d7e2SAndroid Build Coastguard Worker std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
133*cc02d7e2SAndroid Build Coastguard Worker BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
134*cc02d7e2SAndroid Build Coastguard Worker CompletionQueue*)>
135*cc02d7e2SAndroid Build Coastguard Worker prepare_req_;
136*cc02d7e2SAndroid Build Coastguard Worker grpc::Status status_;
137*cc02d7e2SAndroid Build Coastguard Worker double start_;
138*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
139*cc02d7e2SAndroid Build Coastguard Worker response_reader_;
140*cc02d7e2SAndroid Build Coastguard Worker
StartInternal(CompletionQueue * cq)141*cc02d7e2SAndroid Build Coastguard Worker void StartInternal(CompletionQueue* cq) {
142*cc02d7e2SAndroid Build Coastguard Worker cq_ = cq;
143*cc02d7e2SAndroid Build Coastguard Worker if (!next_issue_) { // ready to issue
144*cc02d7e2SAndroid Build Coastguard Worker RunNextState(true, nullptr);
145*cc02d7e2SAndroid Build Coastguard Worker } else { // wait for the issue time
146*cc02d7e2SAndroid Build Coastguard Worker alarm_ = std::make_unique<Alarm>();
147*cc02d7e2SAndroid Build Coastguard Worker alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
148*cc02d7e2SAndroid Build Coastguard Worker }
149*cc02d7e2SAndroid Build Coastguard Worker }
150*cc02d7e2SAndroid Build Coastguard Worker };
151*cc02d7e2SAndroid Build Coastguard Worker
152*cc02d7e2SAndroid Build Coastguard Worker template <class StubType, class RequestType>
153*cc02d7e2SAndroid Build Coastguard Worker class AsyncClient : public ClientImpl<StubType, RequestType> {
154*cc02d7e2SAndroid Build Coastguard Worker // Specify which protected members we are using since there is no
155*cc02d7e2SAndroid Build Coastguard Worker // member name resolution until the template types are fully resolved
156*cc02d7e2SAndroid Build Coastguard Worker public:
157*cc02d7e2SAndroid Build Coastguard Worker using Client::closed_loop_;
158*cc02d7e2SAndroid Build Coastguard Worker using Client::NextIssuer;
159*cc02d7e2SAndroid Build Coastguard Worker using Client::SetupLoadTest;
160*cc02d7e2SAndroid Build Coastguard Worker using ClientImpl<StubType, RequestType>::cores_;
161*cc02d7e2SAndroid Build Coastguard Worker using ClientImpl<StubType, RequestType>::channels_;
162*cc02d7e2SAndroid Build Coastguard Worker using ClientImpl<StubType, RequestType>::request_;
AsyncClient(const ClientConfig & config,std::function<ClientRpcContext * (StubType *,std::function<gpr_timespec ()> next_issue,const RequestType &)> setup_ctx,std::function<std::unique_ptr<StubType> (std::shared_ptr<Channel>)> create_stub)163*cc02d7e2SAndroid Build Coastguard Worker AsyncClient(const ClientConfig& config,
164*cc02d7e2SAndroid Build Coastguard Worker std::function<ClientRpcContext*(
165*cc02d7e2SAndroid Build Coastguard Worker StubType*, std::function<gpr_timespec()> next_issue,
166*cc02d7e2SAndroid Build Coastguard Worker const RequestType&)>
167*cc02d7e2SAndroid Build Coastguard Worker setup_ctx,
168*cc02d7e2SAndroid Build Coastguard Worker std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
169*cc02d7e2SAndroid Build Coastguard Worker create_stub)
170*cc02d7e2SAndroid Build Coastguard Worker : ClientImpl<StubType, RequestType>(config, create_stub),
171*cc02d7e2SAndroid Build Coastguard Worker num_async_threads_(NumThreads(config)) {
172*cc02d7e2SAndroid Build Coastguard Worker SetupLoadTest(config, num_async_threads_);
173*cc02d7e2SAndroid Build Coastguard Worker
174*cc02d7e2SAndroid Build Coastguard Worker int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified
175*cc02d7e2SAndroid Build Coastguard Worker int num_cqs = (num_async_threads_ + tpc - 1) / tpc; // ceiling operator
176*cc02d7e2SAndroid Build Coastguard Worker for (int i = 0; i < num_cqs; i++) {
177*cc02d7e2SAndroid Build Coastguard Worker cli_cqs_.emplace_back(new CompletionQueue);
178*cc02d7e2SAndroid Build Coastguard Worker }
179*cc02d7e2SAndroid Build Coastguard Worker
180*cc02d7e2SAndroid Build Coastguard Worker for (int i = 0; i < num_async_threads_; i++) {
181*cc02d7e2SAndroid Build Coastguard Worker cq_.emplace_back(i % cli_cqs_.size());
182*cc02d7e2SAndroid Build Coastguard Worker next_issuers_.emplace_back(NextIssuer(i));
183*cc02d7e2SAndroid Build Coastguard Worker shutdown_state_.emplace_back(new PerThreadShutdownState());
184*cc02d7e2SAndroid Build Coastguard Worker }
185*cc02d7e2SAndroid Build Coastguard Worker
186*cc02d7e2SAndroid Build Coastguard Worker int t = 0;
187*cc02d7e2SAndroid Build Coastguard Worker for (int ch = 0; ch < config.client_channels(); ch++) {
188*cc02d7e2SAndroid Build Coastguard Worker for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
189*cc02d7e2SAndroid Build Coastguard Worker auto* cq = cli_cqs_[t].get();
190*cc02d7e2SAndroid Build Coastguard Worker auto ctx =
191*cc02d7e2SAndroid Build Coastguard Worker setup_ctx(channels_[ch].get_stub(), next_issuers_[t], request_);
192*cc02d7e2SAndroid Build Coastguard Worker ctx->Start(cq, config);
193*cc02d7e2SAndroid Build Coastguard Worker }
194*cc02d7e2SAndroid Build Coastguard Worker t = (t + 1) % cli_cqs_.size();
195*cc02d7e2SAndroid Build Coastguard Worker }
196*cc02d7e2SAndroid Build Coastguard Worker }
~AsyncClient()197*cc02d7e2SAndroid Build Coastguard Worker ~AsyncClient() override {
198*cc02d7e2SAndroid Build Coastguard Worker for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
199*cc02d7e2SAndroid Build Coastguard Worker void* got_tag;
200*cc02d7e2SAndroid Build Coastguard Worker bool ok;
201*cc02d7e2SAndroid Build Coastguard Worker while ((*cq)->Next(&got_tag, &ok)) {
202*cc02d7e2SAndroid Build Coastguard Worker delete ClientRpcContext::detag(got_tag);
203*cc02d7e2SAndroid Build Coastguard Worker }
204*cc02d7e2SAndroid Build Coastguard Worker }
205*cc02d7e2SAndroid Build Coastguard Worker }
206*cc02d7e2SAndroid Build Coastguard Worker
GetPollCount()207*cc02d7e2SAndroid Build Coastguard Worker int GetPollCount() override {
208*cc02d7e2SAndroid Build Coastguard Worker int count = 0;
209*cc02d7e2SAndroid Build Coastguard Worker for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
210*cc02d7e2SAndroid Build Coastguard Worker count += grpc_get_cq_poll_num((*cq)->cq());
211*cc02d7e2SAndroid Build Coastguard Worker }
212*cc02d7e2SAndroid Build Coastguard Worker return count;
213*cc02d7e2SAndroid Build Coastguard Worker }
214*cc02d7e2SAndroid Build Coastguard Worker
215*cc02d7e2SAndroid Build Coastguard Worker protected:
216*cc02d7e2SAndroid Build Coastguard Worker const int num_async_threads_;
217*cc02d7e2SAndroid Build Coastguard Worker
218*cc02d7e2SAndroid Build Coastguard Worker private:
219*cc02d7e2SAndroid Build Coastguard Worker struct PerThreadShutdownState {
220*cc02d7e2SAndroid Build Coastguard Worker mutable std::mutex mutex;
221*cc02d7e2SAndroid Build Coastguard Worker bool shutdown;
PerThreadShutdownStategrpc::testing::AsyncClient::PerThreadShutdownState222*cc02d7e2SAndroid Build Coastguard Worker PerThreadShutdownState() : shutdown(false) {}
223*cc02d7e2SAndroid Build Coastguard Worker };
224*cc02d7e2SAndroid Build Coastguard Worker
NumThreads(const ClientConfig & config)225*cc02d7e2SAndroid Build Coastguard Worker int NumThreads(const ClientConfig& config) {
226*cc02d7e2SAndroid Build Coastguard Worker int num_threads = config.async_client_threads();
227*cc02d7e2SAndroid Build Coastguard Worker if (num_threads <= 0) { // Use dynamic sizing
228*cc02d7e2SAndroid Build Coastguard Worker num_threads = cores_;
229*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);
230*cc02d7e2SAndroid Build Coastguard Worker }
231*cc02d7e2SAndroid Build Coastguard Worker return num_threads;
232*cc02d7e2SAndroid Build Coastguard Worker }
DestroyMultithreading()233*cc02d7e2SAndroid Build Coastguard Worker void DestroyMultithreading() final {
234*cc02d7e2SAndroid Build Coastguard Worker for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
235*cc02d7e2SAndroid Build Coastguard Worker std::lock_guard<std::mutex> lock((*ss)->mutex);
236*cc02d7e2SAndroid Build Coastguard Worker (*ss)->shutdown = true;
237*cc02d7e2SAndroid Build Coastguard Worker }
238*cc02d7e2SAndroid Build Coastguard Worker for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
239*cc02d7e2SAndroid Build Coastguard Worker (*cq)->Shutdown();
240*cc02d7e2SAndroid Build Coastguard Worker }
241*cc02d7e2SAndroid Build Coastguard Worker this->EndThreads(); // this needed for resolution
242*cc02d7e2SAndroid Build Coastguard Worker }
243*cc02d7e2SAndroid Build Coastguard Worker
ProcessTag(size_t thread_idx,void * tag)244*cc02d7e2SAndroid Build Coastguard Worker ClientRpcContext* ProcessTag(size_t thread_idx, void* tag) {
245*cc02d7e2SAndroid Build Coastguard Worker ClientRpcContext* ctx = ClientRpcContext::detag(tag);
246*cc02d7e2SAndroid Build Coastguard Worker if (shutdown_state_[thread_idx]->shutdown) {
247*cc02d7e2SAndroid Build Coastguard Worker ctx->TryCancel();
248*cc02d7e2SAndroid Build Coastguard Worker delete ctx;
249*cc02d7e2SAndroid Build Coastguard Worker bool ok;
250*cc02d7e2SAndroid Build Coastguard Worker while (cli_cqs_[cq_[thread_idx]]->Next(&tag, &ok)) {
251*cc02d7e2SAndroid Build Coastguard Worker ctx = ClientRpcContext::detag(tag);
252*cc02d7e2SAndroid Build Coastguard Worker ctx->TryCancel();
253*cc02d7e2SAndroid Build Coastguard Worker delete ctx;
254*cc02d7e2SAndroid Build Coastguard Worker }
255*cc02d7e2SAndroid Build Coastguard Worker return nullptr;
256*cc02d7e2SAndroid Build Coastguard Worker }
257*cc02d7e2SAndroid Build Coastguard Worker return ctx;
258*cc02d7e2SAndroid Build Coastguard Worker }
259*cc02d7e2SAndroid Build Coastguard Worker
ThreadFunc(size_t thread_idx,Client::Thread * t)260*cc02d7e2SAndroid Build Coastguard Worker void ThreadFunc(size_t thread_idx, Client::Thread* t) final {
261*cc02d7e2SAndroid Build Coastguard Worker void* got_tag;
262*cc02d7e2SAndroid Build Coastguard Worker bool ok;
263*cc02d7e2SAndroid Build Coastguard Worker
264*cc02d7e2SAndroid Build Coastguard Worker HistogramEntry entry;
265*cc02d7e2SAndroid Build Coastguard Worker HistogramEntry* entry_ptr = &entry;
266*cc02d7e2SAndroid Build Coastguard Worker if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
267*cc02d7e2SAndroid Build Coastguard Worker return;
268*cc02d7e2SAndroid Build Coastguard Worker }
269*cc02d7e2SAndroid Build Coastguard Worker std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex;
270*cc02d7e2SAndroid Build Coastguard Worker shutdown_mu->lock();
271*cc02d7e2SAndroid Build Coastguard Worker ClientRpcContext* ctx = ProcessTag(thread_idx, got_tag);
272*cc02d7e2SAndroid Build Coastguard Worker if (ctx == nullptr) {
273*cc02d7e2SAndroid Build Coastguard Worker shutdown_mu->unlock();
274*cc02d7e2SAndroid Build Coastguard Worker return;
275*cc02d7e2SAndroid Build Coastguard Worker }
276*cc02d7e2SAndroid Build Coastguard Worker while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
277*cc02d7e2SAndroid Build Coastguard Worker [&, ctx, ok, entry_ptr, shutdown_mu]() {
278*cc02d7e2SAndroid Build Coastguard Worker if (!ctx->RunNextState(ok, entry_ptr)) {
279*cc02d7e2SAndroid Build Coastguard Worker // The RPC and callback are done, so clone the ctx
280*cc02d7e2SAndroid Build Coastguard Worker // and kickstart the new one
281*cc02d7e2SAndroid Build Coastguard Worker ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
282*cc02d7e2SAndroid Build Coastguard Worker delete ctx;
283*cc02d7e2SAndroid Build Coastguard Worker }
284*cc02d7e2SAndroid Build Coastguard Worker shutdown_mu->unlock();
285*cc02d7e2SAndroid Build Coastguard Worker },
286*cc02d7e2SAndroid Build Coastguard Worker &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))) {
287*cc02d7e2SAndroid Build Coastguard Worker t->UpdateHistogram(entry_ptr);
288*cc02d7e2SAndroid Build Coastguard Worker entry = HistogramEntry();
289*cc02d7e2SAndroid Build Coastguard Worker shutdown_mu->lock();
290*cc02d7e2SAndroid Build Coastguard Worker ctx = ProcessTag(thread_idx, got_tag);
291*cc02d7e2SAndroid Build Coastguard Worker if (ctx == nullptr) {
292*cc02d7e2SAndroid Build Coastguard Worker shutdown_mu->unlock();
293*cc02d7e2SAndroid Build Coastguard Worker return;
294*cc02d7e2SAndroid Build Coastguard Worker }
295*cc02d7e2SAndroid Build Coastguard Worker }
296*cc02d7e2SAndroid Build Coastguard Worker }
297*cc02d7e2SAndroid Build Coastguard Worker
298*cc02d7e2SAndroid Build Coastguard Worker std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
299*cc02d7e2SAndroid Build Coastguard Worker std::vector<int> cq_;
300*cc02d7e2SAndroid Build Coastguard Worker std::vector<std::function<gpr_timespec()>> next_issuers_;
301*cc02d7e2SAndroid Build Coastguard Worker std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
302*cc02d7e2SAndroid Build Coastguard Worker };
303*cc02d7e2SAndroid Build Coastguard Worker
BenchmarkStubCreator(const std::shared_ptr<Channel> & ch)304*cc02d7e2SAndroid Build Coastguard Worker static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
305*cc02d7e2SAndroid Build Coastguard Worker const std::shared_ptr<Channel>& ch) {
306*cc02d7e2SAndroid Build Coastguard Worker return BenchmarkService::NewStub(ch);
307*cc02d7e2SAndroid Build Coastguard Worker }
308*cc02d7e2SAndroid Build Coastguard Worker
309*cc02d7e2SAndroid Build Coastguard Worker class AsyncUnaryClient final
310*cc02d7e2SAndroid Build Coastguard Worker : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
311*cc02d7e2SAndroid Build Coastguard Worker public:
AsyncUnaryClient(const ClientConfig & config)312*cc02d7e2SAndroid Build Coastguard Worker explicit AsyncUnaryClient(const ClientConfig& config)
313*cc02d7e2SAndroid Build Coastguard Worker : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
314*cc02d7e2SAndroid Build Coastguard Worker config, SetupCtx, BenchmarkStubCreator) {
315*cc02d7e2SAndroid Build Coastguard Worker StartThreads(num_async_threads_);
316*cc02d7e2SAndroid Build Coastguard Worker }
~AsyncUnaryClient()317*cc02d7e2SAndroid Build Coastguard Worker ~AsyncUnaryClient() override {}
318*cc02d7e2SAndroid Build Coastguard Worker
319*cc02d7e2SAndroid Build Coastguard Worker private:
CheckDone(const grpc::Status & s,SimpleResponse *,HistogramEntry * entry)320*cc02d7e2SAndroid Build Coastguard Worker static void CheckDone(const grpc::Status& s, SimpleResponse* /*response*/,
321*cc02d7e2SAndroid Build Coastguard Worker HistogramEntry* entry) {
322*cc02d7e2SAndroid Build Coastguard Worker entry->set_status(s.error_code());
323*cc02d7e2SAndroid Build Coastguard Worker }
324*cc02d7e2SAndroid Build Coastguard Worker static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>>
PrepareReq(BenchmarkService::Stub * stub,grpc::ClientContext * ctx,const SimpleRequest & request,CompletionQueue * cq)325*cc02d7e2SAndroid Build Coastguard Worker PrepareReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
326*cc02d7e2SAndroid Build Coastguard Worker const SimpleRequest& request, CompletionQueue* cq) {
327*cc02d7e2SAndroid Build Coastguard Worker return stub->PrepareAsyncUnaryCall(ctx, request, cq);
328*cc02d7e2SAndroid Build Coastguard Worker };
SetupCtx(BenchmarkService::Stub * stub,std::function<gpr_timespec ()> next_issue,const SimpleRequest & req)329*cc02d7e2SAndroid Build Coastguard Worker static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
330*cc02d7e2SAndroid Build Coastguard Worker std::function<gpr_timespec()> next_issue,
331*cc02d7e2SAndroid Build Coastguard Worker const SimpleRequest& req) {
332*cc02d7e2SAndroid Build Coastguard Worker return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
333*cc02d7e2SAndroid Build Coastguard Worker stub, req, std::move(next_issue), AsyncUnaryClient::PrepareReq,
334*cc02d7e2SAndroid Build Coastguard Worker AsyncUnaryClient::CheckDone);
335*cc02d7e2SAndroid Build Coastguard Worker }
336*cc02d7e2SAndroid Build Coastguard Worker };
337*cc02d7e2SAndroid Build Coastguard Worker
338*cc02d7e2SAndroid Build Coastguard Worker template <class RequestType, class ResponseType>
339*cc02d7e2SAndroid Build Coastguard Worker class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
340*cc02d7e2SAndroid Build Coastguard Worker public:
ClientRpcContextStreamingPingPongImpl(BenchmarkService::Stub * stub,const RequestType & req,std::function<gpr_timespec ()> next_issue,std::function<std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType,ResponseType>> (BenchmarkService::Stub *,grpc::ClientContext *,CompletionQueue *)> prepare_req,std::function<void (grpc::Status,ResponseType *)> on_done)341*cc02d7e2SAndroid Build Coastguard Worker ClientRpcContextStreamingPingPongImpl(
342*cc02d7e2SAndroid Build Coastguard Worker BenchmarkService::Stub* stub, const RequestType& req,
343*cc02d7e2SAndroid Build Coastguard Worker std::function<gpr_timespec()> next_issue,
344*cc02d7e2SAndroid Build Coastguard Worker std::function<std::unique_ptr<
345*cc02d7e2SAndroid Build Coastguard Worker grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
346*cc02d7e2SAndroid Build Coastguard Worker BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*)>
347*cc02d7e2SAndroid Build Coastguard Worker prepare_req,
348*cc02d7e2SAndroid Build Coastguard Worker std::function<void(grpc::Status, ResponseType*)> on_done)
349*cc02d7e2SAndroid Build Coastguard Worker : context_(),
350*cc02d7e2SAndroid Build Coastguard Worker stub_(stub),
351*cc02d7e2SAndroid Build Coastguard Worker cq_(nullptr),
352*cc02d7e2SAndroid Build Coastguard Worker req_(req),
353*cc02d7e2SAndroid Build Coastguard Worker response_(),
354*cc02d7e2SAndroid Build Coastguard Worker next_state_(State::INVALID),
355*cc02d7e2SAndroid Build Coastguard Worker callback_(on_done),
356*cc02d7e2SAndroid Build Coastguard Worker next_issue_(std::move(next_issue)),
357*cc02d7e2SAndroid Build Coastguard Worker prepare_req_(prepare_req),
358*cc02d7e2SAndroid Build Coastguard Worker coalesce_(false) {}
~ClientRpcContextStreamingPingPongImpl()359*cc02d7e2SAndroid Build Coastguard Worker ~ClientRpcContextStreamingPingPongImpl() override {}
Start(CompletionQueue * cq,const ClientConfig & config)360*cc02d7e2SAndroid Build Coastguard Worker void Start(CompletionQueue* cq, const ClientConfig& config) override {
361*cc02d7e2SAndroid Build Coastguard Worker StartInternal(cq, config.messages_per_stream(), config.use_coalesce_api());
362*cc02d7e2SAndroid Build Coastguard Worker }
RunNextState(bool ok,HistogramEntry * entry)363*cc02d7e2SAndroid Build Coastguard Worker bool RunNextState(bool ok, HistogramEntry* entry) override {
364*cc02d7e2SAndroid Build Coastguard Worker while (true) {
365*cc02d7e2SAndroid Build Coastguard Worker switch (next_state_) {
366*cc02d7e2SAndroid Build Coastguard Worker case State::STREAM_IDLE:
367*cc02d7e2SAndroid Build Coastguard Worker if (!next_issue_) { // ready to issue
368*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::READY_TO_WRITE;
369*cc02d7e2SAndroid Build Coastguard Worker } else {
370*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::WAIT;
371*cc02d7e2SAndroid Build Coastguard Worker }
372*cc02d7e2SAndroid Build Coastguard Worker break; // loop around, don't return
373*cc02d7e2SAndroid Build Coastguard Worker case State::WAIT:
374*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::READY_TO_WRITE;
375*cc02d7e2SAndroid Build Coastguard Worker alarm_ = std::make_unique<Alarm>();
376*cc02d7e2SAndroid Build Coastguard Worker alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
377*cc02d7e2SAndroid Build Coastguard Worker return true;
378*cc02d7e2SAndroid Build Coastguard Worker case State::READY_TO_WRITE:
379*cc02d7e2SAndroid Build Coastguard Worker if (!ok) {
380*cc02d7e2SAndroid Build Coastguard Worker return false;
381*cc02d7e2SAndroid Build Coastguard Worker }
382*cc02d7e2SAndroid Build Coastguard Worker start_ = UsageTimer::Now();
383*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::WRITE_DONE;
384*cc02d7e2SAndroid Build Coastguard Worker if (coalesce_ && messages_issued_ == messages_per_stream_ - 1) {
385*cc02d7e2SAndroid Build Coastguard Worker stream_->WriteLast(req_, WriteOptions(),
386*cc02d7e2SAndroid Build Coastguard Worker ClientRpcContext::tag(this));
387*cc02d7e2SAndroid Build Coastguard Worker } else {
388*cc02d7e2SAndroid Build Coastguard Worker stream_->Write(req_, ClientRpcContext::tag(this));
389*cc02d7e2SAndroid Build Coastguard Worker }
390*cc02d7e2SAndroid Build Coastguard Worker return true;
391*cc02d7e2SAndroid Build Coastguard Worker case State::WRITE_DONE:
392*cc02d7e2SAndroid Build Coastguard Worker if (!ok) {
393*cc02d7e2SAndroid Build Coastguard Worker return false;
394*cc02d7e2SAndroid Build Coastguard Worker }
395*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::READ_DONE;
396*cc02d7e2SAndroid Build Coastguard Worker stream_->Read(&response_, ClientRpcContext::tag(this));
397*cc02d7e2SAndroid Build Coastguard Worker return true;
398*cc02d7e2SAndroid Build Coastguard Worker break;
399*cc02d7e2SAndroid Build Coastguard Worker case State::READ_DONE:
400*cc02d7e2SAndroid Build Coastguard Worker entry->set_value((UsageTimer::Now() - start_) * 1e9);
401*cc02d7e2SAndroid Build Coastguard Worker callback_(status_, &response_);
402*cc02d7e2SAndroid Build Coastguard Worker if ((messages_per_stream_ != 0) &&
403*cc02d7e2SAndroid Build Coastguard Worker (++messages_issued_ >= messages_per_stream_)) {
404*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::WRITES_DONE_DONE;
405*cc02d7e2SAndroid Build Coastguard Worker if (coalesce_) {
406*cc02d7e2SAndroid Build Coastguard Worker // WritesDone should have been called on the last Write.
407*cc02d7e2SAndroid Build Coastguard Worker // loop around to call Finish.
408*cc02d7e2SAndroid Build Coastguard Worker break;
409*cc02d7e2SAndroid Build Coastguard Worker }
410*cc02d7e2SAndroid Build Coastguard Worker stream_->WritesDone(ClientRpcContext::tag(this));
411*cc02d7e2SAndroid Build Coastguard Worker return true;
412*cc02d7e2SAndroid Build Coastguard Worker }
413*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::STREAM_IDLE;
414*cc02d7e2SAndroid Build Coastguard Worker break; // loop around
415*cc02d7e2SAndroid Build Coastguard Worker case State::WRITES_DONE_DONE:
416*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::FINISH_DONE;
417*cc02d7e2SAndroid Build Coastguard Worker stream_->Finish(&status_, ClientRpcContext::tag(this));
418*cc02d7e2SAndroid Build Coastguard Worker return true;
419*cc02d7e2SAndroid Build Coastguard Worker case State::FINISH_DONE:
420*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::INVALID;
421*cc02d7e2SAndroid Build Coastguard Worker return false;
422*cc02d7e2SAndroid Build Coastguard Worker break;
423*cc02d7e2SAndroid Build Coastguard Worker default:
424*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash("unreachable");
425*cc02d7e2SAndroid Build Coastguard Worker return false;
426*cc02d7e2SAndroid Build Coastguard Worker }
427*cc02d7e2SAndroid Build Coastguard Worker }
428*cc02d7e2SAndroid Build Coastguard Worker }
StartNewClone(CompletionQueue * cq)429*cc02d7e2SAndroid Build Coastguard Worker void StartNewClone(CompletionQueue* cq) override {
430*cc02d7e2SAndroid Build Coastguard Worker auto* clone = new ClientRpcContextStreamingPingPongImpl(
431*cc02d7e2SAndroid Build Coastguard Worker stub_, req_, next_issue_, prepare_req_, callback_);
432*cc02d7e2SAndroid Build Coastguard Worker clone->StartInternal(cq, messages_per_stream_, coalesce_);
433*cc02d7e2SAndroid Build Coastguard Worker }
TryCancel()434*cc02d7e2SAndroid Build Coastguard Worker void TryCancel() override { context_.TryCancel(); }
435*cc02d7e2SAndroid Build Coastguard Worker
436*cc02d7e2SAndroid Build Coastguard Worker private:
437*cc02d7e2SAndroid Build Coastguard Worker grpc::ClientContext context_;
438*cc02d7e2SAndroid Build Coastguard Worker BenchmarkService::Stub* stub_;
439*cc02d7e2SAndroid Build Coastguard Worker CompletionQueue* cq_;
440*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<Alarm> alarm_;
441*cc02d7e2SAndroid Build Coastguard Worker const RequestType& req_;
442*cc02d7e2SAndroid Build Coastguard Worker ResponseType response_;
443*cc02d7e2SAndroid Build Coastguard Worker enum State {
444*cc02d7e2SAndroid Build Coastguard Worker INVALID,
445*cc02d7e2SAndroid Build Coastguard Worker STREAM_IDLE,
446*cc02d7e2SAndroid Build Coastguard Worker WAIT,
447*cc02d7e2SAndroid Build Coastguard Worker READY_TO_WRITE,
448*cc02d7e2SAndroid Build Coastguard Worker WRITE_DONE,
449*cc02d7e2SAndroid Build Coastguard Worker READ_DONE,
450*cc02d7e2SAndroid Build Coastguard Worker WRITES_DONE_DONE,
451*cc02d7e2SAndroid Build Coastguard Worker FINISH_DONE
452*cc02d7e2SAndroid Build Coastguard Worker };
453*cc02d7e2SAndroid Build Coastguard Worker State next_state_;
454*cc02d7e2SAndroid Build Coastguard Worker std::function<void(grpc::Status, ResponseType*)> callback_;
455*cc02d7e2SAndroid Build Coastguard Worker std::function<gpr_timespec()> next_issue_;
456*cc02d7e2SAndroid Build Coastguard Worker std::function<
457*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
458*cc02d7e2SAndroid Build Coastguard Worker BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*)>
459*cc02d7e2SAndroid Build Coastguard Worker prepare_req_;
460*cc02d7e2SAndroid Build Coastguard Worker grpc::Status status_;
461*cc02d7e2SAndroid Build Coastguard Worker double start_;
462*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
463*cc02d7e2SAndroid Build Coastguard Worker stream_;
464*cc02d7e2SAndroid Build Coastguard Worker
465*cc02d7e2SAndroid Build Coastguard Worker // Allow a limit on number of messages in a stream
466*cc02d7e2SAndroid Build Coastguard Worker int messages_per_stream_;
467*cc02d7e2SAndroid Build Coastguard Worker int messages_issued_;
468*cc02d7e2SAndroid Build Coastguard Worker // Whether to use coalescing API.
469*cc02d7e2SAndroid Build Coastguard Worker bool coalesce_;
470*cc02d7e2SAndroid Build Coastguard Worker
StartInternal(CompletionQueue * cq,int messages_per_stream,bool coalesce)471*cc02d7e2SAndroid Build Coastguard Worker void StartInternal(CompletionQueue* cq, int messages_per_stream,
472*cc02d7e2SAndroid Build Coastguard Worker bool coalesce) {
473*cc02d7e2SAndroid Build Coastguard Worker cq_ = cq;
474*cc02d7e2SAndroid Build Coastguard Worker messages_per_stream_ = messages_per_stream;
475*cc02d7e2SAndroid Build Coastguard Worker messages_issued_ = 0;
476*cc02d7e2SAndroid Build Coastguard Worker coalesce_ = coalesce;
477*cc02d7e2SAndroid Build Coastguard Worker if (coalesce_) {
478*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(messages_per_stream_ != 0);
479*cc02d7e2SAndroid Build Coastguard Worker context_.set_initial_metadata_corked(true);
480*cc02d7e2SAndroid Build Coastguard Worker }
481*cc02d7e2SAndroid Build Coastguard Worker stream_ = prepare_req_(stub_, &context_, cq);
482*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::STREAM_IDLE;
483*cc02d7e2SAndroid Build Coastguard Worker stream_->StartCall(ClientRpcContext::tag(this));
484*cc02d7e2SAndroid Build Coastguard Worker if (coalesce_) {
485*cc02d7e2SAndroid Build Coastguard Worker // When the initial metadata is corked, the tag will not come back and we
486*cc02d7e2SAndroid Build Coastguard Worker // need to manually drive the state machine.
487*cc02d7e2SAndroid Build Coastguard Worker RunNextState(true, nullptr);
488*cc02d7e2SAndroid Build Coastguard Worker }
489*cc02d7e2SAndroid Build Coastguard Worker }
490*cc02d7e2SAndroid Build Coastguard Worker };
491*cc02d7e2SAndroid Build Coastguard Worker
492*cc02d7e2SAndroid Build Coastguard Worker class AsyncStreamingPingPongClient final
493*cc02d7e2SAndroid Build Coastguard Worker : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
494*cc02d7e2SAndroid Build Coastguard Worker public:
AsyncStreamingPingPongClient(const ClientConfig & config)495*cc02d7e2SAndroid Build Coastguard Worker explicit AsyncStreamingPingPongClient(const ClientConfig& config)
496*cc02d7e2SAndroid Build Coastguard Worker : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
497*cc02d7e2SAndroid Build Coastguard Worker config, SetupCtx, BenchmarkStubCreator) {
498*cc02d7e2SAndroid Build Coastguard Worker StartThreads(num_async_threads_);
499*cc02d7e2SAndroid Build Coastguard Worker }
500*cc02d7e2SAndroid Build Coastguard Worker
~AsyncStreamingPingPongClient()501*cc02d7e2SAndroid Build Coastguard Worker ~AsyncStreamingPingPongClient() override {}
502*cc02d7e2SAndroid Build Coastguard Worker
503*cc02d7e2SAndroid Build Coastguard Worker private:
CheckDone(const grpc::Status &,SimpleResponse *)504*cc02d7e2SAndroid Build Coastguard Worker static void CheckDone(const grpc::Status& /*s*/,
505*cc02d7e2SAndroid Build Coastguard Worker SimpleResponse* /*response*/) {}
506*cc02d7e2SAndroid Build Coastguard Worker static std::unique_ptr<
507*cc02d7e2SAndroid Build Coastguard Worker grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>>
PrepareReq(BenchmarkService::Stub * stub,grpc::ClientContext * ctx,CompletionQueue * cq)508*cc02d7e2SAndroid Build Coastguard Worker PrepareReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
509*cc02d7e2SAndroid Build Coastguard Worker CompletionQueue* cq) {
510*cc02d7e2SAndroid Build Coastguard Worker auto stream = stub->PrepareAsyncStreamingCall(ctx, cq);
511*cc02d7e2SAndroid Build Coastguard Worker return stream;
512*cc02d7e2SAndroid Build Coastguard Worker };
SetupCtx(BenchmarkService::Stub * stub,std::function<gpr_timespec ()> next_issue,const SimpleRequest & req)513*cc02d7e2SAndroid Build Coastguard Worker static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
514*cc02d7e2SAndroid Build Coastguard Worker std::function<gpr_timespec()> next_issue,
515*cc02d7e2SAndroid Build Coastguard Worker const SimpleRequest& req) {
516*cc02d7e2SAndroid Build Coastguard Worker return new ClientRpcContextStreamingPingPongImpl<SimpleRequest,
517*cc02d7e2SAndroid Build Coastguard Worker SimpleResponse>(
518*cc02d7e2SAndroid Build Coastguard Worker stub, req, std::move(next_issue),
519*cc02d7e2SAndroid Build Coastguard Worker AsyncStreamingPingPongClient::PrepareReq,
520*cc02d7e2SAndroid Build Coastguard Worker AsyncStreamingPingPongClient::CheckDone);
521*cc02d7e2SAndroid Build Coastguard Worker }
522*cc02d7e2SAndroid Build Coastguard Worker };
523*cc02d7e2SAndroid Build Coastguard Worker
524*cc02d7e2SAndroid Build Coastguard Worker template <class RequestType, class ResponseType>
525*cc02d7e2SAndroid Build Coastguard Worker class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
526*cc02d7e2SAndroid Build Coastguard Worker public:
ClientRpcContextStreamingFromClientImpl(BenchmarkService::Stub * stub,const RequestType & req,std::function<gpr_timespec ()> next_issue,std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>> (BenchmarkService::Stub *,grpc::ClientContext *,ResponseType *,CompletionQueue *)> prepare_req,std::function<void (grpc::Status,ResponseType *)> on_done)527*cc02d7e2SAndroid Build Coastguard Worker ClientRpcContextStreamingFromClientImpl(
528*cc02d7e2SAndroid Build Coastguard Worker BenchmarkService::Stub* stub, const RequestType& req,
529*cc02d7e2SAndroid Build Coastguard Worker std::function<gpr_timespec()> next_issue,
530*cc02d7e2SAndroid Build Coastguard Worker std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>(
531*cc02d7e2SAndroid Build Coastguard Worker BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*,
532*cc02d7e2SAndroid Build Coastguard Worker CompletionQueue*)>
533*cc02d7e2SAndroid Build Coastguard Worker prepare_req,
534*cc02d7e2SAndroid Build Coastguard Worker std::function<void(grpc::Status, ResponseType*)> on_done)
535*cc02d7e2SAndroid Build Coastguard Worker : context_(),
536*cc02d7e2SAndroid Build Coastguard Worker stub_(stub),
537*cc02d7e2SAndroid Build Coastguard Worker cq_(nullptr),
538*cc02d7e2SAndroid Build Coastguard Worker req_(req),
539*cc02d7e2SAndroid Build Coastguard Worker response_(),
540*cc02d7e2SAndroid Build Coastguard Worker next_state_(State::INVALID),
541*cc02d7e2SAndroid Build Coastguard Worker callback_(on_done),
542*cc02d7e2SAndroid Build Coastguard Worker next_issue_(std::move(next_issue)),
543*cc02d7e2SAndroid Build Coastguard Worker prepare_req_(prepare_req) {}
~ClientRpcContextStreamingFromClientImpl()544*cc02d7e2SAndroid Build Coastguard Worker ~ClientRpcContextStreamingFromClientImpl() override {}
Start(CompletionQueue * cq,const ClientConfig & config)545*cc02d7e2SAndroid Build Coastguard Worker void Start(CompletionQueue* cq, const ClientConfig& config) override {
546*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(!config.use_coalesce_api()); // not supported yet.
547*cc02d7e2SAndroid Build Coastguard Worker StartInternal(cq);
548*cc02d7e2SAndroid Build Coastguard Worker }
RunNextState(bool ok,HistogramEntry * entry)549*cc02d7e2SAndroid Build Coastguard Worker bool RunNextState(bool ok, HistogramEntry* entry) override {
550*cc02d7e2SAndroid Build Coastguard Worker while (true) {
551*cc02d7e2SAndroid Build Coastguard Worker switch (next_state_) {
552*cc02d7e2SAndroid Build Coastguard Worker case State::STREAM_IDLE:
553*cc02d7e2SAndroid Build Coastguard Worker if (!next_issue_) { // ready to issue
554*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::READY_TO_WRITE;
555*cc02d7e2SAndroid Build Coastguard Worker } else {
556*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::WAIT;
557*cc02d7e2SAndroid Build Coastguard Worker }
558*cc02d7e2SAndroid Build Coastguard Worker break; // loop around, don't return
559*cc02d7e2SAndroid Build Coastguard Worker case State::WAIT:
560*cc02d7e2SAndroid Build Coastguard Worker alarm_ = std::make_unique<Alarm>();
561*cc02d7e2SAndroid Build Coastguard Worker alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
562*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::READY_TO_WRITE;
563*cc02d7e2SAndroid Build Coastguard Worker return true;
564*cc02d7e2SAndroid Build Coastguard Worker case State::READY_TO_WRITE:
565*cc02d7e2SAndroid Build Coastguard Worker if (!ok) {
566*cc02d7e2SAndroid Build Coastguard Worker return false;
567*cc02d7e2SAndroid Build Coastguard Worker }
568*cc02d7e2SAndroid Build Coastguard Worker start_ = UsageTimer::Now();
569*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::WRITE_DONE;
570*cc02d7e2SAndroid Build Coastguard Worker stream_->Write(req_, ClientRpcContext::tag(this));
571*cc02d7e2SAndroid Build Coastguard Worker return true;
572*cc02d7e2SAndroid Build Coastguard Worker case State::WRITE_DONE:
573*cc02d7e2SAndroid Build Coastguard Worker if (!ok) {
574*cc02d7e2SAndroid Build Coastguard Worker return false;
575*cc02d7e2SAndroid Build Coastguard Worker }
576*cc02d7e2SAndroid Build Coastguard Worker entry->set_value((UsageTimer::Now() - start_) * 1e9);
577*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::STREAM_IDLE;
578*cc02d7e2SAndroid Build Coastguard Worker break; // loop around
579*cc02d7e2SAndroid Build Coastguard Worker default:
580*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash("unreachable");
581*cc02d7e2SAndroid Build Coastguard Worker return false;
582*cc02d7e2SAndroid Build Coastguard Worker }
583*cc02d7e2SAndroid Build Coastguard Worker }
584*cc02d7e2SAndroid Build Coastguard Worker }
StartNewClone(CompletionQueue * cq)585*cc02d7e2SAndroid Build Coastguard Worker void StartNewClone(CompletionQueue* cq) override {
586*cc02d7e2SAndroid Build Coastguard Worker auto* clone = new ClientRpcContextStreamingFromClientImpl(
587*cc02d7e2SAndroid Build Coastguard Worker stub_, req_, next_issue_, prepare_req_, callback_);
588*cc02d7e2SAndroid Build Coastguard Worker clone->StartInternal(cq);
589*cc02d7e2SAndroid Build Coastguard Worker }
TryCancel()590*cc02d7e2SAndroid Build Coastguard Worker void TryCancel() override { context_.TryCancel(); }
591*cc02d7e2SAndroid Build Coastguard Worker
592*cc02d7e2SAndroid Build Coastguard Worker private:
593*cc02d7e2SAndroid Build Coastguard Worker grpc::ClientContext context_;
594*cc02d7e2SAndroid Build Coastguard Worker BenchmarkService::Stub* stub_;
595*cc02d7e2SAndroid Build Coastguard Worker CompletionQueue* cq_;
596*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<Alarm> alarm_;
597*cc02d7e2SAndroid Build Coastguard Worker const RequestType& req_;
598*cc02d7e2SAndroid Build Coastguard Worker ResponseType response_;
599*cc02d7e2SAndroid Build Coastguard Worker enum State {
600*cc02d7e2SAndroid Build Coastguard Worker INVALID,
601*cc02d7e2SAndroid Build Coastguard Worker STREAM_IDLE,
602*cc02d7e2SAndroid Build Coastguard Worker WAIT,
603*cc02d7e2SAndroid Build Coastguard Worker READY_TO_WRITE,
604*cc02d7e2SAndroid Build Coastguard Worker WRITE_DONE,
605*cc02d7e2SAndroid Build Coastguard Worker };
606*cc02d7e2SAndroid Build Coastguard Worker State next_state_;
607*cc02d7e2SAndroid Build Coastguard Worker std::function<void(grpc::Status, ResponseType*)> callback_;
608*cc02d7e2SAndroid Build Coastguard Worker std::function<gpr_timespec()> next_issue_;
609*cc02d7e2SAndroid Build Coastguard Worker std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>(
610*cc02d7e2SAndroid Build Coastguard Worker BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*,
611*cc02d7e2SAndroid Build Coastguard Worker CompletionQueue*)>
612*cc02d7e2SAndroid Build Coastguard Worker prepare_req_;
613*cc02d7e2SAndroid Build Coastguard Worker grpc::Status status_;
614*cc02d7e2SAndroid Build Coastguard Worker double start_;
615*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<grpc::ClientAsyncWriter<RequestType>> stream_;
616*cc02d7e2SAndroid Build Coastguard Worker
StartInternal(CompletionQueue * cq)617*cc02d7e2SAndroid Build Coastguard Worker void StartInternal(CompletionQueue* cq) {
618*cc02d7e2SAndroid Build Coastguard Worker cq_ = cq;
619*cc02d7e2SAndroid Build Coastguard Worker stream_ = prepare_req_(stub_, &context_, &response_, cq);
620*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::STREAM_IDLE;
621*cc02d7e2SAndroid Build Coastguard Worker stream_->StartCall(ClientRpcContext::tag(this));
622*cc02d7e2SAndroid Build Coastguard Worker }
623*cc02d7e2SAndroid Build Coastguard Worker };
624*cc02d7e2SAndroid Build Coastguard Worker
625*cc02d7e2SAndroid Build Coastguard Worker class AsyncStreamingFromClientClient final
626*cc02d7e2SAndroid Build Coastguard Worker : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
627*cc02d7e2SAndroid Build Coastguard Worker public:
AsyncStreamingFromClientClient(const ClientConfig & config)628*cc02d7e2SAndroid Build Coastguard Worker explicit AsyncStreamingFromClientClient(const ClientConfig& config)
629*cc02d7e2SAndroid Build Coastguard Worker : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
630*cc02d7e2SAndroid Build Coastguard Worker config, SetupCtx, BenchmarkStubCreator) {
631*cc02d7e2SAndroid Build Coastguard Worker StartThreads(num_async_threads_);
632*cc02d7e2SAndroid Build Coastguard Worker }
633*cc02d7e2SAndroid Build Coastguard Worker
~AsyncStreamingFromClientClient()634*cc02d7e2SAndroid Build Coastguard Worker ~AsyncStreamingFromClientClient() override {}
635*cc02d7e2SAndroid Build Coastguard Worker
636*cc02d7e2SAndroid Build Coastguard Worker private:
CheckDone(const grpc::Status &,SimpleResponse *)637*cc02d7e2SAndroid Build Coastguard Worker static void CheckDone(const grpc::Status& /*s*/,
638*cc02d7e2SAndroid Build Coastguard Worker SimpleResponse* /*response*/) {}
PrepareReq(BenchmarkService::Stub * stub,grpc::ClientContext * ctx,SimpleResponse * resp,CompletionQueue * cq)639*cc02d7e2SAndroid Build Coastguard Worker static std::unique_ptr<grpc::ClientAsyncWriter<SimpleRequest>> PrepareReq(
640*cc02d7e2SAndroid Build Coastguard Worker BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
641*cc02d7e2SAndroid Build Coastguard Worker SimpleResponse* resp, CompletionQueue* cq) {
642*cc02d7e2SAndroid Build Coastguard Worker auto stream = stub->PrepareAsyncStreamingFromClient(ctx, resp, cq);
643*cc02d7e2SAndroid Build Coastguard Worker return stream;
644*cc02d7e2SAndroid Build Coastguard Worker };
SetupCtx(BenchmarkService::Stub * stub,std::function<gpr_timespec ()> next_issue,const SimpleRequest & req)645*cc02d7e2SAndroid Build Coastguard Worker static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
646*cc02d7e2SAndroid Build Coastguard Worker std::function<gpr_timespec()> next_issue,
647*cc02d7e2SAndroid Build Coastguard Worker const SimpleRequest& req) {
648*cc02d7e2SAndroid Build Coastguard Worker return new ClientRpcContextStreamingFromClientImpl<SimpleRequest,
649*cc02d7e2SAndroid Build Coastguard Worker SimpleResponse>(
650*cc02d7e2SAndroid Build Coastguard Worker stub, req, std::move(next_issue),
651*cc02d7e2SAndroid Build Coastguard Worker AsyncStreamingFromClientClient::PrepareReq,
652*cc02d7e2SAndroid Build Coastguard Worker AsyncStreamingFromClientClient::CheckDone);
653*cc02d7e2SAndroid Build Coastguard Worker }
654*cc02d7e2SAndroid Build Coastguard Worker };
655*cc02d7e2SAndroid Build Coastguard Worker
656*cc02d7e2SAndroid Build Coastguard Worker template <class RequestType, class ResponseType>
657*cc02d7e2SAndroid Build Coastguard Worker class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
658*cc02d7e2SAndroid Build Coastguard Worker public:
ClientRpcContextStreamingFromServerImpl(BenchmarkService::Stub * stub,const RequestType & req,std::function<gpr_timespec ()> next_issue,std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>> (BenchmarkService::Stub *,grpc::ClientContext *,const RequestType &,CompletionQueue *)> prepare_req,std::function<void (grpc::Status,ResponseType *)> on_done)659*cc02d7e2SAndroid Build Coastguard Worker ClientRpcContextStreamingFromServerImpl(
660*cc02d7e2SAndroid Build Coastguard Worker BenchmarkService::Stub* stub, const RequestType& req,
661*cc02d7e2SAndroid Build Coastguard Worker std::function<gpr_timespec()> next_issue,
662*cc02d7e2SAndroid Build Coastguard Worker std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(
663*cc02d7e2SAndroid Build Coastguard Worker BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
664*cc02d7e2SAndroid Build Coastguard Worker CompletionQueue*)>
665*cc02d7e2SAndroid Build Coastguard Worker prepare_req,
666*cc02d7e2SAndroid Build Coastguard Worker std::function<void(grpc::Status, ResponseType*)> on_done)
667*cc02d7e2SAndroid Build Coastguard Worker : context_(),
668*cc02d7e2SAndroid Build Coastguard Worker stub_(stub),
669*cc02d7e2SAndroid Build Coastguard Worker cq_(nullptr),
670*cc02d7e2SAndroid Build Coastguard Worker req_(req),
671*cc02d7e2SAndroid Build Coastguard Worker response_(),
672*cc02d7e2SAndroid Build Coastguard Worker next_state_(State::INVALID),
673*cc02d7e2SAndroid Build Coastguard Worker callback_(on_done),
674*cc02d7e2SAndroid Build Coastguard Worker next_issue_(std::move(next_issue)),
675*cc02d7e2SAndroid Build Coastguard Worker prepare_req_(prepare_req) {}
~ClientRpcContextStreamingFromServerImpl()676*cc02d7e2SAndroid Build Coastguard Worker ~ClientRpcContextStreamingFromServerImpl() override {}
Start(CompletionQueue * cq,const ClientConfig & config)677*cc02d7e2SAndroid Build Coastguard Worker void Start(CompletionQueue* cq, const ClientConfig& config) override {
678*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(!config.use_coalesce_api()); // not supported
679*cc02d7e2SAndroid Build Coastguard Worker StartInternal(cq);
680*cc02d7e2SAndroid Build Coastguard Worker }
RunNextState(bool ok,HistogramEntry * entry)681*cc02d7e2SAndroid Build Coastguard Worker bool RunNextState(bool ok, HistogramEntry* entry) override {
682*cc02d7e2SAndroid Build Coastguard Worker while (true) {
683*cc02d7e2SAndroid Build Coastguard Worker switch (next_state_) {
684*cc02d7e2SAndroid Build Coastguard Worker case State::STREAM_IDLE:
685*cc02d7e2SAndroid Build Coastguard Worker if (!ok) {
686*cc02d7e2SAndroid Build Coastguard Worker return false;
687*cc02d7e2SAndroid Build Coastguard Worker }
688*cc02d7e2SAndroid Build Coastguard Worker start_ = UsageTimer::Now();
689*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::READ_DONE;
690*cc02d7e2SAndroid Build Coastguard Worker stream_->Read(&response_, ClientRpcContext::tag(this));
691*cc02d7e2SAndroid Build Coastguard Worker return true;
692*cc02d7e2SAndroid Build Coastguard Worker case State::READ_DONE:
693*cc02d7e2SAndroid Build Coastguard Worker if (!ok) {
694*cc02d7e2SAndroid Build Coastguard Worker return false;
695*cc02d7e2SAndroid Build Coastguard Worker }
696*cc02d7e2SAndroid Build Coastguard Worker entry->set_value((UsageTimer::Now() - start_) * 1e9);
697*cc02d7e2SAndroid Build Coastguard Worker callback_(status_, &response_);
698*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::STREAM_IDLE;
699*cc02d7e2SAndroid Build Coastguard Worker break; // loop around
700*cc02d7e2SAndroid Build Coastguard Worker default:
701*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash("unreachable");
702*cc02d7e2SAndroid Build Coastguard Worker return false;
703*cc02d7e2SAndroid Build Coastguard Worker }
704*cc02d7e2SAndroid Build Coastguard Worker }
705*cc02d7e2SAndroid Build Coastguard Worker }
StartNewClone(CompletionQueue * cq)706*cc02d7e2SAndroid Build Coastguard Worker void StartNewClone(CompletionQueue* cq) override {
707*cc02d7e2SAndroid Build Coastguard Worker auto* clone = new ClientRpcContextStreamingFromServerImpl(
708*cc02d7e2SAndroid Build Coastguard Worker stub_, req_, next_issue_, prepare_req_, callback_);
709*cc02d7e2SAndroid Build Coastguard Worker clone->StartInternal(cq);
710*cc02d7e2SAndroid Build Coastguard Worker }
TryCancel()711*cc02d7e2SAndroid Build Coastguard Worker void TryCancel() override { context_.TryCancel(); }
712*cc02d7e2SAndroid Build Coastguard Worker
713*cc02d7e2SAndroid Build Coastguard Worker private:
714*cc02d7e2SAndroid Build Coastguard Worker grpc::ClientContext context_;
715*cc02d7e2SAndroid Build Coastguard Worker BenchmarkService::Stub* stub_;
716*cc02d7e2SAndroid Build Coastguard Worker CompletionQueue* cq_;
717*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<Alarm> alarm_;
718*cc02d7e2SAndroid Build Coastguard Worker const RequestType& req_;
719*cc02d7e2SAndroid Build Coastguard Worker ResponseType response_;
720*cc02d7e2SAndroid Build Coastguard Worker enum State { INVALID, STREAM_IDLE, READ_DONE };
721*cc02d7e2SAndroid Build Coastguard Worker State next_state_;
722*cc02d7e2SAndroid Build Coastguard Worker std::function<void(grpc::Status, ResponseType*)> callback_;
723*cc02d7e2SAndroid Build Coastguard Worker std::function<gpr_timespec()> next_issue_;
724*cc02d7e2SAndroid Build Coastguard Worker std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(
725*cc02d7e2SAndroid Build Coastguard Worker BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
726*cc02d7e2SAndroid Build Coastguard Worker CompletionQueue*)>
727*cc02d7e2SAndroid Build Coastguard Worker prepare_req_;
728*cc02d7e2SAndroid Build Coastguard Worker grpc::Status status_;
729*cc02d7e2SAndroid Build Coastguard Worker double start_;
730*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<grpc::ClientAsyncReader<ResponseType>> stream_;
731*cc02d7e2SAndroid Build Coastguard Worker
StartInternal(CompletionQueue * cq)732*cc02d7e2SAndroid Build Coastguard Worker void StartInternal(CompletionQueue* cq) {
733*cc02d7e2SAndroid Build Coastguard Worker // TODO(vjpai): Add support to rate-pace this
734*cc02d7e2SAndroid Build Coastguard Worker cq_ = cq;
735*cc02d7e2SAndroid Build Coastguard Worker stream_ = prepare_req_(stub_, &context_, req_, cq);
736*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::STREAM_IDLE;
737*cc02d7e2SAndroid Build Coastguard Worker stream_->StartCall(ClientRpcContext::tag(this));
738*cc02d7e2SAndroid Build Coastguard Worker }
739*cc02d7e2SAndroid Build Coastguard Worker };
740*cc02d7e2SAndroid Build Coastguard Worker
741*cc02d7e2SAndroid Build Coastguard Worker class AsyncStreamingFromServerClient final
742*cc02d7e2SAndroid Build Coastguard Worker : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
743*cc02d7e2SAndroid Build Coastguard Worker public:
AsyncStreamingFromServerClient(const ClientConfig & config)744*cc02d7e2SAndroid Build Coastguard Worker explicit AsyncStreamingFromServerClient(const ClientConfig& config)
745*cc02d7e2SAndroid Build Coastguard Worker : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
746*cc02d7e2SAndroid Build Coastguard Worker config, SetupCtx, BenchmarkStubCreator) {
747*cc02d7e2SAndroid Build Coastguard Worker StartThreads(num_async_threads_);
748*cc02d7e2SAndroid Build Coastguard Worker }
749*cc02d7e2SAndroid Build Coastguard Worker
~AsyncStreamingFromServerClient()750*cc02d7e2SAndroid Build Coastguard Worker ~AsyncStreamingFromServerClient() override {}
751*cc02d7e2SAndroid Build Coastguard Worker
752*cc02d7e2SAndroid Build Coastguard Worker private:
CheckDone(const grpc::Status &,SimpleResponse *)753*cc02d7e2SAndroid Build Coastguard Worker static void CheckDone(const grpc::Status& /*s*/,
754*cc02d7e2SAndroid Build Coastguard Worker SimpleResponse* /*response*/) {}
PrepareReq(BenchmarkService::Stub * stub,grpc::ClientContext * ctx,const SimpleRequest & req,CompletionQueue * cq)755*cc02d7e2SAndroid Build Coastguard Worker static std::unique_ptr<grpc::ClientAsyncReader<SimpleResponse>> PrepareReq(
756*cc02d7e2SAndroid Build Coastguard Worker BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
757*cc02d7e2SAndroid Build Coastguard Worker const SimpleRequest& req, CompletionQueue* cq) {
758*cc02d7e2SAndroid Build Coastguard Worker auto stream = stub->PrepareAsyncStreamingFromServer(ctx, req, cq);
759*cc02d7e2SAndroid Build Coastguard Worker return stream;
760*cc02d7e2SAndroid Build Coastguard Worker };
SetupCtx(BenchmarkService::Stub * stub,std::function<gpr_timespec ()> next_issue,const SimpleRequest & req)761*cc02d7e2SAndroid Build Coastguard Worker static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
762*cc02d7e2SAndroid Build Coastguard Worker std::function<gpr_timespec()> next_issue,
763*cc02d7e2SAndroid Build Coastguard Worker const SimpleRequest& req) {
764*cc02d7e2SAndroid Build Coastguard Worker return new ClientRpcContextStreamingFromServerImpl<SimpleRequest,
765*cc02d7e2SAndroid Build Coastguard Worker SimpleResponse>(
766*cc02d7e2SAndroid Build Coastguard Worker stub, req, std::move(next_issue),
767*cc02d7e2SAndroid Build Coastguard Worker AsyncStreamingFromServerClient::PrepareReq,
768*cc02d7e2SAndroid Build Coastguard Worker AsyncStreamingFromServerClient::CheckDone);
769*cc02d7e2SAndroid Build Coastguard Worker }
770*cc02d7e2SAndroid Build Coastguard Worker };
771*cc02d7e2SAndroid Build Coastguard Worker
772*cc02d7e2SAndroid Build Coastguard Worker class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
773*cc02d7e2SAndroid Build Coastguard Worker public:
ClientRpcContextGenericStreamingImpl(grpc::GenericStub * stub,const ByteBuffer & req,std::function<gpr_timespec ()> next_issue,std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter> (grpc::GenericStub *,grpc::ClientContext *,const std::string & method_name,CompletionQueue *)> prepare_req,std::function<void (grpc::Status,ByteBuffer *)> on_done)774*cc02d7e2SAndroid Build Coastguard Worker ClientRpcContextGenericStreamingImpl(
775*cc02d7e2SAndroid Build Coastguard Worker grpc::GenericStub* stub, const ByteBuffer& req,
776*cc02d7e2SAndroid Build Coastguard Worker std::function<gpr_timespec()> next_issue,
777*cc02d7e2SAndroid Build Coastguard Worker std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
778*cc02d7e2SAndroid Build Coastguard Worker grpc::GenericStub*, grpc::ClientContext*,
779*cc02d7e2SAndroid Build Coastguard Worker const std::string& method_name, CompletionQueue*)>
780*cc02d7e2SAndroid Build Coastguard Worker prepare_req,
781*cc02d7e2SAndroid Build Coastguard Worker std::function<void(grpc::Status, ByteBuffer*)> on_done)
782*cc02d7e2SAndroid Build Coastguard Worker : context_(),
783*cc02d7e2SAndroid Build Coastguard Worker stub_(stub),
784*cc02d7e2SAndroid Build Coastguard Worker cq_(nullptr),
785*cc02d7e2SAndroid Build Coastguard Worker req_(req),
786*cc02d7e2SAndroid Build Coastguard Worker response_(),
787*cc02d7e2SAndroid Build Coastguard Worker next_state_(State::INVALID),
788*cc02d7e2SAndroid Build Coastguard Worker callback_(std::move(on_done)),
789*cc02d7e2SAndroid Build Coastguard Worker next_issue_(std::move(next_issue)),
790*cc02d7e2SAndroid Build Coastguard Worker prepare_req_(std::move(prepare_req)) {}
~ClientRpcContextGenericStreamingImpl()791*cc02d7e2SAndroid Build Coastguard Worker ~ClientRpcContextGenericStreamingImpl() override {}
Start(CompletionQueue * cq,const ClientConfig & config)792*cc02d7e2SAndroid Build Coastguard Worker void Start(CompletionQueue* cq, const ClientConfig& config) override {
793*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(!config.use_coalesce_api()); // not supported yet.
794*cc02d7e2SAndroid Build Coastguard Worker StartInternal(cq, config.messages_per_stream());
795*cc02d7e2SAndroid Build Coastguard Worker }
RunNextState(bool ok,HistogramEntry * entry)796*cc02d7e2SAndroid Build Coastguard Worker bool RunNextState(bool ok, HistogramEntry* entry) override {
797*cc02d7e2SAndroid Build Coastguard Worker while (true) {
798*cc02d7e2SAndroid Build Coastguard Worker switch (next_state_) {
799*cc02d7e2SAndroid Build Coastguard Worker case State::STREAM_IDLE:
800*cc02d7e2SAndroid Build Coastguard Worker if (!next_issue_) { // ready to issue
801*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::READY_TO_WRITE;
802*cc02d7e2SAndroid Build Coastguard Worker } else {
803*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::WAIT;
804*cc02d7e2SAndroid Build Coastguard Worker }
805*cc02d7e2SAndroid Build Coastguard Worker break; // loop around, don't return
806*cc02d7e2SAndroid Build Coastguard Worker case State::WAIT:
807*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::READY_TO_WRITE;
808*cc02d7e2SAndroid Build Coastguard Worker alarm_ = std::make_unique<Alarm>();
809*cc02d7e2SAndroid Build Coastguard Worker alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
810*cc02d7e2SAndroid Build Coastguard Worker return true;
811*cc02d7e2SAndroid Build Coastguard Worker case State::READY_TO_WRITE:
812*cc02d7e2SAndroid Build Coastguard Worker if (!ok) {
813*cc02d7e2SAndroid Build Coastguard Worker return false;
814*cc02d7e2SAndroid Build Coastguard Worker }
815*cc02d7e2SAndroid Build Coastguard Worker start_ = UsageTimer::Now();
816*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::WRITE_DONE;
817*cc02d7e2SAndroid Build Coastguard Worker stream_->Write(req_, ClientRpcContext::tag(this));
818*cc02d7e2SAndroid Build Coastguard Worker return true;
819*cc02d7e2SAndroid Build Coastguard Worker case State::WRITE_DONE:
820*cc02d7e2SAndroid Build Coastguard Worker if (!ok) {
821*cc02d7e2SAndroid Build Coastguard Worker return false;
822*cc02d7e2SAndroid Build Coastguard Worker }
823*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::READ_DONE;
824*cc02d7e2SAndroid Build Coastguard Worker stream_->Read(&response_, ClientRpcContext::tag(this));
825*cc02d7e2SAndroid Build Coastguard Worker return true;
826*cc02d7e2SAndroid Build Coastguard Worker case State::READ_DONE:
827*cc02d7e2SAndroid Build Coastguard Worker entry->set_value((UsageTimer::Now() - start_) * 1e9);
828*cc02d7e2SAndroid Build Coastguard Worker callback_(status_, &response_);
829*cc02d7e2SAndroid Build Coastguard Worker if ((messages_per_stream_ != 0) &&
830*cc02d7e2SAndroid Build Coastguard Worker (++messages_issued_ >= messages_per_stream_)) {
831*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::WRITES_DONE_DONE;
832*cc02d7e2SAndroid Build Coastguard Worker stream_->WritesDone(ClientRpcContext::tag(this));
833*cc02d7e2SAndroid Build Coastguard Worker return true;
834*cc02d7e2SAndroid Build Coastguard Worker }
835*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::STREAM_IDLE;
836*cc02d7e2SAndroid Build Coastguard Worker break; // loop around
837*cc02d7e2SAndroid Build Coastguard Worker case State::WRITES_DONE_DONE:
838*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::FINISH_DONE;
839*cc02d7e2SAndroid Build Coastguard Worker stream_->Finish(&status_, ClientRpcContext::tag(this));
840*cc02d7e2SAndroid Build Coastguard Worker return true;
841*cc02d7e2SAndroid Build Coastguard Worker case State::FINISH_DONE:
842*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::INVALID;
843*cc02d7e2SAndroid Build Coastguard Worker return false;
844*cc02d7e2SAndroid Build Coastguard Worker default:
845*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash("unreachable");
846*cc02d7e2SAndroid Build Coastguard Worker return false;
847*cc02d7e2SAndroid Build Coastguard Worker }
848*cc02d7e2SAndroid Build Coastguard Worker }
849*cc02d7e2SAndroid Build Coastguard Worker }
StartNewClone(CompletionQueue * cq)850*cc02d7e2SAndroid Build Coastguard Worker void StartNewClone(CompletionQueue* cq) override {
851*cc02d7e2SAndroid Build Coastguard Worker auto* clone = new ClientRpcContextGenericStreamingImpl(
852*cc02d7e2SAndroid Build Coastguard Worker stub_, req_, next_issue_, prepare_req_, callback_);
853*cc02d7e2SAndroid Build Coastguard Worker clone->StartInternal(cq, messages_per_stream_);
854*cc02d7e2SAndroid Build Coastguard Worker }
TryCancel()855*cc02d7e2SAndroid Build Coastguard Worker void TryCancel() override { context_.TryCancel(); }
856*cc02d7e2SAndroid Build Coastguard Worker
857*cc02d7e2SAndroid Build Coastguard Worker private:
858*cc02d7e2SAndroid Build Coastguard Worker grpc::ClientContext context_;
859*cc02d7e2SAndroid Build Coastguard Worker grpc::GenericStub* stub_;
860*cc02d7e2SAndroid Build Coastguard Worker CompletionQueue* cq_;
861*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<Alarm> alarm_;
862*cc02d7e2SAndroid Build Coastguard Worker ByteBuffer req_;
863*cc02d7e2SAndroid Build Coastguard Worker ByteBuffer response_;
864*cc02d7e2SAndroid Build Coastguard Worker enum State {
865*cc02d7e2SAndroid Build Coastguard Worker INVALID,
866*cc02d7e2SAndroid Build Coastguard Worker STREAM_IDLE,
867*cc02d7e2SAndroid Build Coastguard Worker WAIT,
868*cc02d7e2SAndroid Build Coastguard Worker READY_TO_WRITE,
869*cc02d7e2SAndroid Build Coastguard Worker WRITE_DONE,
870*cc02d7e2SAndroid Build Coastguard Worker READ_DONE,
871*cc02d7e2SAndroid Build Coastguard Worker WRITES_DONE_DONE,
872*cc02d7e2SAndroid Build Coastguard Worker FINISH_DONE
873*cc02d7e2SAndroid Build Coastguard Worker };
874*cc02d7e2SAndroid Build Coastguard Worker State next_state_;
875*cc02d7e2SAndroid Build Coastguard Worker std::function<void(grpc::Status, ByteBuffer*)> callback_;
876*cc02d7e2SAndroid Build Coastguard Worker std::function<gpr_timespec()> next_issue_;
877*cc02d7e2SAndroid Build Coastguard Worker std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
878*cc02d7e2SAndroid Build Coastguard Worker grpc::GenericStub*, grpc::ClientContext*, const std::string&,
879*cc02d7e2SAndroid Build Coastguard Worker CompletionQueue*)>
880*cc02d7e2SAndroid Build Coastguard Worker prepare_req_;
881*cc02d7e2SAndroid Build Coastguard Worker grpc::Status status_;
882*cc02d7e2SAndroid Build Coastguard Worker double start_;
883*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<grpc::GenericClientAsyncReaderWriter> stream_;
884*cc02d7e2SAndroid Build Coastguard Worker
885*cc02d7e2SAndroid Build Coastguard Worker // Allow a limit on number of messages in a stream
886*cc02d7e2SAndroid Build Coastguard Worker int messages_per_stream_;
887*cc02d7e2SAndroid Build Coastguard Worker int messages_issued_;
888*cc02d7e2SAndroid Build Coastguard Worker
StartInternal(CompletionQueue * cq,int messages_per_stream)889*cc02d7e2SAndroid Build Coastguard Worker void StartInternal(CompletionQueue* cq, int messages_per_stream) {
890*cc02d7e2SAndroid Build Coastguard Worker cq_ = cq;
891*cc02d7e2SAndroid Build Coastguard Worker const std::string kMethodName(
892*cc02d7e2SAndroid Build Coastguard Worker "/grpc.testing.BenchmarkService/StreamingCall");
893*cc02d7e2SAndroid Build Coastguard Worker messages_per_stream_ = messages_per_stream;
894*cc02d7e2SAndroid Build Coastguard Worker messages_issued_ = 0;
895*cc02d7e2SAndroid Build Coastguard Worker stream_ = prepare_req_(stub_, &context_, kMethodName, cq);
896*cc02d7e2SAndroid Build Coastguard Worker next_state_ = State::STREAM_IDLE;
897*cc02d7e2SAndroid Build Coastguard Worker stream_->StartCall(ClientRpcContext::tag(this));
898*cc02d7e2SAndroid Build Coastguard Worker }
899*cc02d7e2SAndroid Build Coastguard Worker };
900*cc02d7e2SAndroid Build Coastguard Worker
GenericStubCreator(const std::shared_ptr<Channel> & ch)901*cc02d7e2SAndroid Build Coastguard Worker static std::unique_ptr<grpc::GenericStub> GenericStubCreator(
902*cc02d7e2SAndroid Build Coastguard Worker const std::shared_ptr<Channel>& ch) {
903*cc02d7e2SAndroid Build Coastguard Worker return std::make_unique<grpc::GenericStub>(ch);
904*cc02d7e2SAndroid Build Coastguard Worker }
905*cc02d7e2SAndroid Build Coastguard Worker
906*cc02d7e2SAndroid Build Coastguard Worker class GenericAsyncStreamingClient final
907*cc02d7e2SAndroid Build Coastguard Worker : public AsyncClient<grpc::GenericStub, ByteBuffer> {
908*cc02d7e2SAndroid Build Coastguard Worker public:
GenericAsyncStreamingClient(const ClientConfig & config)909*cc02d7e2SAndroid Build Coastguard Worker explicit GenericAsyncStreamingClient(const ClientConfig& config)
910*cc02d7e2SAndroid Build Coastguard Worker : AsyncClient<grpc::GenericStub, ByteBuffer>(config, SetupCtx,
911*cc02d7e2SAndroid Build Coastguard Worker GenericStubCreator) {
912*cc02d7e2SAndroid Build Coastguard Worker StartThreads(num_async_threads_);
913*cc02d7e2SAndroid Build Coastguard Worker }
914*cc02d7e2SAndroid Build Coastguard Worker
~GenericAsyncStreamingClient()915*cc02d7e2SAndroid Build Coastguard Worker ~GenericAsyncStreamingClient() override {}
916*cc02d7e2SAndroid Build Coastguard Worker
917*cc02d7e2SAndroid Build Coastguard Worker private:
CheckDone(const grpc::Status &,ByteBuffer *)918*cc02d7e2SAndroid Build Coastguard Worker static void CheckDone(const grpc::Status& /*s*/, ByteBuffer* /*response*/) {}
PrepareReq(grpc::GenericStub * stub,grpc::ClientContext * ctx,const std::string & method_name,CompletionQueue * cq)919*cc02d7e2SAndroid Build Coastguard Worker static std::unique_ptr<grpc::GenericClientAsyncReaderWriter> PrepareReq(
920*cc02d7e2SAndroid Build Coastguard Worker grpc::GenericStub* stub, grpc::ClientContext* ctx,
921*cc02d7e2SAndroid Build Coastguard Worker const std::string& method_name, CompletionQueue* cq) {
922*cc02d7e2SAndroid Build Coastguard Worker auto stream = stub->PrepareCall(ctx, method_name, cq);
923*cc02d7e2SAndroid Build Coastguard Worker return stream;
924*cc02d7e2SAndroid Build Coastguard Worker };
SetupCtx(grpc::GenericStub * stub,std::function<gpr_timespec ()> next_issue,const ByteBuffer & req)925*cc02d7e2SAndroid Build Coastguard Worker static ClientRpcContext* SetupCtx(grpc::GenericStub* stub,
926*cc02d7e2SAndroid Build Coastguard Worker std::function<gpr_timespec()> next_issue,
927*cc02d7e2SAndroid Build Coastguard Worker const ByteBuffer& req) {
928*cc02d7e2SAndroid Build Coastguard Worker return new ClientRpcContextGenericStreamingImpl(
929*cc02d7e2SAndroid Build Coastguard Worker stub, req, std::move(next_issue),
930*cc02d7e2SAndroid Build Coastguard Worker GenericAsyncStreamingClient::PrepareReq,
931*cc02d7e2SAndroid Build Coastguard Worker GenericAsyncStreamingClient::CheckDone);
932*cc02d7e2SAndroid Build Coastguard Worker }
933*cc02d7e2SAndroid Build Coastguard Worker };
934*cc02d7e2SAndroid Build Coastguard Worker
CreateAsyncClient(const ClientConfig & config)935*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& config) {
936*cc02d7e2SAndroid Build Coastguard Worker switch (config.rpc_type()) {
937*cc02d7e2SAndroid Build Coastguard Worker case UNARY:
938*cc02d7e2SAndroid Build Coastguard Worker return std::unique_ptr<Client>(new AsyncUnaryClient(config));
939*cc02d7e2SAndroid Build Coastguard Worker case STREAMING:
940*cc02d7e2SAndroid Build Coastguard Worker return std::unique_ptr<Client>(new AsyncStreamingPingPongClient(config));
941*cc02d7e2SAndroid Build Coastguard Worker case STREAMING_FROM_CLIENT:
942*cc02d7e2SAndroid Build Coastguard Worker return std::unique_ptr<Client>(
943*cc02d7e2SAndroid Build Coastguard Worker new AsyncStreamingFromClientClient(config));
944*cc02d7e2SAndroid Build Coastguard Worker case STREAMING_FROM_SERVER:
945*cc02d7e2SAndroid Build Coastguard Worker return std::unique_ptr<Client>(
946*cc02d7e2SAndroid Build Coastguard Worker new AsyncStreamingFromServerClient(config));
947*cc02d7e2SAndroid Build Coastguard Worker case STREAMING_BOTH_WAYS:
948*cc02d7e2SAndroid Build Coastguard Worker // TODO(vjpai): Implement this
949*cc02d7e2SAndroid Build Coastguard Worker assert(false);
950*cc02d7e2SAndroid Build Coastguard Worker return nullptr;
951*cc02d7e2SAndroid Build Coastguard Worker default:
952*cc02d7e2SAndroid Build Coastguard Worker assert(false);
953*cc02d7e2SAndroid Build Coastguard Worker return nullptr;
954*cc02d7e2SAndroid Build Coastguard Worker }
955*cc02d7e2SAndroid Build Coastguard Worker }
CreateGenericAsyncStreamingClient(const ClientConfig & config)956*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<Client> CreateGenericAsyncStreamingClient(
957*cc02d7e2SAndroid Build Coastguard Worker const ClientConfig& config) {
958*cc02d7e2SAndroid Build Coastguard Worker return std::unique_ptr<Client>(new GenericAsyncStreamingClient(config));
959*cc02d7e2SAndroid Build Coastguard Worker }
960*cc02d7e2SAndroid Build Coastguard Worker
961*cc02d7e2SAndroid Build Coastguard Worker } // namespace testing
962*cc02d7e2SAndroid Build Coastguard Worker } // namespace grpc
963