1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/cpp/server/health/default_health_check_service.h"
20
21 #include <stdint.h>
22
23 #include <memory>
24 #include <utility>
25
26 #include "upb/base/string_view.h"
27 #include "upb/upb.hpp"
28
29 #include <grpc/slice.h>
30 #include <grpc/support/log.h>
31 #include <grpcpp/impl/rpc_method.h>
32 #include <grpcpp/impl/rpc_service_method.h>
33 #include <grpcpp/impl/server_callback_handlers.h>
34 #include <grpcpp/support/slice.h>
35
36 #include "src/proto/grpc/health/v1/health.upb.h"
37
// Longest service name that DecodeRequest() accepts; a request naming a
// longer service is treated as undecodable.
#define MAX_SERVICE_NAME_LENGTH 200
39
40 namespace grpc {
41
42 //
43 // DefaultHealthCheckService
44 //
45
DefaultHealthCheckService()46 DefaultHealthCheckService::DefaultHealthCheckService() {
47 services_map_[""].SetServingStatus(SERVING);
48 }
49
SetServingStatus(const std::string & service_name,bool serving)50 void DefaultHealthCheckService::SetServingStatus(
51 const std::string& service_name, bool serving) {
52 grpc::internal::MutexLock lock(&mu_);
53 if (shutdown_) {
54 // Set to NOT_SERVING in case service_name is not in the map.
55 serving = false;
56 }
57 services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING);
58 }
59
SetServingStatus(bool serving)60 void DefaultHealthCheckService::SetServingStatus(bool serving) {
61 const ServingStatus status = serving ? SERVING : NOT_SERVING;
62 grpc::internal::MutexLock lock(&mu_);
63 if (shutdown_) return;
64 for (auto& p : services_map_) {
65 ServiceData& service_data = p.second;
66 service_data.SetServingStatus(status);
67 }
68 }
69
Shutdown()70 void DefaultHealthCheckService::Shutdown() {
71 grpc::internal::MutexLock lock(&mu_);
72 if (shutdown_) return;
73 shutdown_ = true;
74 for (auto& p : services_map_) {
75 ServiceData& service_data = p.second;
76 service_data.SetServingStatus(NOT_SERVING);
77 }
78 }
79
80 DefaultHealthCheckService::ServingStatus
GetServingStatus(const std::string & service_name) const81 DefaultHealthCheckService::GetServingStatus(
82 const std::string& service_name) const {
83 grpc::internal::MutexLock lock(&mu_);
84 auto it = services_map_.find(service_name);
85 if (it == services_map_.end()) return NOT_FOUND;
86 const ServiceData& service_data = it->second;
87 return service_data.GetServingStatus();
88 }
89
RegisterWatch(const std::string & service_name,grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher)90 void DefaultHealthCheckService::RegisterWatch(
91 const std::string& service_name,
92 grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher) {
93 grpc::internal::MutexLock lock(&mu_);
94 ServiceData& service_data = services_map_[service_name];
95 watcher->SendHealth(service_data.GetServingStatus());
96 service_data.AddWatch(std::move(watcher));
97 }
98
UnregisterWatch(const std::string & service_name,HealthCheckServiceImpl::WatchReactor * watcher)99 void DefaultHealthCheckService::UnregisterWatch(
100 const std::string& service_name,
101 HealthCheckServiceImpl::WatchReactor* watcher) {
102 grpc::internal::MutexLock lock(&mu_);
103 auto it = services_map_.find(service_name);
104 if (it == services_map_.end()) return;
105 ServiceData& service_data = it->second;
106 service_data.RemoveWatch(watcher);
107 if (service_data.Unused()) services_map_.erase(it);
108 }
109
110 DefaultHealthCheckService::HealthCheckServiceImpl*
GetHealthCheckService()111 DefaultHealthCheckService::GetHealthCheckService() {
112 GPR_ASSERT(impl_ == nullptr);
113 impl_ = std::make_unique<HealthCheckServiceImpl>(this);
114 return impl_.get();
115 }
116
117 //
118 // DefaultHealthCheckService::ServiceData
119 //
120
SetServingStatus(ServingStatus status)121 void DefaultHealthCheckService::ServiceData::SetServingStatus(
122 ServingStatus status) {
123 status_ = status;
124 for (const auto& p : watchers_) {
125 p.first->SendHealth(status);
126 }
127 }
128
AddWatch(grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher)129 void DefaultHealthCheckService::ServiceData::AddWatch(
130 grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher) {
131 watchers_[watcher.get()] = std::move(watcher);
132 }
133
RemoveWatch(HealthCheckServiceImpl::WatchReactor * watcher)134 void DefaultHealthCheckService::ServiceData::RemoveWatch(
135 HealthCheckServiceImpl::WatchReactor* watcher) {
136 watchers_.erase(watcher);
137 }
138
139 //
140 // DefaultHealthCheckService::HealthCheckServiceImpl
141 //
142
namespace {
// Fully-qualified RPC method names registered by HealthCheckServiceImpl.
constexpr char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
constexpr char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
}  // namespace
147
HealthCheckServiceImpl(DefaultHealthCheckService * database)148 DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
149 DefaultHealthCheckService* database)
150 : database_(database) {
151 // Add Check() method.
152 AddMethod(new internal::RpcServiceMethod(
153 kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr));
154 MarkMethodCallback(
155 0, new internal::CallbackUnaryHandler<ByteBuffer, ByteBuffer>(
156 [database](CallbackServerContext* context,
157 const ByteBuffer* request, ByteBuffer* response) {
158 return HandleCheckRequest(database, context, request, response);
159 }));
160 // Add Watch() method.
161 AddMethod(new internal::RpcServiceMethod(
162 kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr));
163 MarkMethodCallback(
164 1, new internal::CallbackServerStreamingHandler<ByteBuffer, ByteBuffer>(
165 [this](CallbackServerContext* /*ctx*/, const ByteBuffer* request) {
166 return new WatchReactor(this, request);
167 }));
168 }
169
~HealthCheckServiceImpl()170 DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
171 grpc::internal::MutexLock lock(&mu_);
172 shutdown_ = true;
173 while (num_watches_ > 0) {
174 shutdown_condition_.Wait(&mu_);
175 }
176 }
177
178 ServerUnaryReactor*
HandleCheckRequest(DefaultHealthCheckService * database,CallbackServerContext * context,const ByteBuffer * request,ByteBuffer * response)179 DefaultHealthCheckService::HealthCheckServiceImpl::HandleCheckRequest(
180 DefaultHealthCheckService* database, CallbackServerContext* context,
181 const ByteBuffer* request, ByteBuffer* response) {
182 auto* reactor = context->DefaultReactor();
183 std::string service_name;
184 if (!DecodeRequest(*request, &service_name)) {
185 reactor->Finish(
186 Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
187 return reactor;
188 }
189 ServingStatus serving_status = database->GetServingStatus(service_name);
190 if (serving_status == NOT_FOUND) {
191 reactor->Finish(Status(StatusCode::NOT_FOUND, "service name unknown"));
192 return reactor;
193 }
194 if (!EncodeResponse(serving_status, response)) {
195 reactor->Finish(Status(StatusCode::INTERNAL, "could not encode response"));
196 return reactor;
197 }
198 reactor->Finish(Status::OK);
199 return reactor;
200 }
201
DecodeRequest(const ByteBuffer & request,std::string * service_name)202 bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
203 const ByteBuffer& request, std::string* service_name) {
204 Slice slice;
205 if (!request.DumpToSingleSlice(&slice).ok()) return false;
206 uint8_t* request_bytes = nullptr;
207 size_t request_size = 0;
208 request_bytes = const_cast<uint8_t*>(slice.begin());
209 request_size = slice.size();
210 upb::Arena arena;
211 grpc_health_v1_HealthCheckRequest* request_struct =
212 grpc_health_v1_HealthCheckRequest_parse(
213 reinterpret_cast<char*>(request_bytes), request_size, arena.ptr());
214 if (request_struct == nullptr) {
215 return false;
216 }
217 upb_StringView service =
218 grpc_health_v1_HealthCheckRequest_service(request_struct);
219 if (service.size > MAX_SERVICE_NAME_LENGTH) {
220 return false;
221 }
222 service_name->assign(service.data, service.size);
223 return true;
224 }
225
EncodeResponse(ServingStatus status,ByteBuffer * response)226 bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
227 ServingStatus status, ByteBuffer* response) {
228 upb::Arena arena;
229 grpc_health_v1_HealthCheckResponse* response_struct =
230 grpc_health_v1_HealthCheckResponse_new(arena.ptr());
231 grpc_health_v1_HealthCheckResponse_set_status(
232 response_struct,
233 status == NOT_FOUND ? grpc_health_v1_HealthCheckResponse_SERVICE_UNKNOWN
234 : status == SERVING ? grpc_health_v1_HealthCheckResponse_SERVING
235 : grpc_health_v1_HealthCheckResponse_NOT_SERVING);
236 size_t buf_length;
237 char* buf = grpc_health_v1_HealthCheckResponse_serialize(
238 response_struct, arena.ptr(), &buf_length);
239 if (buf == nullptr) {
240 return false;
241 }
242 grpc_slice response_slice = grpc_slice_from_copied_buffer(buf, buf_length);
243 Slice encoded_response(response_slice, Slice::STEAL_REF);
244 ByteBuffer response_buffer(&encoded_response, 1);
245 response->Swap(&response_buffer);
246 return true;
247 }
248
249 //
250 // DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor
251 //
252
WatchReactor(HealthCheckServiceImpl * service,const ByteBuffer * request)253 DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::WatchReactor(
254 HealthCheckServiceImpl* service, const ByteBuffer* request)
255 : service_(service) {
256 {
257 grpc::internal::MutexLock lock(&service_->mu_);
258 ++service_->num_watches_;
259 }
260 bool success = DecodeRequest(*request, &service_name_);
261 gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": watch call started", service_,
262 this, service_name_.c_str());
263 if (!success) {
264 MaybeFinishLocked(Status(StatusCode::INTERNAL, "could not parse request"));
265 return;
266 }
267 // Register the call for updates to the service.
268 service_->database_->RegisterWatch(service_name_, Ref());
269 }
270
271 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
SendHealth(ServingStatus status)272 SendHealth(ServingStatus status) {
273 gpr_log(GPR_DEBUG,
274 "[HCS %p] watcher %p \"%s\": SendHealth() for ServingStatus %d",
275 service_, this, service_name_.c_str(), status);
276 grpc::internal::MutexLock lock(&mu_);
277 // If there's already a send in flight, cache the new status, and
278 // we'll start a new send for it when the one in flight completes.
279 if (write_pending_) {
280 gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": queuing write", service_,
281 this, service_name_.c_str());
282 pending_status_ = status;
283 return;
284 }
285 // Start a send.
286 SendHealthLocked(status);
287 }
288
289 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
SendHealthLocked(ServingStatus status)290 SendHealthLocked(ServingStatus status) {
291 // Do nothing if Finish() has already been called.
292 if (finish_called_) return;
293 // Check if we're shutting down.
294 {
295 grpc::internal::MutexLock lock(&service_->mu_);
296 if (service_->shutdown_) {
297 MaybeFinishLocked(
298 Status(StatusCode::CANCELLED, "not writing due to shutdown"));
299 return;
300 }
301 }
302 // Send response.
303 bool success = EncodeResponse(status, &response_);
304 if (!success) {
305 MaybeFinishLocked(
306 Status(StatusCode::INTERNAL, "could not encode response"));
307 return;
308 }
309 gpr_log(GPR_DEBUG,
310 "[HCS %p] watcher %p \"%s\": starting write for ServingStatus %d",
311 service_, this, service_name_.c_str(), status);
312 write_pending_ = true;
313 StartWrite(&response_);
314 }
315
316 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
OnWriteDone(bool ok)317 OnWriteDone(bool ok) {
318 gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": OnWriteDone(): ok=%d",
319 service_, this, service_name_.c_str(), ok);
320 response_.Clear();
321 grpc::internal::MutexLock lock(&mu_);
322 if (!ok) {
323 MaybeFinishLocked(Status(StatusCode::CANCELLED, "OnWriteDone() ok=false"));
324 return;
325 }
326 write_pending_ = false;
327 // If we got a new status since we started the last send, start a
328 // new send for it.
329 if (pending_status_ != NOT_FOUND) {
330 auto status = pending_status_;
331 pending_status_ = NOT_FOUND;
332 SendHealthLocked(status);
333 }
334 }
335
336 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
OnCancel()337 OnCancel() {
338 grpc::internal::MutexLock lock(&mu_);
339 MaybeFinishLocked(Status(StatusCode::UNKNOWN, "OnCancel()"));
340 }
341
OnDone()342 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::OnDone() {
343 gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": OnDone()", service_, this,
344 service_name_.c_str());
345 service_->database_->UnregisterWatch(service_name_, this);
346 {
347 grpc::internal::MutexLock lock(&service_->mu_);
348 if (--service_->num_watches_ == 0 && service_->shutdown_) {
349 service_->shutdown_condition_.Signal();
350 }
351 }
352 // Free the initial ref from instantiation.
353 Unref();
354 }
355
356 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
MaybeFinishLocked(Status status)357 MaybeFinishLocked(Status status) {
358 gpr_log(GPR_DEBUG,
359 "[HCS %p] watcher %p \"%s\": MaybeFinishLocked() with code=%d msg=%s",
360 service_, this, service_name_.c_str(), status.error_code(),
361 status.error_message().c_str());
362 if (!finish_called_) {
363 gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": actually calling Finish()",
364 service_, this, service_name_.c_str());
365 finish_called_ = true;
366 Finish(status);
367 }
368 }
369
370 } // namespace grpc
371