1 // 2 // 3 // Copyright 2023 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPC_TEST_CPP_INTEROP_XDS_STATS_WATCHER_H 20 #define GRPC_TEST_CPP_INTEROP_XDS_STATS_WATCHER_H 21 22 #include <atomic> 23 #include <chrono> 24 #include <condition_variable> 25 #include <deque> 26 #include <map> 27 #include <mutex> 28 #include <set> 29 #include <sstream> 30 #include <string> 31 #include <thread> 32 #include <unordered_set> 33 #include <vector> 34 35 #include "absl/status/status.h" 36 #include "absl/types/span.h" 37 38 #include <grpcpp/grpcpp.h> 39 40 #include "src/proto/grpc/testing/empty.pb.h" 41 #include "src/proto/grpc/testing/messages.pb.h" 42 43 namespace grpc { 44 namespace testing { 45 46 class XdsStatsWatcher; 47 48 struct AsyncClientCallResult { 49 Empty empty_response; 50 SimpleResponse simple_response; 51 Status status; 52 int saved_request_id; 53 ClientConfigureRequest::RpcType rpc_type; 54 }; 55 56 struct StatsWatchers { 57 // Unique ID for each outgoing RPC 58 int global_request_id = 0; 59 // Unique ID for each outgoing RPC by RPC method type 60 std::map<int, int> global_request_id_by_type; 61 // Stores a set of watchers that should be notified upon outgoing RPC 62 // completion 63 std::set<XdsStatsWatcher*> watchers; 64 // Global watcher for accumululated stats. 65 XdsStatsWatcher* global_watcher; 66 // Mutex for global_request_id and watchers 67 std::mutex mu; 68 }; 69 70 /// Records the remote peer distribution for a given range of RPCs. 71 class XdsStatsWatcher { 72 public: 73 XdsStatsWatcher(int start_id, int end_id, 74 absl::Span<const std::string> metadata_keys); 75 76 // Upon the completion of an RPC, we will look at the request_id, the 77 // rpc_type, and the peer the RPC was sent to in order to count 78 // this RPC into the right stats bin. 79 void RpcCompleted( 80 const AsyncClientCallResult& call, const std::string& peer, 81 const std::multimap<grpc::string_ref, grpc::string_ref>& initial_metadata, 82 const std::multimap<grpc::string_ref, grpc::string_ref>& 83 trailing_metadata); 84 85 LoadBalancerStatsResponse WaitForRpcStatsResponse(int timeout_sec); 86 87 void GetCurrentRpcStats(LoadBalancerAccumulatedStatsResponse* response, 88 StatsWatchers* stats_watchers); 89 90 private: 91 int start_id_; 92 int end_id_; 93 int rpcs_needed_; 94 int no_remote_peer_ = 0; 95 std::map<int, int> no_remote_peer_by_type_; 96 // A map of stats keyed by peer name. 97 std::map<std::string, int> rpcs_by_peer_; 98 // A two-level map of stats keyed at top level by RPC method and second level 99 // by peer name. 100 std::map<int, std::map<std::string, int>> rpcs_by_type_; 101 // Storing accumulated stats in the response proto format. 102 LoadBalancerAccumulatedStatsResponse accumulated_stats_; 103 std::mutex m_; 104 std::condition_variable cv_; 105 std::unordered_set<std::string> metadata_keys_; 106 bool include_all_metadata_ = false; 107 std::map<std::string, LoadBalancerStatsResponse::MetadataByPeer> 108 metadata_by_peer_; 109 }; 110 111 } // namespace testing 112 } // namespace grpc 113 114 #endif // GRPC_TEST_CPP_INTEROP_XDS_STATS_WATCHER_H 115