1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <stddef.h>
18 #include <stdint.h>
19
20 #include <map>
21 #include <memory>
22 #include <utility>
23
24 #include "absl/base/thread_annotations.h"
25 #include "absl/strings/string_view.h"
26 #include "absl/time/time.h"
27 #include "absl/types/optional.h"
28 #include "google/protobuf/duration.upb.h"
29 #include "upb/base/string_view.h"
30 #include "upb/upb.hpp"
31 #include "xds/data/orca/v3/orca_load_report.upb.h"
32 #include "xds/service/orca/v3/orca.upb.h"
33
34 #include <grpc/event_engine/event_engine.h>
35 #include <grpc/support/log.h>
36 #include <grpcpp/ext/orca_service.h>
37 #include <grpcpp/ext/server_metric_recorder.h>
38 #include <grpcpp/impl/rpc_method.h>
39 #include <grpcpp/impl/rpc_service_method.h>
40 #include <grpcpp/impl/server_callback_handlers.h>
41 #include <grpcpp/impl/sync.h>
42 #include <grpcpp/server_context.h>
43 #include <grpcpp/support/byte_buffer.h>
44 #include <grpcpp/support/server_callback.h>
45 #include <grpcpp/support/slice.h>
46 #include <grpcpp/support/status.h>
47
48 #include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h"
49 #include "src/core/lib/event_engine/default_event_engine.h"
50 #include "src/core/lib/gprpp/debug_location.h"
51 #include "src/core/lib/gprpp/ref_counted.h"
52 #include "src/core/lib/gprpp/ref_counted_ptr.h"
53 #include "src/core/lib/gprpp/time.h"
54 #include "src/core/lib/iomgr/exec_ctx.h"
55 #include "src/cpp/server/backend_metric_recorder.h"
56
57 namespace grpc {
58 namespace experimental {
59
60 using ::grpc_event_engine::experimental::EventEngine;
61
62 //
63 // OrcaService::Reactor
64 //
65
66 class OrcaService::Reactor : public ServerWriteReactor<ByteBuffer>,
67 public grpc_core::RefCounted<Reactor> {
68 public:
Reactor(OrcaService * service,const ByteBuffer * request_buffer)69 explicit Reactor(OrcaService* service, const ByteBuffer* request_buffer)
70 : RefCounted("OrcaService::Reactor"),
71 service_(service),
72 engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {
73 // Get slice from request.
74 Slice slice;
75 GPR_ASSERT(request_buffer->DumpToSingleSlice(&slice).ok());
76 // Parse request proto.
77 upb::Arena arena;
78 xds_service_orca_v3_OrcaLoadReportRequest* request =
79 xds_service_orca_v3_OrcaLoadReportRequest_parse(
80 reinterpret_cast<const char*>(slice.begin()), slice.size(),
81 arena.ptr());
82 if (request == nullptr) {
83 Finish(Status(StatusCode::INTERNAL, "could not parse request proto"));
84 return;
85 }
86 const auto* duration_proto =
87 xds_service_orca_v3_OrcaLoadReportRequest_report_interval(request);
88 if (duration_proto != nullptr) {
89 report_interval_ = grpc_core::Duration::FromSecondsAndNanoseconds(
90 google_protobuf_Duration_seconds(duration_proto),
91 google_protobuf_Duration_nanos(duration_proto));
92 }
93 auto min_interval = grpc_core::Duration::Milliseconds(
94 service_->min_report_duration_ / absl::Milliseconds(1));
95 if (report_interval_ < min_interval) report_interval_ = min_interval;
96 // Send initial response.
97 SendResponse();
98 }
99
OnWriteDone(bool ok)100 void OnWriteDone(bool ok) override {
101 if (!ok) {
102 Finish(Status(StatusCode::UNKNOWN, "write failed"));
103 return;
104 }
105 response_.Clear();
106 if (!MaybeScheduleTimer()) {
107 Finish(Status(StatusCode::UNKNOWN, "call cancelled by client"));
108 }
109 }
110
OnCancel()111 void OnCancel() override {
112 if (MaybeCancelTimer()) {
113 Finish(Status(StatusCode::UNKNOWN, "call cancelled by client"));
114 }
115 }
116
OnDone()117 void OnDone() override {
118 // Free the initial ref from instantiation.
119 Unref(DEBUG_LOCATION, "OnDone");
120 }
121
122 private:
SendResponse()123 void SendResponse() {
124 Slice response_slice = service_->GetOrCreateSerializedResponse();
125 ByteBuffer response_buffer(&response_slice, 1);
126 response_.Swap(&response_buffer);
127 StartWrite(&response_);
128 }
129
MaybeScheduleTimer()130 bool MaybeScheduleTimer() {
131 grpc::internal::MutexLock lock(&timer_mu_);
132 if (cancelled_) return false;
133 timer_handle_ = engine_->RunAfter(
134 report_interval_,
135 [self = Ref(DEBUG_LOCATION, "Orca Service")] { self->OnTimer(); });
136 return true;
137 }
138
MaybeCancelTimer()139 bool MaybeCancelTimer() {
140 grpc::internal::MutexLock lock(&timer_mu_);
141 cancelled_ = true;
142 if (timer_handle_.has_value() && engine_->Cancel(*timer_handle_)) {
143 timer_handle_.reset();
144 return true;
145 }
146 return false;
147 }
148
OnTimer()149 void OnTimer() {
150 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
151 grpc_core::ExecCtx exec_ctx;
152 grpc::internal::MutexLock lock(&timer_mu_);
153 timer_handle_.reset();
154 SendResponse();
155 }
156
157 OrcaService* service_;
158
159 grpc::internal::Mutex timer_mu_;
160 absl::optional<EventEngine::TaskHandle> timer_handle_
161 ABSL_GUARDED_BY(&timer_mu_);
162 bool cancelled_ ABSL_GUARDED_BY(&timer_mu_) = false;
163
164 grpc_core::Duration report_interval_;
165 ByteBuffer response_;
166 std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
167 };
168
169 //
170 // OrcaService
171 //
172
OrcaService(ServerMetricRecorder * const server_metric_recorder,Options options)173 OrcaService::OrcaService(ServerMetricRecorder* const server_metric_recorder,
174 Options options)
175 : server_metric_recorder_(server_metric_recorder),
176 min_report_duration_(options.min_report_duration) {
177 GPR_ASSERT(server_metric_recorder_ != nullptr);
178 AddMethod(new internal::RpcServiceMethod(
179 "/xds.service.orca.v3.OpenRcaService/StreamCoreMetrics",
180 internal::RpcMethod::SERVER_STREAMING, /*handler=*/nullptr));
181 MarkMethodCallback(
182 0, new internal::CallbackServerStreamingHandler<ByteBuffer, ByteBuffer>(
183 [this](CallbackServerContext* /*ctx*/, const ByteBuffer* request) {
184 return new Reactor(this, request);
185 }));
186 }
187
GetOrCreateSerializedResponse()188 Slice OrcaService::GetOrCreateSerializedResponse() {
189 grpc::internal::MutexLock lock(&mu_);
190 std::shared_ptr<const ServerMetricRecorder::BackendMetricDataState> result =
191 server_metric_recorder_->GetMetricsIfChanged();
192 if (!response_slice_seq_.has_value() ||
193 *response_slice_seq_ != result->sequence_number) {
194 const auto& data = result->data;
195 upb::Arena arena;
196 xds_data_orca_v3_OrcaLoadReport* response =
197 xds_data_orca_v3_OrcaLoadReport_new(arena.ptr());
198 if (data.cpu_utilization != -1) {
199 xds_data_orca_v3_OrcaLoadReport_set_cpu_utilization(response,
200 data.cpu_utilization);
201 }
202 if (data.mem_utilization != -1) {
203 xds_data_orca_v3_OrcaLoadReport_set_mem_utilization(response,
204 data.mem_utilization);
205 }
206 if (data.application_utilization != -1) {
207 xds_data_orca_v3_OrcaLoadReport_set_application_utilization(
208 response, data.application_utilization);
209 }
210 if (data.qps != -1) {
211 xds_data_orca_v3_OrcaLoadReport_set_rps_fractional(response, data.qps);
212 }
213 if (data.eps != -1) {
214 xds_data_orca_v3_OrcaLoadReport_set_eps(response, data.eps);
215 }
216 for (const auto& u : data.utilization) {
217 xds_data_orca_v3_OrcaLoadReport_utilization_set(
218 response,
219 upb_StringView_FromDataAndSize(u.first.data(), u.first.size()),
220 u.second, arena.ptr());
221 }
222 size_t buf_length;
223 char* buf = xds_data_orca_v3_OrcaLoadReport_serialize(response, arena.ptr(),
224 &buf_length);
225 response_slice_.emplace(buf, buf_length);
226 response_slice_seq_ = result->sequence_number;
227 }
228 return Slice(*response_slice_);
229 }
230
231 } // namespace experimental
232 } // namespace grpc
233