1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/cpp/server/load_reporter/load_data_store.h"
22 
23 #include <stdint.h>
24 #include <stdio.h>
25 
26 #include <cstdlib>
27 #include <iterator>
28 #include <set>
29 #include <unordered_map>
30 
31 #include <grpc/support/log.h>
32 
33 #include "src/core/lib/iomgr/socket_utils.h"
34 #include "src/cpp/server/load_reporter/constants.h"
35 
36 namespace grpc {
37 namespace load_reporter {
38 
39 // Some helper functions.
40 namespace {
41 
// Removes `value` from the set stored under `key` in `map`. If the set is
// empty after the removal attempt, the whole key/set entry is dropped from the
// map. Returns true only if `value` was present in that set and got removed;
// returns false when `key` is absent or `value` was not in its set.
template <typename K, typename V>
bool UnorderedMapOfSetEraseKeyValue(std::unordered_map<K, std::set<V>>& map,
                                    const K& key, const V& value) {
  auto entry = map.find(key);
  if (entry == map.end()) return false;
  std::set<V>& values = entry->second;
  const bool removed = values.erase(value) > 0;
  // `values` must not be touched after this erase: it lives inside the entry.
  if (values.empty()) map.erase(entry);
  return removed;
}
59 
// Detaches the set stored under `key` from `map` and returns it by value; the
// key is removed from the map. A missing key yields an empty set and leaves
// the map unchanged.
template <typename K, typename V>
std::set<V> UnorderedMapOfSetExtract(std::unordered_map<K, std::set<V>>& map,
                                     const K& key) {
  std::set<V> extracted;
  auto entry = map.find(key);
  if (entry != map.end()) {
    // Steal the contents cheaply, then drop the (now empty) entry.
    extracted.swap(entry->second);
    map.erase(entry);
  }
  return extracted;
}
74 
// Returns a pointer to a pseudo-randomly chosen element of `container`, which
// must be non-empty (checked with GPR_ASSERT). The position is
// std::rand() % container.size(), so the choice carries a small modulo bias.
template <typename C>
const typename C::value_type* RandomElement(const C& container) {
  GPR_ASSERT(!container.empty());
  const auto offset = std::rand() % container.size();
  auto chosen = std::next(container.begin(), offset);
  return &*chosen;
}
83 
84 }  // namespace
85 
LoadRecordKey(const std::string & client_ip_and_token,std::string user_id)86 LoadRecordKey::LoadRecordKey(const std::string& client_ip_and_token,
87                              std::string user_id)
88     : user_id_(std::move(user_id)) {
89   GPR_ASSERT(client_ip_and_token.size() >= 2);
90   int ip_hex_size;
91   GPR_ASSERT(sscanf(client_ip_and_token.substr(0, 2).c_str(), "%d",
92                     &ip_hex_size) == 1);
93   GPR_ASSERT(ip_hex_size == 0 || ip_hex_size == kIpv4AddressLength ||
94              ip_hex_size == kIpv6AddressLength);
95   size_t cur_pos = 2;
96   client_ip_hex_ = client_ip_and_token.substr(cur_pos, ip_hex_size);
97   cur_pos += ip_hex_size;
98   if (client_ip_and_token.size() - cur_pos < kLbIdLength) {
99     lb_id_ = kInvalidLbId;
100     lb_tag_ = "";
101   } else {
102     lb_id_ = client_ip_and_token.substr(cur_pos, kLbIdLength);
103     lb_tag_ = client_ip_and_token.substr(cur_pos + kLbIdLength);
104   }
105 }
106 
GetClientIpBytes() const107 std::string LoadRecordKey::GetClientIpBytes() const {
108   if (client_ip_hex_.empty()) {
109     return "";
110   } else if (client_ip_hex_.size() == kIpv4AddressLength) {
111     uint32_t ip_bytes;
112     if (sscanf(client_ip_hex_.c_str(), "%x", &ip_bytes) != 1) {
113       gpr_log(GPR_ERROR,
114               "Can't parse client IP (%s) from a hex string to an integer.",
115               client_ip_hex_.c_str());
116       return "";
117     }
118     ip_bytes = grpc_htonl(ip_bytes);
119     return std::string(reinterpret_cast<const char*>(&ip_bytes),
120                        sizeof(ip_bytes));
121   } else if (client_ip_hex_.size() == kIpv6AddressLength) {
122     uint32_t ip_bytes[4];
123     for (size_t i = 0; i < 4; ++i) {
124       if (sscanf(client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str(), "%x",
125                  ip_bytes + i) != 1) {
126         gpr_log(
127             GPR_ERROR,
128             "Can't parse client IP part (%s) from a hex string to an integer.",
129             client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str());
130         return "";
131       }
132       ip_bytes[i] = grpc_htonl(ip_bytes[i]);
133     }
134     return std::string(reinterpret_cast<const char*>(ip_bytes),
135                        sizeof(ip_bytes));
136   } else {
137     GPR_UNREACHABLE_CODE(return "");
138   }
139 }
140 
LoadRecordValue(std::string metric_name,uint64_t num_calls,double total_metric_value)141 LoadRecordValue::LoadRecordValue(std::string metric_name, uint64_t num_calls,
142                                  double total_metric_value) {
143   call_metrics_.emplace(std::move(metric_name),
144                         CallMetricValue(num_calls, total_metric_value));
145 }
146 
MergeRow(const LoadRecordKey & key,const LoadRecordValue & value)147 void PerBalancerStore::MergeRow(const LoadRecordKey& key,
148                                 const LoadRecordValue& value) {
149   // During suspension, the load data received will be dropped.
150   if (!suspended_) {
151     load_record_map_[key].MergeFrom(value);
152     gpr_log(GPR_DEBUG,
153             "[PerBalancerStore %p] Load data merged (Key: %s, Value: %s).",
154             this, key.ToString().c_str(), value.ToString().c_str());
155   } else {
156     gpr_log(GPR_DEBUG,
157             "[PerBalancerStore %p] Load data dropped (Key: %s, Value: %s).",
158             this, key.ToString().c_str(), value.ToString().c_str());
159   }
160   // We always keep track of num_calls_in_progress_, so that when this
161   // store is resumed, we still have a correct value of
162   // num_calls_in_progress_.
163   GPR_ASSERT(static_cast<int64_t>(num_calls_in_progress_) +
164                  value.GetNumCallsInProgressDelta() >=
165              0);
166   num_calls_in_progress_ += value.GetNumCallsInProgressDelta();
167 }
168 
Suspend()169 void PerBalancerStore::Suspend() {
170   suspended_ = true;
171   load_record_map_.clear();
172   gpr_log(GPR_DEBUG, "[PerBalancerStore %p] Suspended.", this);
173 }
174 
Resume()175 void PerBalancerStore::Resume() {
176   suspended_ = false;
177   gpr_log(GPR_DEBUG, "[PerBalancerStore %p] Resumed.", this);
178 }
179 
GetNumCallsInProgressForReport()180 uint64_t PerBalancerStore::GetNumCallsInProgressForReport() {
181   GPR_ASSERT(!suspended_);
182   last_reported_num_calls_in_progress_ = num_calls_in_progress_;
183   return num_calls_in_progress_;
184 }
185 
ReportStreamCreated(const std::string & lb_id,const std::string & load_key)186 void PerHostStore::ReportStreamCreated(const std::string& lb_id,
187                                        const std::string& load_key) {
188   GPR_ASSERT(lb_id != kInvalidLbId);
189   SetUpForNewLbId(lb_id, load_key);
190   // Prior to this one, there was no load balancer receiving report, so we may
191   // have unassigned orphaned stores to assign to this new balancer.
192   // TODO(juanlishen): If the load key of this new stream is the same with
193   // some previously adopted orphan store, we may want to take the orphan to
194   // this stream. Need to discuss with LB team.
195   if (assigned_stores_.size() == 1) {
196     for (const auto& p : per_balancer_stores_) {
197       const std::string& other_lb_id = p.first;
198       const std::unique_ptr<PerBalancerStore>& orphaned_store = p.second;
199       if (other_lb_id != lb_id) {
200         orphaned_store->Resume();
201         AssignOrphanedStore(orphaned_store.get(), lb_id);
202       }
203     }
204   }
205   // The first connected balancer will adopt the kInvalidLbId.
206   if (per_balancer_stores_.size() == 1) {
207     SetUpForNewLbId(kInvalidLbId, "");
208     ReportStreamClosed(kInvalidLbId);
209   }
210 }
211 
ReportStreamClosed(const std::string & lb_id)212 void PerHostStore::ReportStreamClosed(const std::string& lb_id) {
213   auto it_store_for_gone_lb = per_balancer_stores_.find(lb_id);
214   GPR_ASSERT(it_store_for_gone_lb != per_balancer_stores_.end());
215   // Remove this closed stream from our records.
216   GPR_ASSERT(UnorderedMapOfSetEraseKeyValue(
217       load_key_to_receiving_lb_ids_, it_store_for_gone_lb->second->load_key(),
218       lb_id));
219   std::set<PerBalancerStore*> orphaned_stores =
220       UnorderedMapOfSetExtract(assigned_stores_, lb_id);
221   // The stores that were assigned to this balancer are orphaned now. They
222   // should be re-assigned to other balancers which are still receiving reports.
223   for (PerBalancerStore* orphaned_store : orphaned_stores) {
224     const std::string* new_receiver = nullptr;
225     auto it = load_key_to_receiving_lb_ids_.find(orphaned_store->load_key());
226     if (it != load_key_to_receiving_lb_ids_.end()) {
227       // First, try to pick from the active balancers with the same load key.
228       new_receiver = RandomElement(it->second);
229     } else if (!assigned_stores_.empty()) {
230       // If failed, pick from all the remaining active balancers.
231       new_receiver = &(RandomElement(assigned_stores_)->first);
232     }
233     if (new_receiver != nullptr) {
234       AssignOrphanedStore(orphaned_store, *new_receiver);
235     } else {
236       // Load data for an LB ID that can't be assigned to any stream should
237       // be dropped.
238       orphaned_store->Suspend();
239     }
240   }
241 }
242 
FindPerBalancerStore(const std::string & lb_id) const243 PerBalancerStore* PerHostStore::FindPerBalancerStore(
244     const std::string& lb_id) const {
245   return per_balancer_stores_.find(lb_id) != per_balancer_stores_.end()
246              ? per_balancer_stores_.find(lb_id)->second.get()
247              : nullptr;
248 }
249 
GetAssignedStores(const std::string & lb_id) const250 const std::set<PerBalancerStore*>* PerHostStore::GetAssignedStores(
251     const std::string& lb_id) const {
252   auto it = assigned_stores_.find(lb_id);
253   if (it == assigned_stores_.end()) return nullptr;
254   return &(it->second);
255 }
256 
AssignOrphanedStore(PerBalancerStore * orphaned_store,const std::string & new_receiver)257 void PerHostStore::AssignOrphanedStore(PerBalancerStore* orphaned_store,
258                                        const std::string& new_receiver) {
259   auto it = assigned_stores_.find(new_receiver);
260   GPR_ASSERT(it != assigned_stores_.end());
261   it->second.insert(orphaned_store);
262   gpr_log(GPR_INFO,
263           "[PerHostStore %p] Re-assigned orphaned store (%p) with original LB"
264           " ID of %s to new receiver %s",
265           this, orphaned_store, orphaned_store->lb_id().c_str(),
266           new_receiver.c_str());
267 }
268 
SetUpForNewLbId(const std::string & lb_id,const std::string & load_key)269 void PerHostStore::SetUpForNewLbId(const std::string& lb_id,
270                                    const std::string& load_key) {
271   // The top-level caller (i.e., LoadReportService) should guarantee the
272   // lb_id is unique for each reporting stream.
273   GPR_ASSERT(per_balancer_stores_.find(lb_id) == per_balancer_stores_.end());
274   GPR_ASSERT(assigned_stores_.find(lb_id) == assigned_stores_.end());
275   load_key_to_receiving_lb_ids_[load_key].insert(lb_id);
276   std::unique_ptr<PerBalancerStore> per_balancer_store(
277       new PerBalancerStore(lb_id, load_key));
278   assigned_stores_[lb_id] = {per_balancer_store.get()};
279   per_balancer_stores_[lb_id] = std::move(per_balancer_store);
280 }
281 
FindPerBalancerStore(const string & hostname,const string & lb_id) const282 PerBalancerStore* LoadDataStore::FindPerBalancerStore(
283     const string& hostname, const string& lb_id) const {
284   auto it = per_host_stores_.find(hostname);
285   if (it != per_host_stores_.end()) {
286     const PerHostStore& per_host_store = it->second;
287     return per_host_store.FindPerBalancerStore(lb_id);
288   } else {
289     return nullptr;
290   }
291 }
292 
MergeRow(const std::string & hostname,const LoadRecordKey & key,const LoadRecordValue & value)293 void LoadDataStore::MergeRow(const std::string& hostname,
294                              const LoadRecordKey& key,
295                              const LoadRecordValue& value) {
296   PerBalancerStore* per_balancer_store =
297       FindPerBalancerStore(hostname, key.lb_id());
298   if (per_balancer_store != nullptr) {
299     per_balancer_store->MergeRow(key, value);
300     return;
301   }
302   // Unknown LB ID. Track it until its number of in-progress calls drops to
303   // zero.
304   int64_t in_progress_delta = value.GetNumCallsInProgressDelta();
305   if (in_progress_delta != 0) {
306     auto it_tracker = unknown_balancer_id_trackers_.find(key.lb_id());
307     if (it_tracker == unknown_balancer_id_trackers_.end()) {
308       gpr_log(
309           GPR_DEBUG,
310           "[LoadDataStore %p] Start tracking unknown balancer (lb_id_: %s).",
311           this, key.lb_id().c_str());
312       unknown_balancer_id_trackers_.insert(
313           {key.lb_id(), static_cast<uint64_t>(in_progress_delta)});
314     } else if ((it_tracker->second += in_progress_delta) == 0) {
315       unknown_balancer_id_trackers_.erase(it_tracker);
316       gpr_log(GPR_DEBUG,
317               "[LoadDataStore %p] Stop tracking unknown balancer (lb_id_: %s).",
318               this, key.lb_id().c_str());
319     }
320   }
321 }
322 
GetAssignedStores(const std::string & hostname,const std::string & lb_id)323 const std::set<PerBalancerStore*>* LoadDataStore::GetAssignedStores(
324     const std::string& hostname, const std::string& lb_id) {
325   auto it = per_host_stores_.find(hostname);
326   if (it == per_host_stores_.end()) return nullptr;
327   return it->second.GetAssignedStores(lb_id);
328 }
329 
ReportStreamCreated(const std::string & hostname,const std::string & lb_id,const std::string & load_key)330 void LoadDataStore::ReportStreamCreated(const std::string& hostname,
331                                         const std::string& lb_id,
332                                         const std::string& load_key) {
333   per_host_stores_[hostname].ReportStreamCreated(lb_id, load_key);
334 }
335 
ReportStreamClosed(const std::string & hostname,const std::string & lb_id)336 void LoadDataStore::ReportStreamClosed(const std::string& hostname,
337                                        const std::string& lb_id) {
338   auto it_per_host_store = per_host_stores_.find(hostname);
339   GPR_ASSERT(it_per_host_store != per_host_stores_.end());
340   it_per_host_store->second.ReportStreamClosed(lb_id);
341 }
342 
343 }  // namespace load_reporter
344 }  // namespace grpc
345