1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <gmock/gmock.h>
18 #include <gtest/gtest.h>
19
20 #include "absl/strings/str_cat.h"
21 #include "absl/types/optional.h"
22
23 #include <grpc/grpc.h>
24 #include <grpcpp/channel.h>
25 #include <grpcpp/client_context.h>
26 #include <grpcpp/create_channel.h>
27 #include <grpcpp/ext/call_metric_recorder.h>
28 #include <grpcpp/ext/orca_service.h>
29 #include <grpcpp/ext/server_metric_recorder.h>
30 #include <grpcpp/server.h>
31 #include <grpcpp/server_builder.h>
32 #include <grpcpp/server_context.h>
33
34 #include "src/core/lib/gprpp/time.h"
35 #include "src/proto/grpc/testing/xds/v3/orca_service.grpc.pb.h"
36 #include "src/proto/grpc/testing/xds/v3/orca_service.pb.h"
37 #include "test/core/util/port.h"
38 #include "test/core/util/test_config.h"
39
40 using xds::data::orca::v3::OrcaLoadReport;
41 using xds::service::orca::v3::OpenRcaService;
42 using xds::service::orca::v3::OrcaLoadReportRequest;
43
44 namespace grpc {
45 namespace testing {
46 namespace {
47
48 using experimental::OrcaService;
49 using experimental::ServerMetricRecorder;
50
51 class OrcaServiceEnd2endTest : public ::testing::Test {
52 protected:
53 // A wrapper for the client stream that ensures that responses come
54 // back at the requested interval.
55 class Stream {
56 public:
Stream(OpenRcaService::Stub * stub,grpc_core::Duration requested_interval)57 Stream(OpenRcaService::Stub* stub, grpc_core::Duration requested_interval)
58 : requested_interval_(requested_interval) {
59 OrcaLoadReportRequest request;
60 gpr_timespec timespec = requested_interval.as_timespec();
61 auto* interval_proto = request.mutable_report_interval();
62 interval_proto->set_seconds(timespec.tv_sec);
63 interval_proto->set_nanos(timespec.tv_nsec);
64 stream_ = stub->StreamCoreMetrics(&context_, request);
65 }
66
~Stream()67 ~Stream() { context_.TryCancel(); }
68
ReadResponse()69 OrcaLoadReport ReadResponse() {
70 OrcaLoadReport response;
71 EXPECT_TRUE(stream_->Read(&response));
72 auto now = grpc_core::Timestamp::FromTimespecRoundDown(
73 gpr_now(GPR_CLOCK_MONOTONIC));
74 if (last_response_time_.has_value()) {
75 // Allow a small fudge factor to avoid test flakiness.
76 const grpc_core::Duration fudge_factor =
77 grpc_core::Duration::Milliseconds(750) *
78 grpc_test_slowdown_factor();
79 auto elapsed = now - *last_response_time_;
80 EXPECT_GE(elapsed, requested_interval_ - fudge_factor)
81 << elapsed.ToString();
82 EXPECT_LE(elapsed, requested_interval_ + fudge_factor)
83 << elapsed.ToString();
84 }
85 last_response_time_ = now;
86 return response;
87 }
88
89 private:
90 const grpc_core::Duration requested_interval_;
91 ClientContext context_;
92 std::unique_ptr<grpc::ClientReaderInterface<OrcaLoadReport>> stream_;
93 absl::optional<grpc_core::Timestamp> last_response_time_;
94 };
95
OrcaServiceEnd2endTest()96 OrcaServiceEnd2endTest()
97 : server_metric_recorder_(ServerMetricRecorder::Create()),
98 orca_service_(server_metric_recorder_.get(),
99 OrcaService::Options().set_min_report_duration(
100 absl::ZeroDuration())) {
101 std::string server_address =
102 absl::StrCat("localhost:", grpc_pick_unused_port_or_die());
103 ServerBuilder builder;
104 builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
105 builder.RegisterService(&orca_service_);
106 server_ = builder.BuildAndStart();
107 gpr_log(GPR_INFO, "server started on %s", server_address_.c_str());
108 auto channel = CreateChannel(server_address, InsecureChannelCredentials());
109 stub_ = OpenRcaService::NewStub(channel);
110 }
111
~OrcaServiceEnd2endTest()112 ~OrcaServiceEnd2endTest() override { server_->Shutdown(); }
113
114 std::string server_address_;
115 std::unique_ptr<ServerMetricRecorder> server_metric_recorder_;
116 OrcaService orca_service_;
117 std::unique_ptr<Server> server_;
118 std::unique_ptr<OpenRcaService::Stub> stub_;
119 };
120
// Walks the ServerMetricRecorder through a sequence of states and checks
// after each change that the reports read from two concurrent streams
// contain the recorder's current contents.
TEST_F(OrcaServiceEnd2endTest, Basic) {
  using ::testing::Pair;
  using ::testing::UnorderedElementsAre;
  constexpr char kMetricName1[] = "foo";
  constexpr char kMetricName2[] = "bar";
  constexpr char kMetricName3[] = "baz";
  constexpr char kMetricName4[] = "quux";
  // One stream reports every 5s and the other every 2.5s, so each round
  // of reads consumes one report from the slow stream and two from the
  // fast one.
  Stream slow_stream(stub_.get(), grpc_core::Duration::Milliseconds(5000));
  Stream fast_stream(stub_.get(), grpc_core::Duration::Milliseconds(2500));
  // Performs one round of reads and applies `checker` to every report.
  const auto ReadResponses =
      [&](const std::function<void(const OrcaLoadReport&)>& checker) {
        gpr_log(GPR_INFO, "reading response from stream1");
        checker(slow_stream.ReadResponse());
        for (int i = 0; i < 2; ++i) {
          gpr_log(GPR_INFO, "reading response from stream2");
          checker(fast_stream.ReadResponse());
        }
      };
  // Nothing has been recorded yet, so every field is at its default.
  ReadResponses([](const OrcaLoadReport& report) {
    EXPECT_EQ(report.application_utilization(), 0);
    EXPECT_EQ(report.cpu_utilization(), 0);
    EXPECT_EQ(report.mem_utilization(), 0);
    EXPECT_THAT(report.utilization(), UnorderedElementsAre());
  });
  // Record application utilization only.
  server_metric_recorder_->SetApplicationUtilization(0.5);
  ReadResponses([](const OrcaLoadReport& report) {
    EXPECT_EQ(report.application_utilization(), 0.5);
    EXPECT_EQ(report.cpu_utilization(), 0);
    EXPECT_EQ(report.mem_utilization(), 0);
    EXPECT_THAT(report.utilization(), UnorderedElementsAre());
  });
  // Change application utilization and add CPU and memory utilization.
  server_metric_recorder_->SetApplicationUtilization(1.8);
  server_metric_recorder_->SetCpuUtilization(0.3);
  server_metric_recorder_->SetMemoryUtilization(0.4);
  ReadResponses([](const OrcaLoadReport& report) {
    EXPECT_EQ(report.application_utilization(), 1.8);
    EXPECT_EQ(report.cpu_utilization(), 0.3);
    EXPECT_EQ(report.mem_utilization(), 0.4);
    EXPECT_THAT(report.utilization(), UnorderedElementsAre());
  });
  // Clear the three scalar metrics and record a single named utilization.
  server_metric_recorder_->ClearApplicationUtilization();
  server_metric_recorder_->ClearCpuUtilization();
  server_metric_recorder_->ClearMemoryUtilization();
  server_metric_recorder_->SetNamedUtilization(kMetricName1, 0.3);
  ReadResponses([&](const OrcaLoadReport& report) {
    EXPECT_EQ(report.application_utilization(), 0);
    EXPECT_EQ(report.cpu_utilization(), 0);
    EXPECT_EQ(report.mem_utilization(), 0);
    EXPECT_THAT(report.utilization(),
                UnorderedElementsAre(Pair(kMetricName1, 0.3)));
  });
  // Drop that named utilization and record two different ones.
  server_metric_recorder_->ClearNamedUtilization(kMetricName1);
  server_metric_recorder_->SetNamedUtilization(kMetricName2, 0.2);
  server_metric_recorder_->SetNamedUtilization(kMetricName3, 0.1);
  ReadResponses([&](const OrcaLoadReport& report) {
    EXPECT_EQ(report.application_utilization(), 0);
    EXPECT_EQ(report.cpu_utilization(), 0);
    EXPECT_EQ(report.mem_utilization(), 0);
    EXPECT_THAT(report.utilization(),
                UnorderedElementsAre(Pair(kMetricName2, 0.2),
                                     Pair(kMetricName3, 0.1)));
  });
  // Swap in a whole new named utilization map in a single call.
  server_metric_recorder_->SetAllNamedUtilization(
      {{kMetricName2, 0.5}, {kMetricName4, 0.9}});
  ReadResponses([&](const OrcaLoadReport& report) {
    EXPECT_EQ(report.application_utilization(), 0);
    EXPECT_EQ(report.cpu_utilization(), 0);
    EXPECT_EQ(report.mem_utilization(), 0);
    EXPECT_THAT(report.utilization(),
                UnorderedElementsAre(Pair(kMetricName2, 0.5),
                                     Pair(kMetricName4, 0.9)));
  });
}
206
207 } // namespace
208 } // namespace testing
209 } // namespace grpc
210
main(int argc,char ** argv)211 int main(int argc, char** argv) {
212 grpc::testing::TestEnvironment env(&argc, argv);
213 ::testing::InitGoogleTest(&argc, argv);
214 return RUN_ALL_TESTS();
215 }
216