1 //
2 //
3 // Copyright 2019 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <algorithm>
20 #include <atomic>
21 #include <condition_variable>
22 #include <functional>
23 #include <memory>
24 #include <mutex>
25 #include <sstream>
26 #include <thread>
27
28 #include <google/protobuf/arena.h>
29 #include <gtest/gtest.h>
30
31 #include <grpc/support/log.h>
32 #include <grpcpp/channel.h>
33 #include <grpcpp/client_context.h>
34 #include <grpcpp/create_channel.h>
35 #include <grpcpp/server.h>
36 #include <grpcpp/server_builder.h>
37 #include <grpcpp/server_context.h>
38 #include <grpcpp/support/client_callback.h>
39 #include <grpcpp/support/message_allocator.h>
40
41 #include "src/core/lib/iomgr/iomgr.h"
42 #include "src/proto/grpc/testing/echo.grpc.pb.h"
43 #include "test/core/util/port.h"
44 #include "test/core/util/test_config.h"
45 #include "test/cpp/util/test_credentials_provider.h"
46
47 namespace grpc {
48 namespace testing {
49 namespace {
50
51 class CallbackTestServiceImpl : public EchoTestService::CallbackService {
52 public:
CallbackTestServiceImpl()53 explicit CallbackTestServiceImpl() {}
54
SetAllocatorMutator(std::function<void (RpcAllocatorState * allocator_state,const EchoRequest * req,EchoResponse * resp)> mutator)55 void SetAllocatorMutator(
56 std::function<void(RpcAllocatorState* allocator_state,
57 const EchoRequest* req, EchoResponse* resp)>
58 mutator) {
59 allocator_mutator_ = std::move(mutator);
60 }
61
Echo(CallbackServerContext * context,const EchoRequest * request,EchoResponse * response)62 ServerUnaryReactor* Echo(CallbackServerContext* context,
63 const EchoRequest* request,
64 EchoResponse* response) override {
65 response->set_message(request->message());
66 if (allocator_mutator_) {
67 allocator_mutator_(context->GetRpcAllocatorState(), request, response);
68 }
69 auto* reactor = context->DefaultReactor();
70 reactor->Finish(Status::OK);
71 return reactor;
72 }
73
74 private:
75 std::function<void(RpcAllocatorState* allocator_state, const EchoRequest* req,
76 EchoResponse* resp)>
77 allocator_mutator_;
78 };
79
// Transport used between the test client and the test server.
enum class Protocol {
  INPROC,  // client channel is obtained from the server's in-process channel
  TCP,     // server listens on "localhost:<port>" and the client dials it
};
81
82 class TestScenario {
83 public:
TestScenario(Protocol protocol,const std::string & creds_type)84 TestScenario(Protocol protocol, const std::string& creds_type)
85 : protocol(protocol), credentials_type(creds_type) {}
86 void Log() const;
87 Protocol protocol;
88 const std::string credentials_type;
89 };
90
operator <<(std::ostream & out,const TestScenario & scenario)91 std::ostream& operator<<(std::ostream& out, const TestScenario& scenario) {
92 return out << "TestScenario{protocol="
93 << (scenario.protocol == Protocol::INPROC ? "INPROC" : "TCP")
94 << "," << scenario.credentials_type << "}";
95 }
96
Log() const97 void TestScenario::Log() const {
98 std::ostringstream out;
99 out << *this;
100 gpr_log(GPR_INFO, "%s", out.str().c_str());
101 }
102
103 class MessageAllocatorEnd2endTestBase
104 : public ::testing::TestWithParam<TestScenario> {
105 protected:
MessageAllocatorEnd2endTestBase()106 MessageAllocatorEnd2endTestBase() { GetParam().Log(); }
107
108 ~MessageAllocatorEnd2endTestBase() override = default;
109
CreateServer(MessageAllocator<EchoRequest,EchoResponse> * allocator)110 void CreateServer(MessageAllocator<EchoRequest, EchoResponse>* allocator) {
111 ServerBuilder builder;
112
113 auto server_creds = GetCredentialsProvider()->GetServerCredentials(
114 GetParam().credentials_type);
115 if (GetParam().protocol == Protocol::TCP) {
116 picked_port_ = grpc_pick_unused_port_or_die();
117 server_address_ << "localhost:" << picked_port_;
118 builder.AddListeningPort(server_address_.str(), server_creds);
119 }
120 callback_service_.SetMessageAllocatorFor_Echo(allocator);
121 builder.RegisterService(&callback_service_);
122
123 server_ = builder.BuildAndStart();
124 }
125
DestroyServer()126 void DestroyServer() {
127 if (server_) {
128 server_->Shutdown();
129 server_.reset();
130 }
131 }
132
ResetStub()133 void ResetStub() {
134 ChannelArguments args;
135 auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
136 GetParam().credentials_type, &args);
137 switch (GetParam().protocol) {
138 case Protocol::TCP:
139 channel_ = grpc::CreateCustomChannel(server_address_.str(),
140 channel_creds, args);
141 break;
142 case Protocol::INPROC:
143 channel_ = server_->InProcessChannel(args);
144 break;
145 default:
146 assert(false);
147 }
148 stub_ = EchoTestService::NewStub(channel_);
149 }
150
TearDown()151 void TearDown() override {
152 DestroyServer();
153 if (picked_port_ > 0) {
154 grpc_recycle_unused_port(picked_port_);
155 }
156 }
157
SendRpcs(int num_rpcs)158 void SendRpcs(int num_rpcs) {
159 std::string test_string;
160 for (int i = 0; i < num_rpcs; i++) {
161 EchoRequest request;
162 EchoResponse response;
163 ClientContext cli_ctx;
164
165 test_string += std::string(1024, 'x');
166 request.set_message(test_string);
167 std::string val;
168 cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
169
170 std::mutex mu;
171 std::condition_variable cv;
172 bool done = false;
173 stub_->async()->Echo(
174 &cli_ctx, &request, &response,
175 [&request, &response, &done, &mu, &cv, val](Status s) {
176 GPR_ASSERT(s.ok());
177
178 EXPECT_EQ(request.message(), response.message());
179 std::lock_guard<std::mutex> l(mu);
180 done = true;
181 cv.notify_one();
182 });
183 std::unique_lock<std::mutex> l(mu);
184 while (!done) {
185 cv.wait(l);
186 }
187 }
188 }
189
190 int picked_port_{0};
191 std::shared_ptr<Channel> channel_;
192 std::unique_ptr<EchoTestService::Stub> stub_;
193 CallbackTestServiceImpl callback_service_;
194 std::unique_ptr<Server> server_;
195 std::ostringstream server_address_;
196 };
197
198 class NullAllocatorTest : public MessageAllocatorEnd2endTestBase {};
199
// A single Echo RPC succeeds when a null allocator is passed to the server
// setup.
TEST_P(NullAllocatorTest, SimpleRpc) {
  constexpr int kRpcCount = 1;
  CreateServer(/*allocator=*/nullptr);
  ResetStub();
  SendRpcs(kRpcCount);
}
205
206 class SimpleAllocatorTest : public MessageAllocatorEnd2endTestBase {
207 public:
208 class SimpleAllocator : public MessageAllocator<EchoRequest, EchoResponse> {
209 public:
210 class MessageHolderImpl : public MessageHolder<EchoRequest, EchoResponse> {
211 public:
MessageHolderImpl(std::atomic_int * request_deallocation_count,std::atomic_int * messages_deallocation_count)212 MessageHolderImpl(std::atomic_int* request_deallocation_count,
213 std::atomic_int* messages_deallocation_count)
214 : request_deallocation_count_(request_deallocation_count),
215 messages_deallocation_count_(messages_deallocation_count) {
216 set_request(new EchoRequest);
217 set_response(new EchoResponse);
218 }
Release()219 void Release() override {
220 (*messages_deallocation_count_)++;
221 delete request();
222 delete response();
223 delete this;
224 }
FreeRequest()225 void FreeRequest() override {
226 (*request_deallocation_count_)++;
227 delete request();
228 set_request(nullptr);
229 }
230
ReleaseRequest()231 EchoRequest* ReleaseRequest() {
232 auto* ret = request();
233 set_request(nullptr);
234 return ret;
235 }
236
237 private:
238 std::atomic_int* const request_deallocation_count_;
239 std::atomic_int* const messages_deallocation_count_;
240 };
AllocateMessages()241 MessageHolder<EchoRequest, EchoResponse>* AllocateMessages() override {
242 allocation_count++;
243 return new MessageHolderImpl(&request_deallocation_count,
244 &messages_deallocation_count);
245 }
246 int allocation_count = 0;
247 std::atomic_int request_deallocation_count{0};
248 std::atomic_int messages_deallocation_count{0};
249 };
250 };
251
// With the counting allocator installed, every RPC allocates one message
// holder and releases it once, and the request is never freed early.
TEST_P(SimpleAllocatorTest, SimpleRpc) {
  constexpr int kRpcCount = 10;
  std::unique_ptr<SimpleAllocator> allocator{new SimpleAllocator};
  CreateServer(allocator.get());
  ResetStub();
  SendRpcs(kRpcCount);
  // messages_deallocation_count is updated in Release, which runs after the
  // server-side OnDone. Destroy the server first so the count is final before
  // it is checked.
  DestroyServer();
  EXPECT_EQ(kRpcCount, allocator->allocation_count);
  EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count);
  EXPECT_EQ(0, allocator->request_deallocation_count);
}
265
// The service handler frees the request early through the allocator state;
// every RPC must then record one request deallocation in addition to the
// usual allocation and release.
TEST_P(SimpleAllocatorTest, RpcWithEarlyFreeRequest) {
  constexpr int kRpcCount = 10;
  std::unique_ptr<SimpleAllocator> allocator{new SimpleAllocator};
  callback_service_.SetAllocatorMutator(
      [](RpcAllocatorState* allocator_state, const EchoRequest* req,
         EchoResponse* resp) {
        // The allocator state handed to the handler is the holder created by
        // SimpleAllocator, so it exposes the same messages the handler sees.
        auto* info =
            static_cast<SimpleAllocator::MessageHolderImpl*>(allocator_state);
        EXPECT_EQ(req, info->request());
        EXPECT_EQ(resp, info->response());
        allocator_state->FreeRequest();
        EXPECT_EQ(nullptr, info->request());
      });
  CreateServer(allocator.get());
  ResetStub();
  SendRpcs(kRpcCount);
  // messages_deallocation_count is updated in Release, which runs after the
  // server-side OnDone. Destroy the server first so the count is final before
  // it is checked.
  DestroyServer();
  EXPECT_EQ(kRpcCount, allocator->allocation_count);
  EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count);
  EXPECT_EQ(kRpcCount, allocator->request_deallocation_count);
}
289
// The service handler takes ownership of each request away from the message
// holder. The holder must then be released without counting a request
// deallocation, and the test ends up owning one request per RPC.
TEST_P(SimpleAllocatorTest, RpcWithReleaseRequest) {
  const int kRpcCount = 10;
  std::unique_ptr<SimpleAllocator> allocator(new SimpleAllocator);
  // Requests detached from their holders are owned here, so they are freed
  // when the vector goes out of scope however the test body exits.
  std::vector<std::unique_ptr<EchoRequest>> released_requests;
  auto mutator = [&released_requests](RpcAllocatorState* allocator_state,
                                      const EchoRequest* req,
                                      EchoResponse* resp) {
    auto* info =
        static_cast<SimpleAllocator::MessageHolderImpl*>(allocator_state);
    EXPECT_EQ(req, info->request());
    EXPECT_EQ(resp, info->response());
    released_requests.push_back(
        std::unique_ptr<EchoRequest>(info->ReleaseRequest()));
    EXPECT_EQ(nullptr, info->request());
  };
  callback_service_.SetAllocatorMutator(mutator);
  CreateServer(allocator.get());
  ResetStub();
  SendRpcs(kRpcCount);
  // messages_deallocation_count is updated in Release, which runs after the
  // server-side OnDone. Destroy the server first so the count is final before
  // it is checked.
  DestroyServer();
  EXPECT_EQ(kRpcCount, allocator->allocation_count);
  EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count);
  EXPECT_EQ(0, allocator->request_deallocation_count);
  EXPECT_EQ(static_cast<unsigned>(kRpcCount), released_requests.size());
}
319
320 class ArenaAllocatorTest : public MessageAllocatorEnd2endTestBase {
321 public:
322 class ArenaAllocator : public MessageAllocator<EchoRequest, EchoResponse> {
323 public:
324 class MessageHolderImpl : public MessageHolder<EchoRequest, EchoResponse> {
325 public:
MessageHolderImpl()326 MessageHolderImpl() {
327 set_request(
328 google::protobuf::Arena::CreateMessage<EchoRequest>(&arena_));
329 set_response(
330 google::protobuf::Arena::CreateMessage<EchoResponse>(&arena_));
331 }
Release()332 void Release() override { delete this; }
FreeRequest()333 void FreeRequest() override { GPR_ASSERT(0); }
334
335 private:
336 google::protobuf::Arena arena_;
337 };
AllocateMessages()338 MessageHolder<EchoRequest, EchoResponse>* AllocateMessages() override {
339 allocation_count++;
340 return new MessageHolderImpl;
341 }
342 int allocation_count = 0;
343 };
344 };
345
// With the arena allocator installed, every RPC results in exactly one call
// to AllocateMessages.
TEST_P(ArenaAllocatorTest, SimpleRpc) {
  constexpr int kRpcCount = 10;
  std::unique_ptr<ArenaAllocator> allocator{new ArenaAllocator};
  CreateServer(allocator.get());
  ResetStub();
  SendRpcs(kRpcCount);
  EXPECT_EQ(kRpcCount, allocator->allocation_count);
}
354
CreateTestScenarios(bool test_insecure)355 std::vector<TestScenario> CreateTestScenarios(bool test_insecure) {
356 std::vector<TestScenario> scenarios;
357 std::vector<std::string> credentials_types{
358 GetCredentialsProvider()->GetSecureCredentialsTypeList()};
359 auto insec_ok = [] {
360 // Only allow insecure credentials type when it is registered with the
361 // provider. User may create providers that do not have insecure.
362 return GetCredentialsProvider()->GetChannelCredentials(
363 kInsecureCredentialsType, nullptr) != nullptr;
364 };
365 if (test_insecure && insec_ok()) {
366 credentials_types.push_back(kInsecureCredentialsType);
367 }
368 GPR_ASSERT(!credentials_types.empty());
369
370 Protocol parr[]{Protocol::INPROC, Protocol::TCP};
371 for (Protocol p : parr) {
372 for (const auto& cred : credentials_types) {
373 // TODO(vjpai): Test inproc with secure credentials when feasible
374 if (p == Protocol::INPROC &&
375 (cred != kInsecureCredentialsType || !insec_ok())) {
376 continue;
377 }
378 scenarios.emplace_back(p, cred);
379 }
380 }
381 return scenarios;
382 }
383
384 INSTANTIATE_TEST_SUITE_P(NullAllocatorTest, NullAllocatorTest,
385 ::testing::ValuesIn(CreateTestScenarios(true)));
386 INSTANTIATE_TEST_SUITE_P(SimpleAllocatorTest, SimpleAllocatorTest,
387 ::testing::ValuesIn(CreateTestScenarios(true)));
388 INSTANTIATE_TEST_SUITE_P(ArenaAllocatorTest, ArenaAllocatorTest,
389 ::testing::ValuesIn(CreateTestScenarios(true)));
390
391 } // namespace
392 } // namespace testing
393 } // namespace grpc
394
main(int argc,char ** argv)395 int main(int argc, char** argv) {
396 grpc::testing::TestEnvironment env(&argc, argv);
397 ::testing::InitGoogleTest(&argc, argv);
398 int ret = RUN_ALL_TESTS();
399 return ret;
400 }
401