xref: /aosp_15_r20/external/grpc-grpc/test/cpp/server/server_request_call_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <thread>
20 
21 #include <gtest/gtest.h>
22 
23 #include "absl/strings/str_format.h"
24 
25 #include <grpc/support/log.h>
26 #include <grpcpp/create_channel.h>
27 #include <grpcpp/security/credentials.h>
28 #include <grpcpp/server.h>
29 #include <grpcpp/server_builder.h>
30 #include <grpcpp/support/config.h>
31 
32 #include "src/core/lib/gprpp/crash.h"
33 #include "src/proto/grpc/testing/echo.grpc.pb.h"
34 #include "test/core/util/port.h"
35 #include "test/core/util/test_config.h"
36 
37 namespace grpc {
38 namespace {
39 
TEST(ServerRequestCallTest,ShortDeadlineDoesNotCauseOkayFalse)40 TEST(ServerRequestCallTest, ShortDeadlineDoesNotCauseOkayFalse) {
41   std::mutex mu;
42   bool shutting_down = false;
43 
44   // grpc server config.
45   std::ostringstream s;
46   int p = grpc_pick_unused_port_or_die();
47   s << "[::1]:" << p;
48   const string address = s.str();
49   testing::EchoTestService::AsyncService service;
50   ServerBuilder builder;
51   builder.AddListeningPort(address, InsecureServerCredentials());
52   auto cq = builder.AddCompletionQueue();
53   builder.RegisterService(&service);
54   auto server = builder.BuildAndStart();
55 
56   // server thread.
57   std::thread t([address, &service, &cq, &mu, &shutting_down] {
58     for (int n = 0; true; n++) {
59       ServerContext ctx;
60       testing::EchoRequest req;
61       ServerAsyncResponseWriter<testing::EchoResponse> responder(&ctx);
62 
63       // if shutting down, don't enqueue a new request.
64       {
65         std::lock_guard<std::mutex> lock(mu);
66         if (!shutting_down) {
67           service.RequestEcho(&ctx, &req, &responder, cq.get(), cq.get(),
68                               reinterpret_cast<void*>(1));
69         }
70       }
71 
72       bool ok;
73       void* tag;
74       if (!cq->Next(&tag, &ok)) {
75         break;
76       }
77 
78       EXPECT_EQ((void*)1, tag);
79       // If not shutting down, ok must be true for new requests.
80       {
81         std::lock_guard<std::mutex> lock(mu);
82         if (!shutting_down && !ok) {
83           grpc_core::Crash(absl::StrFormat("!ok on request %d", n));
84         }
85         if (shutting_down && !ok) {
86           // Failed connection due to shutdown, continue flushing the CQ.
87           continue;
88         }
89       }
90 
91       // Send a simple response after a small delay that would ensure the client
92       // deadline is exceeded.
93       gpr_log(GPR_INFO, "Got request %d", n);
94       testing::EchoResponse response;
95       response.set_message("foobar");
96       // A bit of sleep to make sure the deadline elapses.
97       gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
98                                    gpr_time_from_seconds(1, GPR_TIMESPAN)));
99       {
100         std::lock_guard<std::mutex> lock(mu);
101         if (shutting_down) {
102           gpr_log(GPR_INFO,
103                   "shut down while processing call, not calling Finish()");
104           // Continue flushing the CQ.
105           continue;
106         }
107         gpr_log(GPR_INFO, "Finishing request %d", n);
108         responder.Finish(response, grpc::Status::OK,
109                          reinterpret_cast<void*>(2));
110         if (!cq->Next(&tag, &ok)) {
111           break;
112         }
113         EXPECT_EQ((void*)2, tag);
114       }
115     }
116   });
117 
118   auto stub = testing::EchoTestService::NewStub(
119       grpc::CreateChannel(address, InsecureChannelCredentials()));
120 
121   for (int i = 0; i < 100; i++) {
122     gpr_log(GPR_INFO, "Sending %d.", i);
123     testing::EchoRequest request;
124 
125     /////////
126     // Comment out the following line to get ok=false due to invalid request.
127     // Otherwise, ok=false due to deadline being exceeded.
128     /////////
129     request.set_message("foobar");
130 
131     // A simple request with a short deadline. The server will always exceed the
132     // deadline, whether due to the sleep or because the server was unable to
133     // even fetch the request from the CQ before the deadline elapsed.
134     testing::EchoResponse response;
135     grpc::ClientContext ctx;
136     ctx.set_fail_fast(false);
137     ctx.set_deadline(std::chrono::system_clock::now() +
138                      std::chrono::milliseconds(1));
139     grpc::Status status = stub->Echo(&ctx, request, &response);
140     EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, status.error_code());
141     gpr_log(GPR_INFO, "Success.");
142   }
143   gpr_log(GPR_INFO, "Done sending RPCs.");
144 
145   // Shut down everything properly.
146   gpr_log(GPR_INFO, "Shutting down.");
147   {
148     std::lock_guard<std::mutex> lock(mu);
149     shutting_down = true;
150   }
151   server->Shutdown();
152   cq->Shutdown();
153   server->Wait();
154 
155   t.join();
156 }
157 
ServerFunction(ServerCompletionQueue * cq,std::atomic_bool * shutdown)158 void ServerFunction(ServerCompletionQueue* cq, std::atomic_bool* shutdown) {
159   for (;;) {
160     bool ok;
161     void* tag;
162     if (!cq->Next(&tag, &ok)) {
163       break;
164     }
165     if (shutdown->load()) {
166       break;
167     }
168     // For UnimplementedAsyncRequest, the server handles it internally and never
169     // returns from Next except when shutdown.
170     grpc_core::Crash("unreached");
171   }
172 }
173 
ClientFunction(testing::UnimplementedEchoService::Stub * stub)174 void ClientFunction(testing::UnimplementedEchoService::Stub* stub) {
175   constexpr int kNumRpcPerThreads = 5000;
176   for (int i = 0; i < kNumRpcPerThreads; i++) {
177     testing::EchoRequest request;
178     request.set_message("foobar");
179     testing::EchoResponse response;
180     grpc::ClientContext ctx;
181     grpc::Status status = stub->Unimplemented(&ctx, request, &response);
182     EXPECT_EQ(StatusCode::UNIMPLEMENTED, status.error_code());
183   }
184 }
185 
// Runs several client threads issuing Unimplemented RPCs against a server
// whose completion queue is polled by several threads at once. The clients
// expect UNIMPLEMENTED for every call, and the polling threads crash the test
// if the queue hands them an event before shutdown has been requested.
TEST(ServerRequestCallTest, MultithreadedUnimplementedService) {
  constexpr int kNumServerThreads = 2;
  constexpr int kNumClientThreads = 2;
  std::atomic_bool shutdown(false);

  // grpc server config: listen on a freshly picked port on the IPv6 loopback.
  const int port = grpc_pick_unused_port_or_die();
  std::ostringstream address_stream;
  address_stream << "[::1]:" << port;
  const string address = address_stream.str();
  testing::EchoTestService::AsyncService service;
  ServerBuilder builder;
  builder.AddListeningPort(address, InsecureServerCredentials());
  auto cq = builder.AddCompletionQueue();
  builder.RegisterService(&service);
  auto server = builder.BuildAndStart();

  // Keep a single Echo request outstanding on the queue for the whole test.
  ServerContext ctx;
  testing::EchoRequest req;
  ServerAsyncResponseWriter<testing::EchoResponse> responder(&ctx);
  service.RequestEcho(&ctx, &req, &responder, cq.get(), cq.get(),
                      reinterpret_cast<void*>(1));

  // server threads, all polling the same completion queue.
  std::vector<std::thread> server_threads;
  server_threads.reserve(kNumServerThreads);
  for (int started = 0; started != kNumServerThreads; ++started) {
    server_threads.emplace_back(ServerFunction, cq.get(), &shutdown);
  }

  auto stub = testing::UnimplementedEchoService::NewStub(
      grpc::CreateChannel(address, InsecureChannelCredentials()));

  // client threads, all sharing the one stub; wait for every one to finish.
  std::vector<std::thread> client_threads;
  client_threads.reserve(kNumClientThreads);
  for (int started = 0; started != kNumClientThreads; ++started) {
    client_threads.emplace_back(ClientFunction, stub.get());
  }
  for (std::thread& client : client_threads) {
    client.join();
  }

  // Shut down everything properly. The flag is raised first so the polling
  // threads exit instead of crashing on events delivered from here on.
  gpr_log(GPR_INFO, "Shutting down.");
  shutdown.store(true);
  server->Shutdown();
  cq->Shutdown();
  server->Wait();

  for (std::thread& poller : server_threads) {
    poller.join();
  }
}
239 
240 }  // namespace
241 }  // namespace grpc
242 
main(int argc,char ** argv)243 int main(int argc, char** argv) {
244   grpc::testing::TestEnvironment env(&argc, argv);
245   ::testing::InitGoogleTest(&argc, argv);
246   return RUN_ALL_TESTS();
247 }
248