1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/cpp/server/health/default_health_check_service.h"
20 
21 #include <stdint.h>
22 
23 #include <memory>
24 #include <utility>
25 
26 #include "upb/base/string_view.h"
27 #include "upb/upb.hpp"
28 
29 #include <grpc/slice.h>
30 #include <grpc/support/log.h>
31 #include <grpcpp/impl/rpc_method.h>
32 #include <grpcpp/impl/rpc_service_method.h>
33 #include <grpcpp/impl/server_callback_handlers.h>
34 #include <grpcpp/support/slice.h>
35 
36 #include "src/proto/grpc/health/v1/health.upb.h"
37 
38 #define MAX_SERVICE_NAME_LENGTH 200
39 
40 namespace grpc {
41 
42 //
43 // DefaultHealthCheckService
44 //
45 
DefaultHealthCheckService()46 DefaultHealthCheckService::DefaultHealthCheckService() {
47   services_map_[""].SetServingStatus(SERVING);
48 }
49 
SetServingStatus(const std::string & service_name,bool serving)50 void DefaultHealthCheckService::SetServingStatus(
51     const std::string& service_name, bool serving) {
52   grpc::internal::MutexLock lock(&mu_);
53   if (shutdown_) {
54     // Set to NOT_SERVING in case service_name is not in the map.
55     serving = false;
56   }
57   services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING);
58 }
59 
SetServingStatus(bool serving)60 void DefaultHealthCheckService::SetServingStatus(bool serving) {
61   const ServingStatus status = serving ? SERVING : NOT_SERVING;
62   grpc::internal::MutexLock lock(&mu_);
63   if (shutdown_) return;
64   for (auto& p : services_map_) {
65     ServiceData& service_data = p.second;
66     service_data.SetServingStatus(status);
67   }
68 }
69 
Shutdown()70 void DefaultHealthCheckService::Shutdown() {
71   grpc::internal::MutexLock lock(&mu_);
72   if (shutdown_) return;
73   shutdown_ = true;
74   for (auto& p : services_map_) {
75     ServiceData& service_data = p.second;
76     service_data.SetServingStatus(NOT_SERVING);
77   }
78 }
79 
80 DefaultHealthCheckService::ServingStatus
GetServingStatus(const std::string & service_name) const81 DefaultHealthCheckService::GetServingStatus(
82     const std::string& service_name) const {
83   grpc::internal::MutexLock lock(&mu_);
84   auto it = services_map_.find(service_name);
85   if (it == services_map_.end()) return NOT_FOUND;
86   const ServiceData& service_data = it->second;
87   return service_data.GetServingStatus();
88 }
89 
RegisterWatch(const std::string & service_name,grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher)90 void DefaultHealthCheckService::RegisterWatch(
91     const std::string& service_name,
92     grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher) {
93   grpc::internal::MutexLock lock(&mu_);
94   ServiceData& service_data = services_map_[service_name];
95   watcher->SendHealth(service_data.GetServingStatus());
96   service_data.AddWatch(std::move(watcher));
97 }
98 
UnregisterWatch(const std::string & service_name,HealthCheckServiceImpl::WatchReactor * watcher)99 void DefaultHealthCheckService::UnregisterWatch(
100     const std::string& service_name,
101     HealthCheckServiceImpl::WatchReactor* watcher) {
102   grpc::internal::MutexLock lock(&mu_);
103   auto it = services_map_.find(service_name);
104   if (it == services_map_.end()) return;
105   ServiceData& service_data = it->second;
106   service_data.RemoveWatch(watcher);
107   if (service_data.Unused()) services_map_.erase(it);
108 }
109 
110 DefaultHealthCheckService::HealthCheckServiceImpl*
GetHealthCheckService()111 DefaultHealthCheckService::GetHealthCheckService() {
112   GPR_ASSERT(impl_ == nullptr);
113   impl_ = std::make_unique<HealthCheckServiceImpl>(this);
114   return impl_.get();
115 }
116 
117 //
118 // DefaultHealthCheckService::ServiceData
119 //
120 
SetServingStatus(ServingStatus status)121 void DefaultHealthCheckService::ServiceData::SetServingStatus(
122     ServingStatus status) {
123   status_ = status;
124   for (const auto& p : watchers_) {
125     p.first->SendHealth(status);
126   }
127 }
128 
AddWatch(grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher)129 void DefaultHealthCheckService::ServiceData::AddWatch(
130     grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher) {
131   watchers_[watcher.get()] = std::move(watcher);
132 }
133 
RemoveWatch(HealthCheckServiceImpl::WatchReactor * watcher)134 void DefaultHealthCheckService::ServiceData::RemoveWatch(
135     HealthCheckServiceImpl::WatchReactor* watcher) {
136   watchers_.erase(watcher);
137 }
138 
139 //
140 // DefaultHealthCheckService::HealthCheckServiceImpl
141 //
142 
143 namespace {
144 const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
145 const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
146 }  // namespace
147 
HealthCheckServiceImpl(DefaultHealthCheckService * database)148 DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
149     DefaultHealthCheckService* database)
150     : database_(database) {
151   // Add Check() method.
152   AddMethod(new internal::RpcServiceMethod(
153       kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr));
154   MarkMethodCallback(
155       0, new internal::CallbackUnaryHandler<ByteBuffer, ByteBuffer>(
156              [database](CallbackServerContext* context,
157                         const ByteBuffer* request, ByteBuffer* response) {
158                return HandleCheckRequest(database, context, request, response);
159              }));
160   // Add Watch() method.
161   AddMethod(new internal::RpcServiceMethod(
162       kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr));
163   MarkMethodCallback(
164       1, new internal::CallbackServerStreamingHandler<ByteBuffer, ByteBuffer>(
165              [this](CallbackServerContext* /*ctx*/, const ByteBuffer* request) {
166                return new WatchReactor(this, request);
167              }));
168 }
169 
~HealthCheckServiceImpl()170 DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
171   grpc::internal::MutexLock lock(&mu_);
172   shutdown_ = true;
173   while (num_watches_ > 0) {
174     shutdown_condition_.Wait(&mu_);
175   }
176 }
177 
178 ServerUnaryReactor*
HandleCheckRequest(DefaultHealthCheckService * database,CallbackServerContext * context,const ByteBuffer * request,ByteBuffer * response)179 DefaultHealthCheckService::HealthCheckServiceImpl::HandleCheckRequest(
180     DefaultHealthCheckService* database, CallbackServerContext* context,
181     const ByteBuffer* request, ByteBuffer* response) {
182   auto* reactor = context->DefaultReactor();
183   std::string service_name;
184   if (!DecodeRequest(*request, &service_name)) {
185     reactor->Finish(
186         Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
187     return reactor;
188   }
189   ServingStatus serving_status = database->GetServingStatus(service_name);
190   if (serving_status == NOT_FOUND) {
191     reactor->Finish(Status(StatusCode::NOT_FOUND, "service name unknown"));
192     return reactor;
193   }
194   if (!EncodeResponse(serving_status, response)) {
195     reactor->Finish(Status(StatusCode::INTERNAL, "could not encode response"));
196     return reactor;
197   }
198   reactor->Finish(Status::OK);
199   return reactor;
200 }
201 
DecodeRequest(const ByteBuffer & request,std::string * service_name)202 bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
203     const ByteBuffer& request, std::string* service_name) {
204   Slice slice;
205   if (!request.DumpToSingleSlice(&slice).ok()) return false;
206   uint8_t* request_bytes = nullptr;
207   size_t request_size = 0;
208   request_bytes = const_cast<uint8_t*>(slice.begin());
209   request_size = slice.size();
210   upb::Arena arena;
211   grpc_health_v1_HealthCheckRequest* request_struct =
212       grpc_health_v1_HealthCheckRequest_parse(
213           reinterpret_cast<char*>(request_bytes), request_size, arena.ptr());
214   if (request_struct == nullptr) {
215     return false;
216   }
217   upb_StringView service =
218       grpc_health_v1_HealthCheckRequest_service(request_struct);
219   if (service.size > MAX_SERVICE_NAME_LENGTH) {
220     return false;
221   }
222   service_name->assign(service.data, service.size);
223   return true;
224 }
225 
EncodeResponse(ServingStatus status,ByteBuffer * response)226 bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
227     ServingStatus status, ByteBuffer* response) {
228   upb::Arena arena;
229   grpc_health_v1_HealthCheckResponse* response_struct =
230       grpc_health_v1_HealthCheckResponse_new(arena.ptr());
231   grpc_health_v1_HealthCheckResponse_set_status(
232       response_struct,
233       status == NOT_FOUND ? grpc_health_v1_HealthCheckResponse_SERVICE_UNKNOWN
234       : status == SERVING ? grpc_health_v1_HealthCheckResponse_SERVING
235                           : grpc_health_v1_HealthCheckResponse_NOT_SERVING);
236   size_t buf_length;
237   char* buf = grpc_health_v1_HealthCheckResponse_serialize(
238       response_struct, arena.ptr(), &buf_length);
239   if (buf == nullptr) {
240     return false;
241   }
242   grpc_slice response_slice = grpc_slice_from_copied_buffer(buf, buf_length);
243   Slice encoded_response(response_slice, Slice::STEAL_REF);
244   ByteBuffer response_buffer(&encoded_response, 1);
245   response->Swap(&response_buffer);
246   return true;
247 }
248 
249 //
250 // DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor
251 //
252 
WatchReactor(HealthCheckServiceImpl * service,const ByteBuffer * request)253 DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::WatchReactor(
254     HealthCheckServiceImpl* service, const ByteBuffer* request)
255     : service_(service) {
256   {
257     grpc::internal::MutexLock lock(&service_->mu_);
258     ++service_->num_watches_;
259   }
260   bool success = DecodeRequest(*request, &service_name_);
261   gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": watch call started", service_,
262           this, service_name_.c_str());
263   if (!success) {
264     MaybeFinishLocked(Status(StatusCode::INTERNAL, "could not parse request"));
265     return;
266   }
267   // Register the call for updates to the service.
268   service_->database_->RegisterWatch(service_name_, Ref());
269 }
270 
271 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
SendHealth(ServingStatus status)272     SendHealth(ServingStatus status) {
273   gpr_log(GPR_DEBUG,
274           "[HCS %p] watcher %p \"%s\": SendHealth() for ServingStatus %d",
275           service_, this, service_name_.c_str(), status);
276   grpc::internal::MutexLock lock(&mu_);
277   // If there's already a send in flight, cache the new status, and
278   // we'll start a new send for it when the one in flight completes.
279   if (write_pending_) {
280     gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": queuing write", service_,
281             this, service_name_.c_str());
282     pending_status_ = status;
283     return;
284   }
285   // Start a send.
286   SendHealthLocked(status);
287 }
288 
289 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
SendHealthLocked(ServingStatus status)290     SendHealthLocked(ServingStatus status) {
291   // Do nothing if Finish() has already been called.
292   if (finish_called_) return;
293   // Check if we're shutting down.
294   {
295     grpc::internal::MutexLock lock(&service_->mu_);
296     if (service_->shutdown_) {
297       MaybeFinishLocked(
298           Status(StatusCode::CANCELLED, "not writing due to shutdown"));
299       return;
300     }
301   }
302   // Send response.
303   bool success = EncodeResponse(status, &response_);
304   if (!success) {
305     MaybeFinishLocked(
306         Status(StatusCode::INTERNAL, "could not encode response"));
307     return;
308   }
309   gpr_log(GPR_DEBUG,
310           "[HCS %p] watcher %p \"%s\": starting write for ServingStatus %d",
311           service_, this, service_name_.c_str(), status);
312   write_pending_ = true;
313   StartWrite(&response_);
314 }
315 
316 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
OnWriteDone(bool ok)317     OnWriteDone(bool ok) {
318   gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": OnWriteDone(): ok=%d",
319           service_, this, service_name_.c_str(), ok);
320   response_.Clear();
321   grpc::internal::MutexLock lock(&mu_);
322   if (!ok) {
323     MaybeFinishLocked(Status(StatusCode::CANCELLED, "OnWriteDone() ok=false"));
324     return;
325   }
326   write_pending_ = false;
327   // If we got a new status since we started the last send, start a
328   // new send for it.
329   if (pending_status_ != NOT_FOUND) {
330     auto status = pending_status_;
331     pending_status_ = NOT_FOUND;
332     SendHealthLocked(status);
333   }
334 }
335 
336 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
OnCancel()337     OnCancel() {
338   grpc::internal::MutexLock lock(&mu_);
339   MaybeFinishLocked(Status(StatusCode::UNKNOWN, "OnCancel()"));
340 }
341 
OnDone()342 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::OnDone() {
343   gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": OnDone()", service_, this,
344           service_name_.c_str());
345   service_->database_->UnregisterWatch(service_name_, this);
346   {
347     grpc::internal::MutexLock lock(&service_->mu_);
348     if (--service_->num_watches_ == 0 && service_->shutdown_) {
349       service_->shutdown_condition_.Signal();
350     }
351   }
352   // Free the initial ref from instantiation.
353   Unref();
354 }
355 
356 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
MaybeFinishLocked(Status status)357     MaybeFinishLocked(Status status) {
358   gpr_log(GPR_DEBUG,
359           "[HCS %p] watcher %p \"%s\": MaybeFinishLocked() with code=%d msg=%s",
360           service_, this, service_name_.c_str(), status.error_code(),
361           status.error_message().c_str());
362   if (!finish_called_) {
363     gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": actually calling Finish()",
364             service_, this, service_name_.c_str());
365     finish_called_ = true;
366     Finish(status);
367   }
368 }
369 
370 }  // namespace grpc
371