1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "test/cpp/interop/pre_stop_hook_server.h"
20
21 #include <thread>
22
23 #include "absl/strings/str_format.h"
24
25 #include <grpcpp/grpcpp.h>
26
27 #include "src/core/lib/gprpp/sync.h"
28 #include "src/proto/grpc/testing/messages.pb.h"
29
30 namespace grpc {
31 namespace testing {
32 namespace {
33
34 enum class State : std::uint8_t { kNew, kWaiting, kDone, kShuttingDown };
35
BuildHookServer(HookServiceImpl * service,int port)36 std::unique_ptr<Server> BuildHookServer(HookServiceImpl* service, int port) {
37 ServerBuilder builder;
38 builder.AddListeningPort(absl::StrFormat("0.0.0.0:%d", port),
39 grpc::InsecureServerCredentials());
40 builder.RegisterService(service);
41 return builder.BuildAndStart();
42 }
43
44 } // namespace
45
46 class PreStopHookServer {
47 public:
PreStopHookServer(int port,const absl::Duration & startup_timeout)48 explicit PreStopHookServer(int port, const absl::Duration& startup_timeout)
49 : server_(BuildHookServer(&hook_service_, port)),
50 server_thread_(PreStopHookServer::ServerThread, this) {
51 WaitForState(State::kWaiting, startup_timeout);
52 }
53
~PreStopHookServer()54 ~PreStopHookServer() {
55 hook_service_.Stop();
56 SetState(State::kShuttingDown);
57 server_->Shutdown();
58 WaitForState(State::kDone, absl::Seconds(5));
59 server_thread_.detach();
60 }
61
GetState()62 State GetState() {
63 grpc_core::MutexLock lock(&mu_);
64 return state_;
65 }
66
SetState(State state)67 void SetState(State state) {
68 grpc_core::MutexLock lock(&mu_);
69 state_ = state;
70 condition_.SignalAll();
71 }
72
SetReturnStatus(const Status & status)73 void SetReturnStatus(const Status& status) {
74 hook_service_.AddReturnStatus(status);
75 }
76
TestOnlyExpectRequests(size_t expected_requests_count,absl::Duration timeout)77 bool TestOnlyExpectRequests(size_t expected_requests_count,
78 absl::Duration timeout) {
79 return hook_service_.TestOnlyExpectRequests(expected_requests_count,
80 timeout);
81 }
82
83 private:
WaitForState(State state,const absl::Duration & timeout)84 bool WaitForState(State state, const absl::Duration& timeout) {
85 grpc_core::MutexLock lock(&mu_);
86 auto deadline = absl::Now() + timeout;
87 while (state_ != state && !condition_.WaitWithDeadline(&mu_, deadline)) {
88 }
89 return state_ == state;
90 }
91
ServerThread(PreStopHookServer * server)92 static void ServerThread(PreStopHookServer* server) {
93 server->SetState(State::kWaiting);
94 server->server_->Wait();
95 server->SetState(State::kDone);
96 }
97
98 HookServiceImpl hook_service_;
99 grpc_core::Mutex mu_;
100 grpc_core::CondVar condition_ ABSL_GUARDED_BY(mu_);
101 State state_ ABSL_GUARDED_BY(mu_) = State::kNew;
102 std::unique_ptr<Server> server_;
103 std::thread server_thread_;
104 };
105
Start(int port,size_t timeout_s)106 Status PreStopHookServerManager::Start(int port, size_t timeout_s) {
107 if (server_) {
108 return Status(StatusCode::ALREADY_EXISTS,
109 "Pre hook server is already running");
110 }
111 server_ = std::unique_ptr<PreStopHookServer, PreStopHookServerDeleter>(
112 new PreStopHookServer(port, absl::Seconds(timeout_s)),
113 PreStopHookServerDeleter());
114 return server_->GetState() == State::kWaiting
115 ? Status::OK
116 : Status(StatusCode::DEADLINE_EXCEEDED, "Server have not started");
117 }
118
Stop()119 Status PreStopHookServerManager::Stop() {
120 if (!server_) {
121 return Status(StatusCode::UNAVAILABLE, "Pre hook server is not running");
122 }
123 server_.reset();
124 return Status::OK;
125 }
126
Return(StatusCode code,absl::string_view description)127 void PreStopHookServerManager::Return(StatusCode code,
128 absl::string_view description) {
129 server_->SetReturnStatus(Status(code, std::string(description)));
130 }
131
TestOnlyExpectRequests(size_t expected_requests_count,const absl::Duration & timeout)132 bool PreStopHookServerManager::TestOnlyExpectRequests(
133 size_t expected_requests_count, const absl::Duration& timeout) {
134 return server_->TestOnlyExpectRequests(expected_requests_count, timeout);
135 }
136
operator ()(PreStopHookServer * server)137 void PreStopHookServerManager::PreStopHookServerDeleter::operator()(
138 PreStopHookServer* server) {
139 delete server;
140 }
141
142 //
143 // HookServiceImpl
144 //
145
Hook(CallbackServerContext * context,const Empty *,Empty *)146 ServerUnaryReactor* HookServiceImpl::Hook(CallbackServerContext* context,
147 const Empty* /* request */,
148 Empty* /* reply */) {
149 auto reactor = context->DefaultReactor();
150 grpc_core::MutexLock lock(&mu_);
151 pending_requests_.emplace_back(reactor);
152 MatchRequestsAndStatuses();
153 return reactor;
154 }
155
SetReturnStatus(CallbackServerContext * context,const SetReturnStatusRequest * request,Empty *)156 ServerUnaryReactor* HookServiceImpl::SetReturnStatus(
157 CallbackServerContext* context, const SetReturnStatusRequest* request,
158 Empty* /* reply */) {
159 auto reactor = context->DefaultReactor();
160 reactor->Finish(Status::OK);
161 grpc_core::MutexLock lock(&mu_);
162 respond_all_status_.emplace(
163 static_cast<StatusCode>(request->grpc_code_to_return()),
164 request->grpc_status_description());
165 MatchRequestsAndStatuses();
166 return reactor;
167 }
168
ClearReturnStatus(CallbackServerContext * context,const Empty *,Empty *)169 ServerUnaryReactor* HookServiceImpl::ClearReturnStatus(
170 CallbackServerContext* context, const Empty* /* request */,
171 Empty* /* reply */) {
172 auto reactor = context->DefaultReactor();
173 reactor->Finish(Status::OK);
174 grpc_core::MutexLock lock(&mu_);
175 respond_all_status_.reset();
176 MatchRequestsAndStatuses();
177 return reactor;
178 }
179
AddReturnStatus(const Status & status)180 void HookServiceImpl::AddReturnStatus(const Status& status) {
181 grpc_core::MutexLock lock(&mu_);
182 pending_statuses_.push_back(status);
183 MatchRequestsAndStatuses();
184 }
185
TestOnlyExpectRequests(size_t expected_requests_count,const absl::Duration & timeout)186 bool HookServiceImpl::TestOnlyExpectRequests(size_t expected_requests_count,
187 const absl::Duration& timeout) {
188 grpc_core::MutexLock lock(&mu_);
189 auto deadline = absl::Now() + timeout;
190 while (pending_requests_.size() < expected_requests_count &&
191 !request_var_.WaitWithDeadline(&mu_, deadline)) {
192 }
193 return pending_requests_.size() >= expected_requests_count;
194 }
195
Stop()196 void HookServiceImpl::Stop() {
197 grpc_core::MutexLock lock(&mu_);
198 if (!respond_all_status_.has_value()) {
199 respond_all_status_.emplace(StatusCode::ABORTED, "Shutting down");
200 }
201 MatchRequestsAndStatuses();
202 }
203
MatchRequestsAndStatuses()204 void HookServiceImpl::MatchRequestsAndStatuses() {
205 while (!pending_requests_.empty() && !pending_statuses_.empty()) {
206 pending_requests_.front()->Finish(std::move(pending_statuses_.front()));
207 pending_requests_.erase(pending_requests_.begin());
208 pending_statuses_.erase(pending_statuses_.begin());
209 }
210 if (respond_all_status_.has_value()) {
211 for (const auto& request : pending_requests_) {
212 request->Finish(*respond_all_status_);
213 }
214 pending_requests_.clear();
215 }
216 request_var_.SignalAll();
217 }
218
219 } // namespace testing
220 } // namespace grpc
221