xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/cfstream_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1*cc02d7e2SAndroid Build Coastguard Worker //
2*cc02d7e2SAndroid Build Coastguard Worker //
3*cc02d7e2SAndroid Build Coastguard Worker // Copyright 2019 The gRPC Authors
4*cc02d7e2SAndroid Build Coastguard Worker //
5*cc02d7e2SAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License");
6*cc02d7e2SAndroid Build Coastguard Worker // you may not use this file except in compliance with the License.
7*cc02d7e2SAndroid Build Coastguard Worker // You may obtain a copy of the License at
8*cc02d7e2SAndroid Build Coastguard Worker //
9*cc02d7e2SAndroid Build Coastguard Worker //     http://www.apache.org/licenses/LICENSE-2.0
10*cc02d7e2SAndroid Build Coastguard Worker //
11*cc02d7e2SAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
12*cc02d7e2SAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS,
13*cc02d7e2SAndroid Build Coastguard Worker // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14*cc02d7e2SAndroid Build Coastguard Worker // See the License for the specific language governing permissions and
15*cc02d7e2SAndroid Build Coastguard Worker // limitations under the License.
16*cc02d7e2SAndroid Build Coastguard Worker //
17*cc02d7e2SAndroid Build Coastguard Worker //
18*cc02d7e2SAndroid Build Coastguard Worker 
19*cc02d7e2SAndroid Build Coastguard Worker #include <algorithm>
20*cc02d7e2SAndroid Build Coastguard Worker #include <memory>
21*cc02d7e2SAndroid Build Coastguard Worker #include <mutex>
22*cc02d7e2SAndroid Build Coastguard Worker #include <random>
23*cc02d7e2SAndroid Build Coastguard Worker #include <thread>
24*cc02d7e2SAndroid Build Coastguard Worker 
25*cc02d7e2SAndroid Build Coastguard Worker #include <gtest/gtest.h>
26*cc02d7e2SAndroid Build Coastguard Worker 
27*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/grpc.h>
28*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/alloc.h>
29*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/atm.h>
30*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/log.h>
31*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/string_util.h>
32*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/time.h>
33*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/channel.h>
34*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/client_context.h>
35*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/create_channel.h>
36*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/health_check_service_interface.h>
37*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/server.h>
38*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/server_builder.h>
39*cc02d7e2SAndroid Build Coastguard Worker 
40*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/backoff/backoff.h"
41*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gprpp/crash.h"
42*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gprpp/env.h"
43*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/iomgr/port.h"
44*cc02d7e2SAndroid Build Coastguard Worker #include "src/proto/grpc/testing/echo.grpc.pb.h"
45*cc02d7e2SAndroid Build Coastguard Worker #include "test/core/util/port.h"
46*cc02d7e2SAndroid Build Coastguard Worker #include "test/core/util/test_config.h"
47*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/end2end/test_service_impl.h"
48*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/util/test_credentials_provider.h"
49*cc02d7e2SAndroid Build Coastguard Worker 
50*cc02d7e2SAndroid Build Coastguard Worker #ifdef GRPC_CFSTREAM
51*cc02d7e2SAndroid Build Coastguard Worker using grpc::ClientAsyncResponseReader;
52*cc02d7e2SAndroid Build Coastguard Worker using grpc::testing::EchoRequest;
53*cc02d7e2SAndroid Build Coastguard Worker using grpc::testing::EchoResponse;
54*cc02d7e2SAndroid Build Coastguard Worker using grpc::testing::RequestParams;
55*cc02d7e2SAndroid Build Coastguard Worker using std::chrono::system_clock;
56*cc02d7e2SAndroid Build Coastguard Worker 
57*cc02d7e2SAndroid Build Coastguard Worker namespace grpc {
58*cc02d7e2SAndroid Build Coastguard Worker namespace testing {
59*cc02d7e2SAndroid Build Coastguard Worker namespace {
60*cc02d7e2SAndroid Build Coastguard Worker 
// One parameter combination for the CFStream tests: the credentials type to
// run with and the payload placed in each Echo request.
struct TestScenario {
  const std::string credentials_type;
  const std::string message_content;

  TestScenario(const std::string& creds_type, const std::string& content)
      : credentials_type(creds_type), message_content(content) {}
};
67*cc02d7e2SAndroid Build Coastguard Worker 
68*cc02d7e2SAndroid Build Coastguard Worker class CFStreamTest : public ::testing::TestWithParam<TestScenario> {
69*cc02d7e2SAndroid Build Coastguard Worker  protected:
CFStreamTest()70*cc02d7e2SAndroid Build Coastguard Worker   CFStreamTest()
71*cc02d7e2SAndroid Build Coastguard Worker       : server_host_("grpctest"),
72*cc02d7e2SAndroid Build Coastguard Worker         interface_("lo0"),
73*cc02d7e2SAndroid Build Coastguard Worker         ipv4_address_("10.0.0.1") {}
74*cc02d7e2SAndroid Build Coastguard Worker 
DNSUp()75*cc02d7e2SAndroid Build Coastguard Worker   void DNSUp() {
76*cc02d7e2SAndroid Build Coastguard Worker     std::ostringstream cmd;
77*cc02d7e2SAndroid Build Coastguard Worker     // Add DNS entry for server_host_ in /etc/hosts
78*cc02d7e2SAndroid Build Coastguard Worker     cmd << "echo '" << ipv4_address_ << "      " << server_host_
79*cc02d7e2SAndroid Build Coastguard Worker         << "  ' | sudo tee -a /etc/hosts";
80*cc02d7e2SAndroid Build Coastguard Worker     std::system(cmd.str().c_str());
81*cc02d7e2SAndroid Build Coastguard Worker   }
82*cc02d7e2SAndroid Build Coastguard Worker 
DNSDown()83*cc02d7e2SAndroid Build Coastguard Worker   void DNSDown() {
84*cc02d7e2SAndroid Build Coastguard Worker     std::ostringstream cmd;
85*cc02d7e2SAndroid Build Coastguard Worker     // Remove DNS entry for server_host_ in /etc/hosts
86*cc02d7e2SAndroid Build Coastguard Worker     cmd << "sudo sed -i '.bak' '/" << server_host_ << "/d' /etc/hosts";
87*cc02d7e2SAndroid Build Coastguard Worker     std::system(cmd.str().c_str());
88*cc02d7e2SAndroid Build Coastguard Worker   }
89*cc02d7e2SAndroid Build Coastguard Worker 
InterfaceUp()90*cc02d7e2SAndroid Build Coastguard Worker   void InterfaceUp() {
91*cc02d7e2SAndroid Build Coastguard Worker     std::ostringstream cmd;
92*cc02d7e2SAndroid Build Coastguard Worker     cmd << "sudo /sbin/ifconfig " << interface_ << " alias " << ipv4_address_;
93*cc02d7e2SAndroid Build Coastguard Worker     std::system(cmd.str().c_str());
94*cc02d7e2SAndroid Build Coastguard Worker   }
95*cc02d7e2SAndroid Build Coastguard Worker 
InterfaceDown()96*cc02d7e2SAndroid Build Coastguard Worker   void InterfaceDown() {
97*cc02d7e2SAndroid Build Coastguard Worker     std::ostringstream cmd;
98*cc02d7e2SAndroid Build Coastguard Worker     cmd << "sudo /sbin/ifconfig " << interface_ << " -alias " << ipv4_address_;
99*cc02d7e2SAndroid Build Coastguard Worker     std::system(cmd.str().c_str());
100*cc02d7e2SAndroid Build Coastguard Worker   }
101*cc02d7e2SAndroid Build Coastguard Worker 
NetworkUp()102*cc02d7e2SAndroid Build Coastguard Worker   void NetworkUp() {
103*cc02d7e2SAndroid Build Coastguard Worker     gpr_log(GPR_DEBUG, "Bringing network up");
104*cc02d7e2SAndroid Build Coastguard Worker     InterfaceUp();
105*cc02d7e2SAndroid Build Coastguard Worker     DNSUp();
106*cc02d7e2SAndroid Build Coastguard Worker   }
107*cc02d7e2SAndroid Build Coastguard Worker 
NetworkDown()108*cc02d7e2SAndroid Build Coastguard Worker   void NetworkDown() {
109*cc02d7e2SAndroid Build Coastguard Worker     gpr_log(GPR_DEBUG, "Bringing network down");
110*cc02d7e2SAndroid Build Coastguard Worker     InterfaceDown();
111*cc02d7e2SAndroid Build Coastguard Worker     DNSDown();
112*cc02d7e2SAndroid Build Coastguard Worker   }
113*cc02d7e2SAndroid Build Coastguard Worker 
SetUp()114*cc02d7e2SAndroid Build Coastguard Worker   void SetUp() override {
115*cc02d7e2SAndroid Build Coastguard Worker     NetworkUp();
116*cc02d7e2SAndroid Build Coastguard Worker     grpc_init();
117*cc02d7e2SAndroid Build Coastguard Worker     StartServer();
118*cc02d7e2SAndroid Build Coastguard Worker   }
119*cc02d7e2SAndroid Build Coastguard Worker 
TearDown()120*cc02d7e2SAndroid Build Coastguard Worker   void TearDown() override {
121*cc02d7e2SAndroid Build Coastguard Worker     NetworkDown();
122*cc02d7e2SAndroid Build Coastguard Worker     StopServer();
123*cc02d7e2SAndroid Build Coastguard Worker     grpc_shutdown();
124*cc02d7e2SAndroid Build Coastguard Worker   }
125*cc02d7e2SAndroid Build Coastguard Worker 
StartServer()126*cc02d7e2SAndroid Build Coastguard Worker   void StartServer() {
127*cc02d7e2SAndroid Build Coastguard Worker     port_ = grpc_pick_unused_port_or_die();
128*cc02d7e2SAndroid Build Coastguard Worker     server_.reset(new ServerData(port_, GetParam().credentials_type));
129*cc02d7e2SAndroid Build Coastguard Worker     server_->Start(server_host_);
130*cc02d7e2SAndroid Build Coastguard Worker   }
StopServer()131*cc02d7e2SAndroid Build Coastguard Worker   void StopServer() { server_->Shutdown(); }
132*cc02d7e2SAndroid Build Coastguard Worker 
BuildStub(const std::shared_ptr<Channel> & channel)133*cc02d7e2SAndroid Build Coastguard Worker   std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
134*cc02d7e2SAndroid Build Coastguard Worker       const std::shared_ptr<Channel>& channel) {
135*cc02d7e2SAndroid Build Coastguard Worker     return grpc::testing::EchoTestService::NewStub(channel);
136*cc02d7e2SAndroid Build Coastguard Worker   }
137*cc02d7e2SAndroid Build Coastguard Worker 
BuildChannel()138*cc02d7e2SAndroid Build Coastguard Worker   std::shared_ptr<Channel> BuildChannel() {
139*cc02d7e2SAndroid Build Coastguard Worker     std::ostringstream server_address;
140*cc02d7e2SAndroid Build Coastguard Worker     server_address << server_host_ << ":" << port_;
141*cc02d7e2SAndroid Build Coastguard Worker     ChannelArguments args;
142*cc02d7e2SAndroid Build Coastguard Worker     auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
143*cc02d7e2SAndroid Build Coastguard Worker         GetParam().credentials_type, &args);
144*cc02d7e2SAndroid Build Coastguard Worker     return CreateCustomChannel(server_address.str(), channel_creds, args);
145*cc02d7e2SAndroid Build Coastguard Worker   }
146*cc02d7e2SAndroid Build Coastguard Worker 
SendRpc(const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,bool expect_success=false)147*cc02d7e2SAndroid Build Coastguard Worker   void SendRpc(
148*cc02d7e2SAndroid Build Coastguard Worker       const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
149*cc02d7e2SAndroid Build Coastguard Worker       bool expect_success = false) {
150*cc02d7e2SAndroid Build Coastguard Worker     auto response = std::unique_ptr<EchoResponse>(new EchoResponse());
151*cc02d7e2SAndroid Build Coastguard Worker     EchoRequest request;
152*cc02d7e2SAndroid Build Coastguard Worker     auto& msg = GetParam().message_content;
153*cc02d7e2SAndroid Build Coastguard Worker     request.set_message(msg);
154*cc02d7e2SAndroid Build Coastguard Worker     ClientContext context;
155*cc02d7e2SAndroid Build Coastguard Worker     Status status = stub->Echo(&context, request, response.get());
156*cc02d7e2SAndroid Build Coastguard Worker     if (status.ok()) {
157*cc02d7e2SAndroid Build Coastguard Worker       gpr_log(GPR_DEBUG, "RPC with succeeded");
158*cc02d7e2SAndroid Build Coastguard Worker       EXPECT_EQ(msg, response->message());
159*cc02d7e2SAndroid Build Coastguard Worker     } else {
160*cc02d7e2SAndroid Build Coastguard Worker       gpr_log(GPR_DEBUG, "RPC failed: %s", status.error_message().c_str());
161*cc02d7e2SAndroid Build Coastguard Worker     }
162*cc02d7e2SAndroid Build Coastguard Worker     if (expect_success) {
163*cc02d7e2SAndroid Build Coastguard Worker       EXPECT_TRUE(status.ok());
164*cc02d7e2SAndroid Build Coastguard Worker     }
165*cc02d7e2SAndroid Build Coastguard Worker   }
SendAsyncRpc(const std::unique_ptr<grpc::testing::EchoTestService::Stub> & stub,RequestParams param=RequestParams ())166*cc02d7e2SAndroid Build Coastguard Worker   void SendAsyncRpc(
167*cc02d7e2SAndroid Build Coastguard Worker       const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
168*cc02d7e2SAndroid Build Coastguard Worker       RequestParams param = RequestParams()) {
169*cc02d7e2SAndroid Build Coastguard Worker     EchoRequest request;
170*cc02d7e2SAndroid Build Coastguard Worker     request.set_message(GetParam().message_content);
171*cc02d7e2SAndroid Build Coastguard Worker     *request.mutable_param() = std::move(param);
172*cc02d7e2SAndroid Build Coastguard Worker     AsyncClientCall* call = new AsyncClientCall;
173*cc02d7e2SAndroid Build Coastguard Worker 
174*cc02d7e2SAndroid Build Coastguard Worker     call->response_reader =
175*cc02d7e2SAndroid Build Coastguard Worker         stub->PrepareAsyncEcho(&call->context, request, &cq_);
176*cc02d7e2SAndroid Build Coastguard Worker 
177*cc02d7e2SAndroid Build Coastguard Worker     call->response_reader->StartCall();
178*cc02d7e2SAndroid Build Coastguard Worker     call->response_reader->Finish(&call->reply, &call->status, (void*)call);
179*cc02d7e2SAndroid Build Coastguard Worker   }
180*cc02d7e2SAndroid Build Coastguard Worker 
ShutdownCQ()181*cc02d7e2SAndroid Build Coastguard Worker   void ShutdownCQ() { cq_.Shutdown(); }
182*cc02d7e2SAndroid Build Coastguard Worker 
CQNext(void ** tag,bool * ok)183*cc02d7e2SAndroid Build Coastguard Worker   bool CQNext(void** tag, bool* ok) {
184*cc02d7e2SAndroid Build Coastguard Worker     auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(10);
185*cc02d7e2SAndroid Build Coastguard Worker     auto ret = cq_.AsyncNext(tag, ok, deadline);
186*cc02d7e2SAndroid Build Coastguard Worker     if (ret == grpc::CompletionQueue::GOT_EVENT) {
187*cc02d7e2SAndroid Build Coastguard Worker       return true;
188*cc02d7e2SAndroid Build Coastguard Worker     } else if (ret == grpc::CompletionQueue::SHUTDOWN) {
189*cc02d7e2SAndroid Build Coastguard Worker       return false;
190*cc02d7e2SAndroid Build Coastguard Worker     } else {
191*cc02d7e2SAndroid Build Coastguard Worker       GPR_ASSERT(ret == grpc::CompletionQueue::TIMEOUT);
192*cc02d7e2SAndroid Build Coastguard Worker       // This can happen if we hit the Apple CFStream bug which results in the
193*cc02d7e2SAndroid Build Coastguard Worker       // read stream freezing. We are ignoring hangs and timeouts, but these
194*cc02d7e2SAndroid Build Coastguard Worker       // tests are still useful as they can catch memory memory corruptions,
195*cc02d7e2SAndroid Build Coastguard Worker       // crashes and other bugs that don't result in test freeze/timeout.
196*cc02d7e2SAndroid Build Coastguard Worker       return false;
197*cc02d7e2SAndroid Build Coastguard Worker     }
198*cc02d7e2SAndroid Build Coastguard Worker   }
199*cc02d7e2SAndroid Build Coastguard Worker 
WaitForChannelNotReady(Channel * channel,int timeout_seconds=5)200*cc02d7e2SAndroid Build Coastguard Worker   bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
201*cc02d7e2SAndroid Build Coastguard Worker     const gpr_timespec deadline =
202*cc02d7e2SAndroid Build Coastguard Worker         grpc_timeout_seconds_to_deadline(timeout_seconds);
203*cc02d7e2SAndroid Build Coastguard Worker     grpc_connectivity_state state;
204*cc02d7e2SAndroid Build Coastguard Worker     while ((state = channel->GetState(false /* try_to_connect */)) ==
205*cc02d7e2SAndroid Build Coastguard Worker            GRPC_CHANNEL_READY) {
206*cc02d7e2SAndroid Build Coastguard Worker       if (!channel->WaitForStateChange(state, deadline)) return false;
207*cc02d7e2SAndroid Build Coastguard Worker     }
208*cc02d7e2SAndroid Build Coastguard Worker     return true;
209*cc02d7e2SAndroid Build Coastguard Worker   }
210*cc02d7e2SAndroid Build Coastguard Worker 
WaitForChannelReady(Channel * channel,int timeout_seconds=10)211*cc02d7e2SAndroid Build Coastguard Worker   bool WaitForChannelReady(Channel* channel, int timeout_seconds = 10) {
212*cc02d7e2SAndroid Build Coastguard Worker     const gpr_timespec deadline =
213*cc02d7e2SAndroid Build Coastguard Worker         grpc_timeout_seconds_to_deadline(timeout_seconds);
214*cc02d7e2SAndroid Build Coastguard Worker     grpc_connectivity_state state;
215*cc02d7e2SAndroid Build Coastguard Worker     while ((state = channel->GetState(true /* try_to_connect */)) !=
216*cc02d7e2SAndroid Build Coastguard Worker            GRPC_CHANNEL_READY) {
217*cc02d7e2SAndroid Build Coastguard Worker       if (!channel->WaitForStateChange(state, deadline)) return false;
218*cc02d7e2SAndroid Build Coastguard Worker     }
219*cc02d7e2SAndroid Build Coastguard Worker     return true;
220*cc02d7e2SAndroid Build Coastguard Worker   }
221*cc02d7e2SAndroid Build Coastguard Worker 
222*cc02d7e2SAndroid Build Coastguard Worker   struct AsyncClientCall {
223*cc02d7e2SAndroid Build Coastguard Worker     EchoResponse reply;
224*cc02d7e2SAndroid Build Coastguard Worker     ClientContext context;
225*cc02d7e2SAndroid Build Coastguard Worker     Status status;
226*cc02d7e2SAndroid Build Coastguard Worker     std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader;
227*cc02d7e2SAndroid Build Coastguard Worker   };
228*cc02d7e2SAndroid Build Coastguard Worker 
229*cc02d7e2SAndroid Build Coastguard Worker  private:
230*cc02d7e2SAndroid Build Coastguard Worker   struct ServerData {
231*cc02d7e2SAndroid Build Coastguard Worker     int port_;
232*cc02d7e2SAndroid Build Coastguard Worker     const std::string creds_;
233*cc02d7e2SAndroid Build Coastguard Worker     std::unique_ptr<Server> server_;
234*cc02d7e2SAndroid Build Coastguard Worker     TestServiceImpl service_;
235*cc02d7e2SAndroid Build Coastguard Worker     std::unique_ptr<std::thread> thread_;
236*cc02d7e2SAndroid Build Coastguard Worker     bool server_ready_ = false;
237*cc02d7e2SAndroid Build Coastguard Worker 
ServerDatagrpc::testing::__anon526209530111::CFStreamTest::ServerData238*cc02d7e2SAndroid Build Coastguard Worker     ServerData(int port, const std::string& creds)
239*cc02d7e2SAndroid Build Coastguard Worker         : port_(port), creds_(creds) {}
240*cc02d7e2SAndroid Build Coastguard Worker 
Startgrpc::testing::__anon526209530111::CFStreamTest::ServerData241*cc02d7e2SAndroid Build Coastguard Worker     void Start(const std::string& server_host) {
242*cc02d7e2SAndroid Build Coastguard Worker       gpr_log(GPR_INFO, "starting server on port %d", port_);
243*cc02d7e2SAndroid Build Coastguard Worker       std::mutex mu;
244*cc02d7e2SAndroid Build Coastguard Worker       std::unique_lock<std::mutex> lock(mu);
245*cc02d7e2SAndroid Build Coastguard Worker       std::condition_variable cond;
246*cc02d7e2SAndroid Build Coastguard Worker       thread_.reset(new std::thread(
247*cc02d7e2SAndroid Build Coastguard Worker           std::bind(&ServerData::Serve, this, server_host, &mu, &cond)));
248*cc02d7e2SAndroid Build Coastguard Worker       cond.wait(lock, [this] { return server_ready_; });
249*cc02d7e2SAndroid Build Coastguard Worker       server_ready_ = false;
250*cc02d7e2SAndroid Build Coastguard Worker       gpr_log(GPR_INFO, "server startup complete");
251*cc02d7e2SAndroid Build Coastguard Worker     }
252*cc02d7e2SAndroid Build Coastguard Worker 
Servegrpc::testing::__anon526209530111::CFStreamTest::ServerData253*cc02d7e2SAndroid Build Coastguard Worker     void Serve(const std::string& server_host, std::mutex* mu,
254*cc02d7e2SAndroid Build Coastguard Worker                std::condition_variable* cond) {
255*cc02d7e2SAndroid Build Coastguard Worker       std::ostringstream server_address;
256*cc02d7e2SAndroid Build Coastguard Worker       server_address << server_host << ":" << port_;
257*cc02d7e2SAndroid Build Coastguard Worker       ServerBuilder builder;
258*cc02d7e2SAndroid Build Coastguard Worker       auto server_creds =
259*cc02d7e2SAndroid Build Coastguard Worker           GetCredentialsProvider()->GetServerCredentials(creds_);
260*cc02d7e2SAndroid Build Coastguard Worker       builder.AddListeningPort(server_address.str(), server_creds);
261*cc02d7e2SAndroid Build Coastguard Worker       builder.RegisterService(&service_);
262*cc02d7e2SAndroid Build Coastguard Worker       server_ = builder.BuildAndStart();
263*cc02d7e2SAndroid Build Coastguard Worker       std::lock_guard<std::mutex> lock(*mu);
264*cc02d7e2SAndroid Build Coastguard Worker       server_ready_ = true;
265*cc02d7e2SAndroid Build Coastguard Worker       cond->notify_one();
266*cc02d7e2SAndroid Build Coastguard Worker     }
267*cc02d7e2SAndroid Build Coastguard Worker 
Shutdowngrpc::testing::__anon526209530111::CFStreamTest::ServerData268*cc02d7e2SAndroid Build Coastguard Worker     void Shutdown(bool join = true) {
269*cc02d7e2SAndroid Build Coastguard Worker       server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
270*cc02d7e2SAndroid Build Coastguard Worker       if (join) thread_->join();
271*cc02d7e2SAndroid Build Coastguard Worker     }
272*cc02d7e2SAndroid Build Coastguard Worker   };
273*cc02d7e2SAndroid Build Coastguard Worker 
274*cc02d7e2SAndroid Build Coastguard Worker   CompletionQueue cq_;
275*cc02d7e2SAndroid Build Coastguard Worker   const std::string server_host_;
276*cc02d7e2SAndroid Build Coastguard Worker   const std::string interface_;
277*cc02d7e2SAndroid Build Coastguard Worker   const std::string ipv4_address_;
278*cc02d7e2SAndroid Build Coastguard Worker   std::unique_ptr<ServerData> server_;
279*cc02d7e2SAndroid Build Coastguard Worker   int port_;
280*cc02d7e2SAndroid Build Coastguard Worker };
281*cc02d7e2SAndroid Build Coastguard Worker 
CreateTestScenarios()282*cc02d7e2SAndroid Build Coastguard Worker std::vector<TestScenario> CreateTestScenarios() {
283*cc02d7e2SAndroid Build Coastguard Worker   std::vector<TestScenario> scenarios;
284*cc02d7e2SAndroid Build Coastguard Worker   std::vector<std::string> credentials_types;
285*cc02d7e2SAndroid Build Coastguard Worker   std::vector<std::string> messages;
286*cc02d7e2SAndroid Build Coastguard Worker 
287*cc02d7e2SAndroid Build Coastguard Worker   credentials_types.push_back(kInsecureCredentialsType);
288*cc02d7e2SAndroid Build Coastguard Worker   auto sec_list = GetCredentialsProvider()->GetSecureCredentialsTypeList();
289*cc02d7e2SAndroid Build Coastguard Worker   for (auto sec = sec_list.begin(); sec != sec_list.end(); sec++) {
290*cc02d7e2SAndroid Build Coastguard Worker     credentials_types.push_back(*sec);
291*cc02d7e2SAndroid Build Coastguard Worker   }
292*cc02d7e2SAndroid Build Coastguard Worker 
293*cc02d7e2SAndroid Build Coastguard Worker   messages.push_back("��");
294*cc02d7e2SAndroid Build Coastguard Worker   for (size_t k = 1; k < GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH / 1024; k *= 32) {
295*cc02d7e2SAndroid Build Coastguard Worker     std::string big_msg;
296*cc02d7e2SAndroid Build Coastguard Worker     for (size_t i = 0; i < k * 1024; ++i) {
297*cc02d7e2SAndroid Build Coastguard Worker       char c = 'a' + (i % 26);
298*cc02d7e2SAndroid Build Coastguard Worker       big_msg += c;
299*cc02d7e2SAndroid Build Coastguard Worker     }
300*cc02d7e2SAndroid Build Coastguard Worker     messages.push_back(big_msg);
301*cc02d7e2SAndroid Build Coastguard Worker   }
302*cc02d7e2SAndroid Build Coastguard Worker   for (auto cred = credentials_types.begin(); cred != credentials_types.end();
303*cc02d7e2SAndroid Build Coastguard Worker        ++cred) {
304*cc02d7e2SAndroid Build Coastguard Worker     for (auto msg = messages.begin(); msg != messages.end(); msg++) {
305*cc02d7e2SAndroid Build Coastguard Worker       scenarios.emplace_back(*cred, *msg);
306*cc02d7e2SAndroid Build Coastguard Worker     }
307*cc02d7e2SAndroid Build Coastguard Worker   }
308*cc02d7e2SAndroid Build Coastguard Worker 
309*cc02d7e2SAndroid Build Coastguard Worker   return scenarios;
310*cc02d7e2SAndroid Build Coastguard Worker }
311*cc02d7e2SAndroid Build Coastguard Worker 
312*cc02d7e2SAndroid Build Coastguard Worker INSTANTIATE_TEST_SUITE_P(CFStreamTest, CFStreamTest,
313*cc02d7e2SAndroid Build Coastguard Worker                          ::testing::ValuesIn(CreateTestScenarios()));
314*cc02d7e2SAndroid Build Coastguard Worker 
// gRPC should automatically detect network flaps (without enabling keepalives)
// when CFStream is enabled.
TEST_P(CFStreamTest, NetworkTransition) {
  auto channel = BuildChannel();
  auto stub = BuildStub(channel);

  // A successful RPC should leave the channel in the READY state.
  SendRpc(stub, /*expect_success=*/true);
  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);

  // Background thread: one RPC roughly every second until asked to stop.
  // Failures are tolerated here (expect_success defaults to false).
  std::atomic_bool shutdown{false};
  std::thread sender([this, &stub, &shutdown]() {
    while (!shutdown.load()) {
      SendRpc(stub);
      std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
  });

  // Take the network down; cfstream should notice and leave READY.
  NetworkDown();
  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));

  // Wait a second, then bring the network back.
  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  NetworkUp();

  // The channel should reconnect.
  EXPECT_TRUE(WaitForChannelReady(channel.get()));
  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);

  shutdown.store(true);
  sender.join();
}
351*cc02d7e2SAndroid Build Coastguard Worker 
352*cc02d7e2SAndroid Build Coastguard Worker // Network flaps while RPCs are in flight
TEST_P(CFStreamTest,NetworkFlapRpcsInFlight)353*cc02d7e2SAndroid Build Coastguard Worker TEST_P(CFStreamTest, NetworkFlapRpcsInFlight) {
354*cc02d7e2SAndroid Build Coastguard Worker   auto channel = BuildChannel();
355*cc02d7e2SAndroid Build Coastguard Worker   auto stub = BuildStub(channel);
356*cc02d7e2SAndroid Build Coastguard Worker   std::atomic_int rpcs_sent{0};
357*cc02d7e2SAndroid Build Coastguard Worker 
358*cc02d7e2SAndroid Build Coastguard Worker   // Channel should be in READY state after we send some RPCs
359*cc02d7e2SAndroid Build Coastguard Worker   for (int i = 0; i < 10; ++i) {
360*cc02d7e2SAndroid Build Coastguard Worker     RequestParams param;
361*cc02d7e2SAndroid Build Coastguard Worker     param.set_skip_cancelled_check(true);
362*cc02d7e2SAndroid Build Coastguard Worker     SendAsyncRpc(stub, param);
363*cc02d7e2SAndroid Build Coastguard Worker     ++rpcs_sent;
364*cc02d7e2SAndroid Build Coastguard Worker   }
365*cc02d7e2SAndroid Build Coastguard Worker   EXPECT_TRUE(WaitForChannelReady(channel.get()));
366*cc02d7e2SAndroid Build Coastguard Worker 
367*cc02d7e2SAndroid Build Coastguard Worker   // Bring down the network
368*cc02d7e2SAndroid Build Coastguard Worker   NetworkDown();
369*cc02d7e2SAndroid Build Coastguard Worker 
370*cc02d7e2SAndroid Build Coastguard Worker   std::thread thd = std::thread([this, &rpcs_sent]() {
371*cc02d7e2SAndroid Build Coastguard Worker     void* got_tag;
372*cc02d7e2SAndroid Build Coastguard Worker     bool ok = false;
373*cc02d7e2SAndroid Build Coastguard Worker     bool network_down = true;
374*cc02d7e2SAndroid Build Coastguard Worker     int total_completions = 0;
375*cc02d7e2SAndroid Build Coastguard Worker 
376*cc02d7e2SAndroid Build Coastguard Worker     while (CQNext(&got_tag, &ok)) {
377*cc02d7e2SAndroid Build Coastguard Worker       ++total_completions;
378*cc02d7e2SAndroid Build Coastguard Worker       GPR_ASSERT(ok);
379*cc02d7e2SAndroid Build Coastguard Worker       AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
380*cc02d7e2SAndroid Build Coastguard Worker       if (!call->status.ok()) {
381*cc02d7e2SAndroid Build Coastguard Worker         gpr_log(GPR_DEBUG, "RPC failed with error: %s",
382*cc02d7e2SAndroid Build Coastguard Worker                 call->status.error_message().c_str());
383*cc02d7e2SAndroid Build Coastguard Worker         // Bring network up when RPCs start failing
384*cc02d7e2SAndroid Build Coastguard Worker         if (network_down) {
385*cc02d7e2SAndroid Build Coastguard Worker           NetworkUp();
386*cc02d7e2SAndroid Build Coastguard Worker           network_down = false;
387*cc02d7e2SAndroid Build Coastguard Worker         }
388*cc02d7e2SAndroid Build Coastguard Worker       } else {
389*cc02d7e2SAndroid Build Coastguard Worker         gpr_log(GPR_DEBUG, "RPC succeeded");
390*cc02d7e2SAndroid Build Coastguard Worker       }
391*cc02d7e2SAndroid Build Coastguard Worker       delete call;
392*cc02d7e2SAndroid Build Coastguard Worker     }
393*cc02d7e2SAndroid Build Coastguard Worker     // Remove line below and uncomment the following line after Apple CFStream
394*cc02d7e2SAndroid Build Coastguard Worker     // bug has been fixed.
395*cc02d7e2SAndroid Build Coastguard Worker     (void)rpcs_sent;
396*cc02d7e2SAndroid Build Coastguard Worker     // EXPECT_EQ(total_completions, rpcs_sent);
397*cc02d7e2SAndroid Build Coastguard Worker   });
398*cc02d7e2SAndroid Build Coastguard Worker 
399*cc02d7e2SAndroid Build Coastguard Worker   for (int i = 0; i < 100; ++i) {
400*cc02d7e2SAndroid Build Coastguard Worker     RequestParams param;
401*cc02d7e2SAndroid Build Coastguard Worker     param.set_skip_cancelled_check(true);
402*cc02d7e2SAndroid Build Coastguard Worker     SendAsyncRpc(stub, param);
403*cc02d7e2SAndroid Build Coastguard Worker     std::this_thread::sleep_for(std::chrono::milliseconds(10));
404*cc02d7e2SAndroid Build Coastguard Worker     ++rpcs_sent;
405*cc02d7e2SAndroid Build Coastguard Worker   }
406*cc02d7e2SAndroid Build Coastguard Worker 
407*cc02d7e2SAndroid Build Coastguard Worker   ShutdownCQ();
408*cc02d7e2SAndroid Build Coastguard Worker 
409*cc02d7e2SAndroid Build Coastguard Worker   thd.join();
410*cc02d7e2SAndroid Build Coastguard Worker }
411*cc02d7e2SAndroid Build Coastguard Worker 
412*cc02d7e2SAndroid Build Coastguard Worker // Send a bunch of RPCs, some of which are expected to fail.
413*cc02d7e2SAndroid Build Coastguard Worker // We should get back a response for all RPCs
TEST_P(CFStreamTest,ConcurrentRpc)414*cc02d7e2SAndroid Build Coastguard Worker TEST_P(CFStreamTest, ConcurrentRpc) {
415*cc02d7e2SAndroid Build Coastguard Worker   auto channel = BuildChannel();
416*cc02d7e2SAndroid Build Coastguard Worker   auto stub = BuildStub(channel);
417*cc02d7e2SAndroid Build Coastguard Worker   std::atomic_int rpcs_sent{0};
418*cc02d7e2SAndroid Build Coastguard Worker   std::thread thd = std::thread([this, &rpcs_sent]() {
419*cc02d7e2SAndroid Build Coastguard Worker     void* got_tag;
420*cc02d7e2SAndroid Build Coastguard Worker     bool ok = false;
421*cc02d7e2SAndroid Build Coastguard Worker     int total_completions = 0;
422*cc02d7e2SAndroid Build Coastguard Worker 
423*cc02d7e2SAndroid Build Coastguard Worker     while (CQNext(&got_tag, &ok)) {
424*cc02d7e2SAndroid Build Coastguard Worker       ++total_completions;
425*cc02d7e2SAndroid Build Coastguard Worker       GPR_ASSERT(ok);
426*cc02d7e2SAndroid Build Coastguard Worker       AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
427*cc02d7e2SAndroid Build Coastguard Worker       if (!call->status.ok()) {
428*cc02d7e2SAndroid Build Coastguard Worker         gpr_log(GPR_DEBUG, "RPC failed with error: %s",
429*cc02d7e2SAndroid Build Coastguard Worker                 call->status.error_message().c_str());
430*cc02d7e2SAndroid Build Coastguard Worker         // Bring network up when RPCs start failing
431*cc02d7e2SAndroid Build Coastguard Worker       } else {
432*cc02d7e2SAndroid Build Coastguard Worker         gpr_log(GPR_DEBUG, "RPC succeeded");
433*cc02d7e2SAndroid Build Coastguard Worker       }
434*cc02d7e2SAndroid Build Coastguard Worker       delete call;
435*cc02d7e2SAndroid Build Coastguard Worker     }
436*cc02d7e2SAndroid Build Coastguard Worker     // Remove line below and uncomment the following line after Apple CFStream
437*cc02d7e2SAndroid Build Coastguard Worker     // bug has been fixed.
438*cc02d7e2SAndroid Build Coastguard Worker     (void)rpcs_sent;
439*cc02d7e2SAndroid Build Coastguard Worker     // EXPECT_EQ(total_completions, rpcs_sent);
440*cc02d7e2SAndroid Build Coastguard Worker   });
441*cc02d7e2SAndroid Build Coastguard Worker 
442*cc02d7e2SAndroid Build Coastguard Worker   for (int i = 0; i < 10; ++i) {
443*cc02d7e2SAndroid Build Coastguard Worker     if (i % 3 == 0) {
444*cc02d7e2SAndroid Build Coastguard Worker       RequestParams param;
445*cc02d7e2SAndroid Build Coastguard Worker       ErrorStatus* error = param.mutable_expected_error();
446*cc02d7e2SAndroid Build Coastguard Worker       error->set_code(StatusCode::INTERNAL);
447*cc02d7e2SAndroid Build Coastguard Worker       error->set_error_message("internal error");
448*cc02d7e2SAndroid Build Coastguard Worker       SendAsyncRpc(stub, param);
449*cc02d7e2SAndroid Build Coastguard Worker     } else if (i % 5 == 0) {
450*cc02d7e2SAndroid Build Coastguard Worker       RequestParams param;
451*cc02d7e2SAndroid Build Coastguard Worker       param.set_echo_metadata(true);
452*cc02d7e2SAndroid Build Coastguard Worker       DebugInfo* info = param.mutable_debug_info();
453*cc02d7e2SAndroid Build Coastguard Worker       info->add_stack_entries("stack_entry1");
454*cc02d7e2SAndroid Build Coastguard Worker       info->add_stack_entries("stack_entry2");
455*cc02d7e2SAndroid Build Coastguard Worker       info->set_detail("detailed debug info");
456*cc02d7e2SAndroid Build Coastguard Worker       SendAsyncRpc(stub, param);
457*cc02d7e2SAndroid Build Coastguard Worker     } else {
458*cc02d7e2SAndroid Build Coastguard Worker       SendAsyncRpc(stub);
459*cc02d7e2SAndroid Build Coastguard Worker     }
460*cc02d7e2SAndroid Build Coastguard Worker     ++rpcs_sent;
461*cc02d7e2SAndroid Build Coastguard Worker   }
462*cc02d7e2SAndroid Build Coastguard Worker 
463*cc02d7e2SAndroid Build Coastguard Worker   ShutdownCQ();
464*cc02d7e2SAndroid Build Coastguard Worker 
465*cc02d7e2SAndroid Build Coastguard Worker   thd.join();
466*cc02d7e2SAndroid Build Coastguard Worker }
467*cc02d7e2SAndroid Build Coastguard Worker 
468*cc02d7e2SAndroid Build Coastguard Worker }  // namespace
469*cc02d7e2SAndroid Build Coastguard Worker }  // namespace testing
470*cc02d7e2SAndroid Build Coastguard Worker }  // namespace grpc
471*cc02d7e2SAndroid Build Coastguard Worker #endif  // GRPC_CFSTREAM
472*cc02d7e2SAndroid Build Coastguard Worker 
main(int argc,char ** argv)473*cc02d7e2SAndroid Build Coastguard Worker int main(int argc, char** argv) {
474*cc02d7e2SAndroid Build Coastguard Worker   ::testing::InitGoogleTest(&argc, argv);
475*cc02d7e2SAndroid Build Coastguard Worker   grpc::testing::TestEnvironment env(&argc, argv);
476*cc02d7e2SAndroid Build Coastguard Worker   grpc_core::SetEnv("grpc_cfstream", "1");
477*cc02d7e2SAndroid Build Coastguard Worker   const auto result = RUN_ALL_TESTS();
478*cc02d7e2SAndroid Build Coastguard Worker   return result;
479*cc02d7e2SAndroid Build Coastguard Worker }
480