xref: /aosp_15_r20/external/grpc-grpc/test/cpp/interop/xds_stats_watcher.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "test/cpp/interop/xds_stats_watcher.h"
16 
17 #include <map>
18 
19 #include "absl/algorithm/container.h"
20 #include "absl/strings/ascii.h"
21 
22 namespace grpc {
23 namespace testing {
24 
25 namespace {
26 
AddRpcMetadata(LoadBalancerStatsResponse::RpcMetadata * rpc_metadata,const std::unordered_set<std::string> & included_keys,bool include_all_keys,const std::multimap<grpc::string_ref,grpc::string_ref> & metadata,LoadBalancerStatsResponse::MetadataType type)27 void AddRpcMetadata(
28     LoadBalancerStatsResponse::RpcMetadata* rpc_metadata,
29     const std::unordered_set<std::string>& included_keys, bool include_all_keys,
30     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
31     LoadBalancerStatsResponse::MetadataType type) {
32   for (const auto& key_value : metadata) {
33     absl::string_view key(key_value.first.data(), key_value.first.length());
34     if (include_all_keys ||
35         included_keys.find(absl::AsciiStrToLower(key)) != included_keys.end()) {
36       auto entry = rpc_metadata->add_metadata();
37       entry->set_key(key);
38       entry->set_value(absl::string_view(key_value.second.data(),
39                                          key_value.second.length()));
40       entry->set_type(type);
41     }
42   }
43 }
44 
ToLowerCase(absl::Span<const std::string> strings)45 std::unordered_set<std::string> ToLowerCase(
46     absl::Span<const std::string> strings) {
47   std::unordered_set<std::string> result;
48   for (const auto& str : strings) {
49     result.emplace(absl::AsciiStrToLower(str));
50   }
51   return result;
52 }
53 
HasNonEmptyMetadata(const std::map<std::string,LoadBalancerStatsResponse::MetadataByPeer> & metadata_by_peer)54 bool HasNonEmptyMetadata(
55     const std::map<std::string, LoadBalancerStatsResponse::MetadataByPeer>&
56         metadata_by_peer) {
57   for (const auto& entry : metadata_by_peer) {
58     for (const auto& rpc_metadata : entry.second.rpc_metadata()) {
59       if (rpc_metadata.metadata_size() > 0) {
60         return true;
61       }
62     }
63   }
64   return false;
65 }
66 
67 }  // namespace
68 
XdsStatsWatcher(int start_id,int end_id,absl::Span<const std::string> metadata_keys)69 XdsStatsWatcher::XdsStatsWatcher(int start_id, int end_id,
70                                  absl::Span<const std::string> metadata_keys)
71     : start_id_(start_id),
72       end_id_(end_id),
73       rpcs_needed_(end_id - start_id),
74       metadata_keys_(ToLowerCase(metadata_keys)),
75       include_all_metadata_(
76           absl::c_any_of(metadata_keys, [](absl::string_view key) {
77             return absl::StripAsciiWhitespace(key) == "*";
78           })) {}
79 
RpcCompleted(const AsyncClientCallResult & call,const std::string & peer,const std::multimap<grpc::string_ref,grpc::string_ref> & initial_metadata,const std::multimap<grpc::string_ref,grpc::string_ref> & trailing_metadata)80 void XdsStatsWatcher::RpcCompleted(
81     const AsyncClientCallResult& call, const std::string& peer,
82     const std::multimap<grpc::string_ref, grpc::string_ref>& initial_metadata,
83     const std::multimap<grpc::string_ref, grpc::string_ref>&
84         trailing_metadata) {
85   // We count RPCs for global watcher or if the request_id falls into the
86   // watcher's interested range of request ids.
87   if ((start_id_ == 0 && end_id_ == 0) ||
88       (start_id_ <= call.saved_request_id && call.saved_request_id < end_id_)) {
89     {
90       std::lock_guard<std::mutex> lock(m_);
91       if (peer.empty()) {
92         no_remote_peer_++;
93         ++no_remote_peer_by_type_[call.rpc_type];
94       } else {
95         // RPC is counted into both per-peer bin and per-method-per-peer bin.
96         rpcs_by_peer_[peer]++;
97         rpcs_by_type_[call.rpc_type][peer]++;
98         auto* rpc_metadata = metadata_by_peer_[peer].add_rpc_metadata();
99         AddRpcMetadata(rpc_metadata, metadata_keys_, include_all_metadata_,
100                        initial_metadata, LoadBalancerStatsResponse::INITIAL);
101         AddRpcMetadata(rpc_metadata, metadata_keys_, include_all_metadata_,
102                        trailing_metadata, LoadBalancerStatsResponse::TRAILING);
103       }
104       rpcs_needed_--;
105       // Report accumulated stats.
106       auto& stats_per_method = *accumulated_stats_.mutable_stats_per_method();
107       auto& method_stat =
108           stats_per_method[ClientConfigureRequest_RpcType_Name(call.rpc_type)];
109       auto& result = *method_stat.mutable_result();
110       grpc_status_code code =
111           static_cast<grpc_status_code>(call.status.error_code());
112       auto& num_rpcs = result[code];
113       ++num_rpcs;
114       auto rpcs_started = method_stat.rpcs_started();
115       method_stat.set_rpcs_started(++rpcs_started);
116     }
117     cv_.notify_one();
118   }
119 }
120 
WaitForRpcStatsResponse(int timeout_sec)121 LoadBalancerStatsResponse XdsStatsWatcher::WaitForRpcStatsResponse(
122     int timeout_sec) {
123   LoadBalancerStatsResponse response;
124   std::unique_lock<std::mutex> lock(m_);
125   cv_.wait_for(lock, std::chrono::seconds(timeout_sec),
126                [this] { return rpcs_needed_ == 0; });
127   response.mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(),
128                                           rpcs_by_peer_.end());
129   // Return metadata if at least one RPC had relevant metadata. Note that empty
130   // entries would be returned for RCPs with no relevant metadata in this case.
131   if (HasNonEmptyMetadata(metadata_by_peer_)) {
132     response.mutable_metadatas_by_peer()->insert(metadata_by_peer_.begin(),
133                                                  metadata_by_peer_.end());
134   }
135   auto& response_rpcs_by_method = *response.mutable_rpcs_by_method();
136   for (const auto& rpc_by_type : rpcs_by_type_) {
137     std::string method_name;
138     if (rpc_by_type.first == ClientConfigureRequest::EMPTY_CALL) {
139       method_name = "EmptyCall";
140     } else if (rpc_by_type.first == ClientConfigureRequest::UNARY_CALL) {
141       method_name = "UnaryCall";
142     } else {
143       GPR_ASSERT(0);
144     }
145     // TODO(@donnadionne): When the test runner changes to accept EMPTY_CALL
146     // and UNARY_CALL we will just use the name of the enum instead of the
147     // method_name variable.
148     auto& response_rpc_by_method = response_rpcs_by_method[method_name];
149     auto& response_rpcs_by_peer =
150         *response_rpc_by_method.mutable_rpcs_by_peer();
151     for (const auto& rpc_by_peer : rpc_by_type.second) {
152       auto& response_rpc_by_peer = response_rpcs_by_peer[rpc_by_peer.first];
153       response_rpc_by_peer = rpc_by_peer.second;
154     }
155   }
156   response.set_num_failures(no_remote_peer_ + rpcs_needed_);
157   return response;
158 }
159 
GetCurrentRpcStats(LoadBalancerAccumulatedStatsResponse * response,StatsWatchers * stats_watchers)160 void XdsStatsWatcher::GetCurrentRpcStats(
161     LoadBalancerAccumulatedStatsResponse* response,
162     StatsWatchers* stats_watchers) {
163   std::unique_lock<std::mutex> lock(m_);
164   response->CopyFrom(accumulated_stats_);
165   // TODO(someone): delete deprecated stats below when the test is no
166   // longer using them.
167   // NOLINTBEGIN(clang-diagnostic-deprecated-declarations)
168   auto& response_rpcs_started_by_method =
169       *response->mutable_num_rpcs_started_by_method();
170   auto& response_rpcs_succeeded_by_method =
171       *response->mutable_num_rpcs_succeeded_by_method();
172   auto& response_rpcs_failed_by_method =
173       *response->mutable_num_rpcs_failed_by_method();
174   // NOLINTEND(clang-diagnostic-deprecated-declarations)
175   for (const auto& rpc_by_type : rpcs_by_type_) {
176     auto total_succeeded = 0;
177     for (const auto& rpc_by_peer : rpc_by_type.second) {
178       total_succeeded += rpc_by_peer.second;
179     }
180     response_rpcs_succeeded_by_method[ClientConfigureRequest_RpcType_Name(
181         rpc_by_type.first)] = total_succeeded;
182     response_rpcs_started_by_method[ClientConfigureRequest_RpcType_Name(
183         rpc_by_type.first)] =
184         stats_watchers->global_request_id_by_type[rpc_by_type.first];
185     response_rpcs_failed_by_method[ClientConfigureRequest_RpcType_Name(
186         rpc_by_type.first)] = no_remote_peer_by_type_[rpc_by_type.first];
187   }
188 }
189 
190 }  // namespace testing
191 }  // namespace grpc
192