xref: /aosp_15_r20/external/grpc-grpc/test/cpp/interop/xds_stats_watcher.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPC_TEST_CPP_INTEROP_XDS_STATS_WATCHER_H
20 #define GRPC_TEST_CPP_INTEROP_XDS_STATS_WATCHER_H
21 
22 #include <atomic>
23 #include <chrono>
24 #include <condition_variable>
25 #include <deque>
26 #include <map>
27 #include <mutex>
28 #include <set>
29 #include <sstream>
30 #include <string>
31 #include <thread>
32 #include <unordered_set>
33 #include <vector>
34 
35 #include "absl/status/status.h"
36 #include "absl/types/span.h"
37 
38 #include <grpcpp/grpcpp.h>
39 
40 #include "src/proto/grpc/testing/empty.pb.h"
41 #include "src/proto/grpc/testing/messages.pb.h"
42 
43 namespace grpc {
44 namespace testing {
45 
// Forward declaration: StatsWatchers (below) stores XdsStatsWatcher pointers
// before the class itself is defined.
46 class XdsStatsWatcher;
47 
// Data recorded for one client RPC; passed to XdsStatsWatcher::RpcCompleted().
//
// The scalar members carry default member initializers so that a
// default-constructed result never exposes indeterminate values. Code that
// issues the RPC is still expected to overwrite them with the real values.
48 struct AsyncClientCallResult {
     // Response message slots, one per response type.
     // NOTE(review): presumably only the slot matching rpc_type is populated;
     // confirm against the code that issues the RPC.
49   Empty empty_response;
50   SimpleResponse simple_response;
     // Status associated with the call (presumably the final RPC status).
51   Status status;
     // Request id of this RPC. Defaults to 0 until assigned.
52   int saved_request_id = 0;
     // RPC method type. Value-initialized (underlying value 0) until assigned.
53   ClientConfigureRequest::RpcType rpc_type{};
54 };
55 
// Shared bookkeeping for outgoing-RPC ids and the watchers interested in RPC
// completion.
56 struct StatsWatchers {
57   // Unique ID for each outgoing RPC
58   int global_request_id = 0;
59   // Unique ID for each outgoing RPC by RPC method type
60   std::map<int, int> global_request_id_by_type;
61   // Stores a set of watchers that should be notified upon outgoing RPC
62   // completion
63   std::set<XdsStatsWatcher*> watchers;
64   // Global watcher for accumulated stats.
     // Defaults to nullptr so that an unset watcher is detectable instead of
     // being an indeterminate pointer. This struct declares no constructor or
     // destructor, so it neither creates nor frees the watcher itself.
65   XdsStatsWatcher* global_watcher = nullptr;
66   // Mutex for global_request_id and watchers
     // NOTE(review): global_request_id_by_type and global_watcher are not
     // mentioned above; confirm whether mu is meant to cover them as well.
67   std::mutex mu;
68 };
69 
70 /// Records the remote peer distribution for a given range of RPCs.
   ///
   /// Only declarations are visible in this header. Anything marked
   /// "presumably" or NOTE(review) below is inferred from names and should be
   /// confirmed against the implementation file.
71 class XdsStatsWatcher {
72  public:
     // Creates a watcher for the request-id range described by start_id and
     // end_id.
     // NOTE(review): whether the ends are inclusive or exclusive is not visible
     // here; confirm in the implementation.
     // metadata_keys: metadata keys of interest to this watcher; presumably the
     // keys whose values are recorded per peer. Confirm.
73   XdsStatsWatcher(int start_id, int end_id,
74                   absl::Span<const std::string> metadata_keys);
75 
76   // Upon the completion of an RPC, we will look at the request_id, the
77   // rpc_type, and the peer the RPC was sent to in order to count
78   // this RPC into the right stats bin.
     //
     // call: result record of the completed RPC.
     // peer: the peer the RPC was sent to.
     // initial_metadata / trailing_metadata: metadata multimaps for the RPC
     // (presumably the ones received from the server; confirm).
79   void RpcCompleted(
80       const AsyncClientCallResult& call, const std::string& peer,
81       const std::multimap<grpc::string_ref, grpc::string_ref>& initial_metadata,
82       const std::multimap<grpc::string_ref, grpc::string_ref>&
83           trailing_metadata);
84 
     // Returns the collected stats as a LoadBalancerStatsResponse.
     // NOTE(review): presumably blocks for up to timeout_sec seconds until the
     // watched RPCs have completed; confirm in the implementation.
85   LoadBalancerStatsResponse WaitForRpcStatsResponse(int timeout_sec);
86 
     // Writes the current accumulated stats into *response.
     // NOTE(review): the role of stats_watchers (for example, whether its mutex
     // is taken) is not visible here; confirm in the implementation.
87   void GetCurrentRpcStats(LoadBalancerAccumulatedStatsResponse* response,
88                           StatsWatchers* stats_watchers);
89 
90  private:
     // Request-id range for this watcher (presumably the constructor arguments
     // stored as-is).
91   int start_id_;
92   int end_id_;
     // NOTE(review): presumably the number of RPCs in the range that have not
     // completed yet; confirm.
93   int rpcs_needed_;
     // Counters for RPCs without a remote peer: one overall, one keyed by an
     // int (presumably the RPC type, per the name).
94   int no_remote_peer_ = 0;
95   std::map<int, int> no_remote_peer_by_type_;
96   // A map of stats keyed by peer name.
97   std::map<std::string, int> rpcs_by_peer_;
98   // A two-level map of stats keyed at top level by RPC method and second level
99   // by peer name.
100   std::map<int, std::map<std::string, int>> rpcs_by_type_;
101   // Storing accumulated stats in the response proto format.
102   LoadBalancerAccumulatedStatsResponse accumulated_stats_;
      // NOTE(review): presumably m_ guards the members of this class and cv_
      // is what WaitForRpcStatsResponse waits on; confirm in the
      // implementation.
103   std::mutex m_;
104   std::condition_variable cv_;
      // Metadata selection and storage.
      // NOTE(review): presumably metadata_keys_ holds the constructor's keys,
      // include_all_metadata_ turns key filtering off, and metadata_by_peer_
      // is keyed by peer name; confirm.
105   std::unordered_set<std::string> metadata_keys_;
106   bool include_all_metadata_ = false;
107   std::map<std::string, LoadBalancerStatsResponse::MetadataByPeer>
108       metadata_by_peer_;
109 };
110 
111 }  // namespace testing
112 }  // namespace grpc
113 
114 #endif  // GRPC_TEST_CPP_INTEROP_XDS_STATS_WATCHER_H
115