1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/channel/channelz_registry.h"
22 
23 #include <algorithm>
24 #include <cstdint>
25 #include <cstring>
26 #include <utility>
27 #include <vector>
28 
29 #include <grpc/grpc.h>
30 #include <grpc/support/json.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/string_util.h>
33 
34 #include "src/core/lib/channel/channelz.h"
35 #include "src/core/lib/gprpp/sync.h"
36 #include "src/core/lib/iomgr/exec_ctx.h"
37 #include "src/core/lib/json/json.h"
38 #include "src/core/lib/json/json_writer.h"
39 
40 namespace grpc_core {
41 namespace channelz {
42 namespace {
43 
44 const int kPaginationLimit = 100;
45 
46 }  // anonymous namespace
47 
Default()48 ChannelzRegistry* ChannelzRegistry::Default() {
49   static ChannelzRegistry* singleton = new ChannelzRegistry();
50   return singleton;
51 }
52 
InternalRegister(BaseNode * node)53 void ChannelzRegistry::InternalRegister(BaseNode* node) {
54   MutexLock lock(&mu_);
55   node->uuid_ = ++uuid_generator_;
56   node_map_[node->uuid_] = node;
57 }
58 
InternalUnregister(intptr_t uuid)59 void ChannelzRegistry::InternalUnregister(intptr_t uuid) {
60   GPR_ASSERT(uuid >= 1);
61   MutexLock lock(&mu_);
62   GPR_ASSERT(uuid <= uuid_generator_);
63   node_map_.erase(uuid);
64 }
65 
InternalGet(intptr_t uuid)66 RefCountedPtr<BaseNode> ChannelzRegistry::InternalGet(intptr_t uuid) {
67   MutexLock lock(&mu_);
68   if (uuid < 1 || uuid > uuid_generator_) {
69     return nullptr;
70   }
71   auto it = node_map_.find(uuid);
72   if (it == node_map_.end()) return nullptr;
73   // Found node.  Return only if its refcount is not zero (i.e., when we
74   // know that there is no other thread about to destroy it).
75   BaseNode* node = it->second;
76   return node->RefIfNonZero();
77 }
78 
InternalGetTopChannels(intptr_t start_channel_id)79 std::string ChannelzRegistry::InternalGetTopChannels(
80     intptr_t start_channel_id) {
81   std::vector<RefCountedPtr<BaseNode>> top_level_channels;
82   RefCountedPtr<BaseNode> node_after_pagination_limit;
83   {
84     MutexLock lock(&mu_);
85     for (auto it = node_map_.lower_bound(start_channel_id);
86          it != node_map_.end(); ++it) {
87       BaseNode* node = it->second;
88       RefCountedPtr<BaseNode> node_ref;
89       if (node->type() == BaseNode::EntityType::kTopLevelChannel &&
90           (node_ref = node->RefIfNonZero()) != nullptr) {
91         // Check if we are over pagination limit to determine if we need to set
92         // the "end" element. If we don't go through this block, we know that
93         // when the loop terminates, we have <= to kPaginationLimit.
94         // Note that because we have already increased this node's
95         // refcount, we need to decrease it, but we can't unref while
96         // holding the lock, because this may lead to a deadlock.
97         if (top_level_channels.size() == kPaginationLimit) {
98           node_after_pagination_limit = std::move(node_ref);
99           break;
100         }
101         top_level_channels.emplace_back(std::move(node_ref));
102       }
103     }
104   }
105   Json::Object object;
106   if (!top_level_channels.empty()) {
107     // Create list of channels.
108     Json::Array array;
109     for (size_t i = 0; i < top_level_channels.size(); ++i) {
110       array.emplace_back(top_level_channels[i]->RenderJson());
111     }
112     object["channel"] = Json::FromArray(std::move(array));
113   }
114   if (node_after_pagination_limit == nullptr) {
115     object["end"] = Json::FromBool(true);
116   }
117   return JsonDump(Json::FromObject(std::move(object)));
118 }
119 
InternalGetServers(intptr_t start_server_id)120 std::string ChannelzRegistry::InternalGetServers(intptr_t start_server_id) {
121   std::vector<RefCountedPtr<BaseNode>> servers;
122   RefCountedPtr<BaseNode> node_after_pagination_limit;
123   {
124     MutexLock lock(&mu_);
125     for (auto it = node_map_.lower_bound(start_server_id);
126          it != node_map_.end(); ++it) {
127       BaseNode* node = it->second;
128       RefCountedPtr<BaseNode> node_ref;
129       if (node->type() == BaseNode::EntityType::kServer &&
130           (node_ref = node->RefIfNonZero()) != nullptr) {
131         // Check if we are over pagination limit to determine if we need to set
132         // the "end" element. If we don't go through this block, we know that
133         // when the loop terminates, we have <= to kPaginationLimit.
134         // Note that because we have already increased this node's
135         // refcount, we need to decrease it, but we can't unref while
136         // holding the lock, because this may lead to a deadlock.
137         if (servers.size() == kPaginationLimit) {
138           node_after_pagination_limit = std::move(node_ref);
139           break;
140         }
141         servers.emplace_back(std::move(node_ref));
142       }
143     }
144   }
145   Json::Object object;
146   if (!servers.empty()) {
147     // Create list of servers.
148     Json::Array array;
149     for (size_t i = 0; i < servers.size(); ++i) {
150       array.emplace_back(servers[i]->RenderJson());
151     }
152     object["server"] = Json::FromArray(std::move(array));
153   }
154   if (node_after_pagination_limit == nullptr) {
155     object["end"] = Json::FromBool(true);
156   }
157   return JsonDump(Json::FromObject(std::move(object)));
158 }
159 
InternalLogAllEntities()160 void ChannelzRegistry::InternalLogAllEntities() {
161   std::vector<RefCountedPtr<BaseNode>> nodes;
162   {
163     MutexLock lock(&mu_);
164     for (auto& p : node_map_) {
165       RefCountedPtr<BaseNode> node = p.second->RefIfNonZero();
166       if (node != nullptr) {
167         nodes.emplace_back(std::move(node));
168       }
169     }
170   }
171   for (size_t i = 0; i < nodes.size(); ++i) {
172     std::string json = nodes[i]->RenderJsonString();
173     gpr_log(GPR_INFO, "%s", json.c_str());
174   }
175 }
176 
177 }  // namespace channelz
178 }  // namespace grpc_core
179 
grpc_channelz_get_top_channels(intptr_t start_channel_id)180 char* grpc_channelz_get_top_channels(intptr_t start_channel_id) {
181   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
182   grpc_core::ExecCtx exec_ctx;
183   return gpr_strdup(
184       grpc_core::channelz::ChannelzRegistry::GetTopChannels(start_channel_id)
185           .c_str());
186 }
187 
grpc_channelz_get_servers(intptr_t start_server_id)188 char* grpc_channelz_get_servers(intptr_t start_server_id) {
189   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
190   grpc_core::ExecCtx exec_ctx;
191   return gpr_strdup(
192       grpc_core::channelz::ChannelzRegistry::GetServers(start_server_id)
193           .c_str());
194 }
195 
grpc_channelz_get_server(intptr_t server_id)196 char* grpc_channelz_get_server(intptr_t server_id) {
197   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
198   grpc_core::ExecCtx exec_ctx;
199   grpc_core::RefCountedPtr<grpc_core::channelz::BaseNode> server_node =
200       grpc_core::channelz::ChannelzRegistry::Get(server_id);
201   if (server_node == nullptr ||
202       server_node->type() !=
203           grpc_core::channelz::BaseNode::EntityType::kServer) {
204     return nullptr;
205   }
206   grpc_core::Json json = grpc_core::Json::FromObject({
207       {"server", server_node->RenderJson()},
208   });
209   return gpr_strdup(grpc_core::JsonDump(json).c_str());
210 }
211 
grpc_channelz_get_server_sockets(intptr_t server_id,intptr_t start_socket_id,intptr_t max_results)212 char* grpc_channelz_get_server_sockets(intptr_t server_id,
213                                        intptr_t start_socket_id,
214                                        intptr_t max_results) {
215   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
216   grpc_core::ExecCtx exec_ctx;
217   // Validate inputs before handing them of to the renderer.
218   grpc_core::RefCountedPtr<grpc_core::channelz::BaseNode> base_node =
219       grpc_core::channelz::ChannelzRegistry::Get(server_id);
220   if (base_node == nullptr ||
221       base_node->type() != grpc_core::channelz::BaseNode::EntityType::kServer ||
222       start_socket_id < 0 || max_results < 0) {
223     return nullptr;
224   }
225   // This cast is ok since we have just checked to make sure base_node is
226   // actually a server node.
227   grpc_core::channelz::ServerNode* server_node =
228       static_cast<grpc_core::channelz::ServerNode*>(base_node.get());
229   return gpr_strdup(
230       server_node->RenderServerSockets(start_socket_id, max_results).c_str());
231 }
232 
grpc_channelz_get_channel(intptr_t channel_id)233 char* grpc_channelz_get_channel(intptr_t channel_id) {
234   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
235   grpc_core::ExecCtx exec_ctx;
236   grpc_core::RefCountedPtr<grpc_core::channelz::BaseNode> channel_node =
237       grpc_core::channelz::ChannelzRegistry::Get(channel_id);
238   if (channel_node == nullptr ||
239       (channel_node->type() !=
240            grpc_core::channelz::BaseNode::EntityType::kTopLevelChannel &&
241        channel_node->type() !=
242            grpc_core::channelz::BaseNode::EntityType::kInternalChannel)) {
243     return nullptr;
244   }
245   grpc_core::Json json = grpc_core::Json::FromObject({
246       {"channel", channel_node->RenderJson()},
247   });
248   return gpr_strdup(grpc_core::JsonDump(json).c_str());
249 }
250 
grpc_channelz_get_subchannel(intptr_t subchannel_id)251 char* grpc_channelz_get_subchannel(intptr_t subchannel_id) {
252   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
253   grpc_core::ExecCtx exec_ctx;
254   grpc_core::RefCountedPtr<grpc_core::channelz::BaseNode> subchannel_node =
255       grpc_core::channelz::ChannelzRegistry::Get(subchannel_id);
256   if (subchannel_node == nullptr ||
257       subchannel_node->type() !=
258           grpc_core::channelz::BaseNode::EntityType::kSubchannel) {
259     return nullptr;
260   }
261   grpc_core::Json json = grpc_core::Json::FromObject({
262       {"subchannel", subchannel_node->RenderJson()},
263   });
264   return gpr_strdup(grpc_core::JsonDump(json).c_str());
265 }
266 
grpc_channelz_get_socket(intptr_t socket_id)267 char* grpc_channelz_get_socket(intptr_t socket_id) {
268   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
269   grpc_core::ExecCtx exec_ctx;
270   grpc_core::RefCountedPtr<grpc_core::channelz::BaseNode> socket_node =
271       grpc_core::channelz::ChannelzRegistry::Get(socket_id);
272   if (socket_node == nullptr ||
273       socket_node->type() !=
274           grpc_core::channelz::BaseNode::EntityType::kSocket) {
275     return nullptr;
276   }
277   grpc_core::Json json = grpc_core::Json::FromObject({
278       {"socket", socket_node->RenderJson()},
279   });
280   return gpr_strdup(grpc_core::JsonDump(json).c_str());
281 }
282