1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
#include <atomic>
#include <chrono>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include <gtest/gtest.h>

#include "absl/strings/str_format.h"

#include <grpc/support/log.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/support/config.h>

#include "src/core/lib/gprpp/crash.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
36
37 namespace grpc {
38 namespace {
39
// Sends 100 Echo RPCs with a 1ms deadline to an async server whose handler
// sleeps for one second before answering. Every client call is expected to
// end with DEADLINE_EXCEEDED, while on the server side each completed
// RequestEcho() must still report ok == true as long as the test has not
// started shutting down; otherwise the server thread crashes the process.
TEST(ServerRequestCallTest, ShortDeadlineDoesNotCauseOkayFalse) {
  // mu guards shutting_down, which is shared between the server thread and
  // the test body.
  std::mutex mu;
  bool shutting_down = false;

  // grpc server config.
  const int port = grpc_pick_unused_port_or_die();
  const std::string address = "[::1]:" + std::to_string(port);
  testing::EchoTestService::AsyncService service;
  ServerBuilder builder;
  builder.AddListeningPort(address, InsecureServerCredentials());
  auto cq = builder.AddCompletionQueue();
  builder.RegisterService(&service);
  auto server = builder.BuildAndStart();
  // Report a test failure here, before the server thread exists, rather than
  // dereferencing a null server further down.
  ASSERT_NE(server, nullptr);

  // server thread.
  std::thread t([&service, &cq, &mu, &shutting_down] {
    for (int n = 0; true; n++) {
      ServerContext ctx;
      testing::EchoRequest req;
      ServerAsyncResponseWriter<testing::EchoResponse> responder(&ctx);

      // if shutting down, don't enqueue a new request.
      {
        std::lock_guard<std::mutex> lock(mu);
        if (!shutting_down) {
          service.RequestEcho(&ctx, &req, &responder, cq.get(), cq.get(),
                              reinterpret_cast<void*>(1));
        }
      }

      bool ok = false;
      void* tag = nullptr;
      // Next() returning false ends the server loop.
      if (!cq->Next(&tag, &ok)) {
        break;
      }

      EXPECT_EQ(reinterpret_cast<void*>(1), tag);
      // If not shutting down, ok must be true for new requests.
      {
        std::lock_guard<std::mutex> lock(mu);
        if (!shutting_down && !ok) {
          grpc_core::Crash(absl::StrFormat("!ok on request %d", n));
        }
        if (shutting_down && !ok) {
          // Failed connection due to shutdown, continue flushing the CQ.
          continue;
        }
      }

      // Send a simple response after a small delay that would ensure the client
      // deadline is exceeded.
      gpr_log(GPR_INFO, "Got request %d", n);
      testing::EchoResponse response;
      response.set_message("foobar");
      // A bit of sleep to make sure the deadline elapses.
      gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                                   gpr_time_from_seconds(1, GPR_TIMESPAN)));
      {
        // mu stays locked across Finish() and the wait for its completion
        // tag, so the test body cannot set shutting_down in between.
        std::lock_guard<std::mutex> lock(mu);
        if (shutting_down) {
          gpr_log(GPR_INFO,
                  "shut down while processing call, not calling Finish()");
          // Continue flushing the CQ.
          continue;
        }
        gpr_log(GPR_INFO, "Finishing request %d", n);
        responder.Finish(response, grpc::Status::OK,
                         reinterpret_cast<void*>(2));
        if (!cq->Next(&tag, &ok)) {
          break;
        }
        EXPECT_EQ(reinterpret_cast<void*>(2), tag);
      }
    }
  });

  auto stub = testing::EchoTestService::NewStub(
      grpc::CreateChannel(address, InsecureChannelCredentials()));

  for (int i = 0; i < 100; i++) {
    gpr_log(GPR_INFO, "Sending %d.", i);
    testing::EchoRequest request;

    /////////
    // Comment out the following line to get ok=false due to invalid request.
    // Otherwise, ok=false due to deadline being exceeded.
    /////////
    request.set_message("foobar");

    // A simple request with a short deadline. The server will always exceed the
    // deadline, whether due to the sleep or because the server was unable to
    // even fetch the request from the CQ before the deadline elapsed.
    testing::EchoResponse response;
    grpc::ClientContext ctx;
    // Same setting as the deprecated set_fail_fast(false).
    ctx.set_wait_for_ready(true);
    ctx.set_deadline(std::chrono::system_clock::now() +
                     std::chrono::milliseconds(1));
    grpc::Status status = stub->Echo(&ctx, request, &response);
    EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, status.error_code());
    gpr_log(GPR_INFO, "Success.");
  }
  gpr_log(GPR_INFO, "Done sending RPCs.");

  // Shut down everything properly.
  gpr_log(GPR_INFO, "Shutting down.");
  {
    std::lock_guard<std::mutex> lock(mu);
    shutting_down = true;
  }
  server->Shutdown();
  cq->Shutdown();
  server->Wait();

  t.join();
}
157
ServerFunction(ServerCompletionQueue * cq,std::atomic_bool * shutdown)158 void ServerFunction(ServerCompletionQueue* cq, std::atomic_bool* shutdown) {
159 for (;;) {
160 bool ok;
161 void* tag;
162 if (!cq->Next(&tag, &ok)) {
163 break;
164 }
165 if (shutdown->load()) {
166 break;
167 }
168 // For UnimplementedAsyncRequest, the server handles it internally and never
169 // returns from Next except when shutdown.
170 grpc_core::Crash("unreached");
171 }
172 }
173
ClientFunction(testing::UnimplementedEchoService::Stub * stub)174 void ClientFunction(testing::UnimplementedEchoService::Stub* stub) {
175 constexpr int kNumRpcPerThreads = 5000;
176 for (int i = 0; i < kNumRpcPerThreads; i++) {
177 testing::EchoRequest request;
178 request.set_message("foobar");
179 testing::EchoResponse response;
180 grpc::ClientContext ctx;
181 grpc::Status status = stub->Unimplemented(&ctx, request, &response);
182 EXPECT_EQ(StatusCode::UNIMPLEMENTED, status.error_code());
183 }
184 }
185
// Runs two client threads that call the Unimplemented method through an
// UnimplementedEchoService stub against a server that only registers
// EchoTestService, while two server threads poll the completion queue.
// Each client call must return UNIMPLEMENTED, and the polling threads must
// not see any completion-queue event before shutdown begins (ServerFunction
// crashes the process if they do).
TEST(ServerRequestCallTest, MultithreadedUnimplementedService) {
  std::atomic_bool shutdown(false);
  // grpc server config.
  const int port = grpc_pick_unused_port_or_die();
  const std::string address = "[::1]:" + std::to_string(port);
  testing::EchoTestService::AsyncService service;
  ServerBuilder builder;
  builder.AddListeningPort(address, InsecureServerCredentials());
  auto cq = builder.AddCompletionQueue();
  builder.RegisterService(&service);
  auto server = builder.BuildAndStart();
  // Report a test failure here, before any request is registered or any
  // thread is started, rather than dereferencing a null server further down.
  ASSERT_NE(server, nullptr);

  // A single Echo request is registered up front; the clients below never
  // call Echo.
  ServerContext ctx;
  testing::EchoRequest req;
  ServerAsyncResponseWriter<testing::EchoResponse> responder(&ctx);
  service.RequestEcho(&ctx, &req, &responder, cq.get(), cq.get(),
                      reinterpret_cast<void*>(1));

  // server threads
  constexpr int kNumServerThreads = 2;
  std::vector<std::thread> server_threads;
  server_threads.reserve(kNumServerThreads);
  for (int i = 0; i < kNumServerThreads; i++) {
    server_threads.emplace_back(ServerFunction, cq.get(), &shutdown);
  }

  auto stub = testing::UnimplementedEchoService::NewStub(
      grpc::CreateChannel(address, InsecureChannelCredentials()));

  // client threads
  constexpr int kNumClientThreads = 2;
  std::vector<std::thread> client_threads;
  client_threads.reserve(kNumClientThreads);
  for (int i = 0; i < kNumClientThreads; i++) {
    client_threads.emplace_back(ClientFunction, stub.get());
  }
  for (auto& t : client_threads) {
    t.join();
  }

  // Shut down everything properly.
  gpr_log(GPR_INFO, "Shutting down.");
  // Set the flag before shutting down so the polling threads treat any event
  // they see from now on as part of shutdown.
  shutdown.store(true);
  server->Shutdown();
  cq->Shutdown();
  server->Wait();

  for (auto& t : server_threads) {
    t.join();
  }
}
239
240 } // namespace
241 } // namespace grpc
242
main(int argc,char ** argv)243 int main(int argc, char** argv) {
244 grpc::testing::TestEnvironment env(&argc, argv);
245 ::testing::InitGoogleTest(&argc, argv);
246 return RUN_ALL_TESTS();
247 }
248