1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "test/cpp/interop/xds_stats_watcher.h"
16
17 #include <map>
18
19 #include "absl/algorithm/container.h"
20 #include "absl/strings/ascii.h"
21
22 namespace grpc {
23 namespace testing {
24
25 namespace {
26
AddRpcMetadata(LoadBalancerStatsResponse::RpcMetadata * rpc_metadata,const std::unordered_set<std::string> & included_keys,bool include_all_keys,const std::multimap<grpc::string_ref,grpc::string_ref> & metadata,LoadBalancerStatsResponse::MetadataType type)27 void AddRpcMetadata(
28 LoadBalancerStatsResponse::RpcMetadata* rpc_metadata,
29 const std::unordered_set<std::string>& included_keys, bool include_all_keys,
30 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
31 LoadBalancerStatsResponse::MetadataType type) {
32 for (const auto& key_value : metadata) {
33 absl::string_view key(key_value.first.data(), key_value.first.length());
34 if (include_all_keys ||
35 included_keys.find(absl::AsciiStrToLower(key)) != included_keys.end()) {
36 auto entry = rpc_metadata->add_metadata();
37 entry->set_key(key);
38 entry->set_value(absl::string_view(key_value.second.data(),
39 key_value.second.length()));
40 entry->set_type(type);
41 }
42 }
43 }
44
ToLowerCase(absl::Span<const std::string> strings)45 std::unordered_set<std::string> ToLowerCase(
46 absl::Span<const std::string> strings) {
47 std::unordered_set<std::string> result;
48 for (const auto& str : strings) {
49 result.emplace(absl::AsciiStrToLower(str));
50 }
51 return result;
52 }
53
HasNonEmptyMetadata(const std::map<std::string,LoadBalancerStatsResponse::MetadataByPeer> & metadata_by_peer)54 bool HasNonEmptyMetadata(
55 const std::map<std::string, LoadBalancerStatsResponse::MetadataByPeer>&
56 metadata_by_peer) {
57 for (const auto& entry : metadata_by_peer) {
58 for (const auto& rpc_metadata : entry.second.rpc_metadata()) {
59 if (rpc_metadata.metadata_size() > 0) {
60 return true;
61 }
62 }
63 }
64 return false;
65 }
66
67 } // namespace
68
XdsStatsWatcher(int start_id,int end_id,absl::Span<const std::string> metadata_keys)69 XdsStatsWatcher::XdsStatsWatcher(int start_id, int end_id,
70 absl::Span<const std::string> metadata_keys)
71 : start_id_(start_id),
72 end_id_(end_id),
73 rpcs_needed_(end_id - start_id),
74 metadata_keys_(ToLowerCase(metadata_keys)),
75 include_all_metadata_(
76 absl::c_any_of(metadata_keys, [](absl::string_view key) {
77 return absl::StripAsciiWhitespace(key) == "*";
78 })) {}
79
RpcCompleted(const AsyncClientCallResult & call,const std::string & peer,const std::multimap<grpc::string_ref,grpc::string_ref> & initial_metadata,const std::multimap<grpc::string_ref,grpc::string_ref> & trailing_metadata)80 void XdsStatsWatcher::RpcCompleted(
81 const AsyncClientCallResult& call, const std::string& peer,
82 const std::multimap<grpc::string_ref, grpc::string_ref>& initial_metadata,
83 const std::multimap<grpc::string_ref, grpc::string_ref>&
84 trailing_metadata) {
85 // We count RPCs for global watcher or if the request_id falls into the
86 // watcher's interested range of request ids.
87 if ((start_id_ == 0 && end_id_ == 0) ||
88 (start_id_ <= call.saved_request_id && call.saved_request_id < end_id_)) {
89 {
90 std::lock_guard<std::mutex> lock(m_);
91 if (peer.empty()) {
92 no_remote_peer_++;
93 ++no_remote_peer_by_type_[call.rpc_type];
94 } else {
95 // RPC is counted into both per-peer bin and per-method-per-peer bin.
96 rpcs_by_peer_[peer]++;
97 rpcs_by_type_[call.rpc_type][peer]++;
98 auto* rpc_metadata = metadata_by_peer_[peer].add_rpc_metadata();
99 AddRpcMetadata(rpc_metadata, metadata_keys_, include_all_metadata_,
100 initial_metadata, LoadBalancerStatsResponse::INITIAL);
101 AddRpcMetadata(rpc_metadata, metadata_keys_, include_all_metadata_,
102 trailing_metadata, LoadBalancerStatsResponse::TRAILING);
103 }
104 rpcs_needed_--;
105 // Report accumulated stats.
106 auto& stats_per_method = *accumulated_stats_.mutable_stats_per_method();
107 auto& method_stat =
108 stats_per_method[ClientConfigureRequest_RpcType_Name(call.rpc_type)];
109 auto& result = *method_stat.mutable_result();
110 grpc_status_code code =
111 static_cast<grpc_status_code>(call.status.error_code());
112 auto& num_rpcs = result[code];
113 ++num_rpcs;
114 auto rpcs_started = method_stat.rpcs_started();
115 method_stat.set_rpcs_started(++rpcs_started);
116 }
117 cv_.notify_one();
118 }
119 }
120
WaitForRpcStatsResponse(int timeout_sec)121 LoadBalancerStatsResponse XdsStatsWatcher::WaitForRpcStatsResponse(
122 int timeout_sec) {
123 LoadBalancerStatsResponse response;
124 std::unique_lock<std::mutex> lock(m_);
125 cv_.wait_for(lock, std::chrono::seconds(timeout_sec),
126 [this] { return rpcs_needed_ == 0; });
127 response.mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(),
128 rpcs_by_peer_.end());
129 // Return metadata if at least one RPC had relevant metadata. Note that empty
130 // entries would be returned for RCPs with no relevant metadata in this case.
131 if (HasNonEmptyMetadata(metadata_by_peer_)) {
132 response.mutable_metadatas_by_peer()->insert(metadata_by_peer_.begin(),
133 metadata_by_peer_.end());
134 }
135 auto& response_rpcs_by_method = *response.mutable_rpcs_by_method();
136 for (const auto& rpc_by_type : rpcs_by_type_) {
137 std::string method_name;
138 if (rpc_by_type.first == ClientConfigureRequest::EMPTY_CALL) {
139 method_name = "EmptyCall";
140 } else if (rpc_by_type.first == ClientConfigureRequest::UNARY_CALL) {
141 method_name = "UnaryCall";
142 } else {
143 GPR_ASSERT(0);
144 }
145 // TODO(@donnadionne): When the test runner changes to accept EMPTY_CALL
146 // and UNARY_CALL we will just use the name of the enum instead of the
147 // method_name variable.
148 auto& response_rpc_by_method = response_rpcs_by_method[method_name];
149 auto& response_rpcs_by_peer =
150 *response_rpc_by_method.mutable_rpcs_by_peer();
151 for (const auto& rpc_by_peer : rpc_by_type.second) {
152 auto& response_rpc_by_peer = response_rpcs_by_peer[rpc_by_peer.first];
153 response_rpc_by_peer = rpc_by_peer.second;
154 }
155 }
156 response.set_num_failures(no_remote_peer_ + rpcs_needed_);
157 return response;
158 }
159
GetCurrentRpcStats(LoadBalancerAccumulatedStatsResponse * response,StatsWatchers * stats_watchers)160 void XdsStatsWatcher::GetCurrentRpcStats(
161 LoadBalancerAccumulatedStatsResponse* response,
162 StatsWatchers* stats_watchers) {
163 std::unique_lock<std::mutex> lock(m_);
164 response->CopyFrom(accumulated_stats_);
165 // TODO(someone): delete deprecated stats below when the test is no
166 // longer using them.
167 // NOLINTBEGIN(clang-diagnostic-deprecated-declarations)
168 auto& response_rpcs_started_by_method =
169 *response->mutable_num_rpcs_started_by_method();
170 auto& response_rpcs_succeeded_by_method =
171 *response->mutable_num_rpcs_succeeded_by_method();
172 auto& response_rpcs_failed_by_method =
173 *response->mutable_num_rpcs_failed_by_method();
174 // NOLINTEND(clang-diagnostic-deprecated-declarations)
175 for (const auto& rpc_by_type : rpcs_by_type_) {
176 auto total_succeeded = 0;
177 for (const auto& rpc_by_peer : rpc_by_type.second) {
178 total_succeeded += rpc_by_peer.second;
179 }
180 response_rpcs_succeeded_by_method[ClientConfigureRequest_RpcType_Name(
181 rpc_by_type.first)] = total_succeeded;
182 response_rpcs_started_by_method[ClientConfigureRequest_RpcType_Name(
183 rpc_by_type.first)] =
184 stats_watchers->global_request_id_by_type[rpc_by_type.first];
185 response_rpcs_failed_by_method[ClientConfigureRequest_RpcType_Name(
186 rpc_by_type.first)] = no_remote_peer_by_type_[rpc_by_type.first];
187 }
188 }
189
190 } // namespace testing
191 } // namespace grpc
192