xref: /aosp_15_r20/external/grpc-grpc/test/cpp/interop/pre_stop_hook_server.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "test/cpp/interop/pre_stop_hook_server.h"
20 
21 #include <thread>
22 
23 #include "absl/strings/str_format.h"
24 
25 #include <grpcpp/grpcpp.h>
26 
27 #include "src/core/lib/gprpp/sync.h"
28 #include "src/proto/grpc/testing/messages.pb.h"
29 
30 namespace grpc {
31 namespace testing {
32 namespace {
33 
34 enum class State : std::uint8_t { kNew, kWaiting, kDone, kShuttingDown };
35 
BuildHookServer(HookServiceImpl * service,int port)36 std::unique_ptr<Server> BuildHookServer(HookServiceImpl* service, int port) {
37   ServerBuilder builder;
38   builder.AddListeningPort(absl::StrFormat("0.0.0.0:%d", port),
39                            grpc::InsecureServerCredentials());
40   builder.RegisterService(service);
41   return builder.BuildAndStart();
42 }
43 
44 }  // namespace
45 
46 class PreStopHookServer {
47  public:
PreStopHookServer(int port,const absl::Duration & startup_timeout)48   explicit PreStopHookServer(int port, const absl::Duration& startup_timeout)
49       : server_(BuildHookServer(&hook_service_, port)),
50         server_thread_(PreStopHookServer::ServerThread, this) {
51     WaitForState(State::kWaiting, startup_timeout);
52   }
53 
~PreStopHookServer()54   ~PreStopHookServer() {
55     hook_service_.Stop();
56     SetState(State::kShuttingDown);
57     server_->Shutdown();
58     WaitForState(State::kDone, absl::Seconds(5));
59     server_thread_.detach();
60   }
61 
GetState()62   State GetState() {
63     grpc_core::MutexLock lock(&mu_);
64     return state_;
65   }
66 
SetState(State state)67   void SetState(State state) {
68     grpc_core::MutexLock lock(&mu_);
69     state_ = state;
70     condition_.SignalAll();
71   }
72 
SetReturnStatus(const Status & status)73   void SetReturnStatus(const Status& status) {
74     hook_service_.AddReturnStatus(status);
75   }
76 
TestOnlyExpectRequests(size_t expected_requests_count,absl::Duration timeout)77   bool TestOnlyExpectRequests(size_t expected_requests_count,
78                               absl::Duration timeout) {
79     return hook_service_.TestOnlyExpectRequests(expected_requests_count,
80                                                 timeout);
81   }
82 
83  private:
WaitForState(State state,const absl::Duration & timeout)84   bool WaitForState(State state, const absl::Duration& timeout) {
85     grpc_core::MutexLock lock(&mu_);
86     auto deadline = absl::Now() + timeout;
87     while (state_ != state && !condition_.WaitWithDeadline(&mu_, deadline)) {
88     }
89     return state_ == state;
90   }
91 
ServerThread(PreStopHookServer * server)92   static void ServerThread(PreStopHookServer* server) {
93     server->SetState(State::kWaiting);
94     server->server_->Wait();
95     server->SetState(State::kDone);
96   }
97 
98   HookServiceImpl hook_service_;
99   grpc_core::Mutex mu_;
100   grpc_core::CondVar condition_ ABSL_GUARDED_BY(mu_);
101   State state_ ABSL_GUARDED_BY(mu_) = State::kNew;
102   std::unique_ptr<Server> server_;
103   std::thread server_thread_;
104 };
105 
Start(int port,size_t timeout_s)106 Status PreStopHookServerManager::Start(int port, size_t timeout_s) {
107   if (server_) {
108     return Status(StatusCode::ALREADY_EXISTS,
109                   "Pre hook server is already running");
110   }
111   server_ = std::unique_ptr<PreStopHookServer, PreStopHookServerDeleter>(
112       new PreStopHookServer(port, absl::Seconds(timeout_s)),
113       PreStopHookServerDeleter());
114   return server_->GetState() == State::kWaiting
115              ? Status::OK
116              : Status(StatusCode::DEADLINE_EXCEEDED, "Server have not started");
117 }
118 
Stop()119 Status PreStopHookServerManager::Stop() {
120   if (!server_) {
121     return Status(StatusCode::UNAVAILABLE, "Pre hook server is not running");
122   }
123   server_.reset();
124   return Status::OK;
125 }
126 
Return(StatusCode code,absl::string_view description)127 void PreStopHookServerManager::Return(StatusCode code,
128                                       absl::string_view description) {
129   server_->SetReturnStatus(Status(code, std::string(description)));
130 }
131 
TestOnlyExpectRequests(size_t expected_requests_count,const absl::Duration & timeout)132 bool PreStopHookServerManager::TestOnlyExpectRequests(
133     size_t expected_requests_count, const absl::Duration& timeout) {
134   return server_->TestOnlyExpectRequests(expected_requests_count, timeout);
135 }
136 
operator ()(PreStopHookServer * server)137 void PreStopHookServerManager::PreStopHookServerDeleter::operator()(
138     PreStopHookServer* server) {
139   delete server;
140 }
141 
142 //
143 // HookServiceImpl
144 //
145 
Hook(CallbackServerContext * context,const Empty *,Empty *)146 ServerUnaryReactor* HookServiceImpl::Hook(CallbackServerContext* context,
147                                           const Empty* /* request */,
148                                           Empty* /* reply */) {
149   auto reactor = context->DefaultReactor();
150   grpc_core::MutexLock lock(&mu_);
151   pending_requests_.emplace_back(reactor);
152   MatchRequestsAndStatuses();
153   return reactor;
154 }
155 
SetReturnStatus(CallbackServerContext * context,const SetReturnStatusRequest * request,Empty *)156 ServerUnaryReactor* HookServiceImpl::SetReturnStatus(
157     CallbackServerContext* context, const SetReturnStatusRequest* request,
158     Empty* /* reply */) {
159   auto reactor = context->DefaultReactor();
160   reactor->Finish(Status::OK);
161   grpc_core::MutexLock lock(&mu_);
162   respond_all_status_.emplace(
163       static_cast<StatusCode>(request->grpc_code_to_return()),
164       request->grpc_status_description());
165   MatchRequestsAndStatuses();
166   return reactor;
167 }
168 
ClearReturnStatus(CallbackServerContext * context,const Empty *,Empty *)169 ServerUnaryReactor* HookServiceImpl::ClearReturnStatus(
170     CallbackServerContext* context, const Empty* /* request */,
171     Empty* /* reply */) {
172   auto reactor = context->DefaultReactor();
173   reactor->Finish(Status::OK);
174   grpc_core::MutexLock lock(&mu_);
175   respond_all_status_.reset();
176   MatchRequestsAndStatuses();
177   return reactor;
178 }
179 
AddReturnStatus(const Status & status)180 void HookServiceImpl::AddReturnStatus(const Status& status) {
181   grpc_core::MutexLock lock(&mu_);
182   pending_statuses_.push_back(status);
183   MatchRequestsAndStatuses();
184 }
185 
TestOnlyExpectRequests(size_t expected_requests_count,const absl::Duration & timeout)186 bool HookServiceImpl::TestOnlyExpectRequests(size_t expected_requests_count,
187                                              const absl::Duration& timeout) {
188   grpc_core::MutexLock lock(&mu_);
189   auto deadline = absl::Now() + timeout;
190   while (pending_requests_.size() < expected_requests_count &&
191          !request_var_.WaitWithDeadline(&mu_, deadline)) {
192   }
193   return pending_requests_.size() >= expected_requests_count;
194 }
195 
Stop()196 void HookServiceImpl::Stop() {
197   grpc_core::MutexLock lock(&mu_);
198   if (!respond_all_status_.has_value()) {
199     respond_all_status_.emplace(StatusCode::ABORTED, "Shutting down");
200   }
201   MatchRequestsAndStatuses();
202 }
203 
MatchRequestsAndStatuses()204 void HookServiceImpl::MatchRequestsAndStatuses() {
205   while (!pending_requests_.empty() && !pending_statuses_.empty()) {
206     pending_requests_.front()->Finish(std::move(pending_statuses_.front()));
207     pending_requests_.erase(pending_requests_.begin());
208     pending_statuses_.erase(pending_statuses_.begin());
209   }
210   if (respond_all_status_.has_value()) {
211     for (const auto& request : pending_requests_) {
212       request->Finish(*respond_all_status_);
213     }
214     pending_requests_.clear();
215   }
216   request_var_.SignalAll();
217 }
218 
219 }  // namespace testing
220 }  // namespace grpc
221