xref: /aosp_15_r20/external/grpc-grpc/test/cpp/interop/interop_server.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
//
//
// Copyright 2015-2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
18*cc02d7e2SAndroid Build Coastguard Worker 
19*cc02d7e2SAndroid Build Coastguard Worker #include <fstream>
20*cc02d7e2SAndroid Build Coastguard Worker #include <memory>
21*cc02d7e2SAndroid Build Coastguard Worker #include <sstream>
22*cc02d7e2SAndroid Build Coastguard Worker #include <thread>
23*cc02d7e2SAndroid Build Coastguard Worker 
24*cc02d7e2SAndroid Build Coastguard Worker #include "absl/flags/flag.h"
25*cc02d7e2SAndroid Build Coastguard Worker 
26*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/grpc.h>
27*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/log.h>
28*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/time.h>
29*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/ext/call_metric_recorder.h>
30*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/ext/orca_service.h>
31*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/ext/server_metric_recorder.h>
32*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/security/server_credentials.h>
33*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/server.h>
34*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/server_builder.h>
35*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/server_context.h>
36*cc02d7e2SAndroid Build Coastguard Worker 
37*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gpr/string.h"
38*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gprpp/crash.h"
39*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gprpp/sync.h"
40*cc02d7e2SAndroid Build Coastguard Worker #include "src/proto/grpc/testing/empty.pb.h"
41*cc02d7e2SAndroid Build Coastguard Worker #include "src/proto/grpc/testing/messages.pb.h"
42*cc02d7e2SAndroid Build Coastguard Worker #include "src/proto/grpc/testing/test.grpc.pb.h"
43*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/interop/server_helper.h"
44*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/util/test_config.h"
45*cc02d7e2SAndroid Build Coastguard Worker 
46*cc02d7e2SAndroid Build Coastguard Worker ABSL_FLAG(bool, use_alts, false,
47*cc02d7e2SAndroid Build Coastguard Worker           "Whether to use alts. Enable alts will disable tls.");
48*cc02d7e2SAndroid Build Coastguard Worker ABSL_FLAG(bool, use_tls, false, "Whether to use tls.");
49*cc02d7e2SAndroid Build Coastguard Worker ABSL_FLAG(std::string, custom_credentials_type, "",
50*cc02d7e2SAndroid Build Coastguard Worker           "User provided credentials type.");
51*cc02d7e2SAndroid Build Coastguard Worker ABSL_FLAG(int32_t, port, 0, "Server port.");
52*cc02d7e2SAndroid Build Coastguard Worker ABSL_FLAG(int32_t, max_send_message_size, -1, "The maximum send message size.");
53*cc02d7e2SAndroid Build Coastguard Worker 
54*cc02d7e2SAndroid Build Coastguard Worker using grpc::Server;
55*cc02d7e2SAndroid Build Coastguard Worker using grpc::ServerContext;
56*cc02d7e2SAndroid Build Coastguard Worker using grpc::ServerCredentials;
57*cc02d7e2SAndroid Build Coastguard Worker using grpc::ServerReader;
58*cc02d7e2SAndroid Build Coastguard Worker using grpc::ServerReaderWriter;
59*cc02d7e2SAndroid Build Coastguard Worker using grpc::ServerWriter;
60*cc02d7e2SAndroid Build Coastguard Worker using grpc::Status;
61*cc02d7e2SAndroid Build Coastguard Worker using grpc::WriteOptions;
62*cc02d7e2SAndroid Build Coastguard Worker using grpc::testing::InteropServerContextInspector;
63*cc02d7e2SAndroid Build Coastguard Worker using grpc::testing::Payload;
64*cc02d7e2SAndroid Build Coastguard Worker using grpc::testing::SimpleRequest;
65*cc02d7e2SAndroid Build Coastguard Worker using grpc::testing::SimpleResponse;
66*cc02d7e2SAndroid Build Coastguard Worker using grpc::testing::StreamingInputCallRequest;
67*cc02d7e2SAndroid Build Coastguard Worker using grpc::testing::StreamingInputCallResponse;
68*cc02d7e2SAndroid Build Coastguard Worker using grpc::testing::StreamingOutputCallRequest;
69*cc02d7e2SAndroid Build Coastguard Worker using grpc::testing::StreamingOutputCallResponse;
70*cc02d7e2SAndroid Build Coastguard Worker using grpc::testing::TestService;
71*cc02d7e2SAndroid Build Coastguard Worker 
// Request metadata key whose value MaybeEchoMetadata() copies into the
// response's initial metadata.
const char kEchoInitialMetadataKey[] = "x-grpc-test-echo-initial";
// Request metadata key whose value MaybeEchoMetadata() copies into the
// response's trailing metadata.
const char kEchoTrailingBinMetadataKey[] = "x-grpc-test-echo-trailing-bin";
// Request metadata key that, when present, makes MaybeEchoMetadata() send the
// client's "user-agent" value back under this same key.
const char kEchoUserAgentKey[] = "x-grpc-test-echo-useragent";
75*cc02d7e2SAndroid Build Coastguard Worker 
MaybeEchoMetadata(ServerContext * context)76*cc02d7e2SAndroid Build Coastguard Worker void MaybeEchoMetadata(ServerContext* context) {
77*cc02d7e2SAndroid Build Coastguard Worker   const auto& client_metadata = context->client_metadata();
78*cc02d7e2SAndroid Build Coastguard Worker   GPR_ASSERT(client_metadata.count(kEchoInitialMetadataKey) <= 1);
79*cc02d7e2SAndroid Build Coastguard Worker   GPR_ASSERT(client_metadata.count(kEchoTrailingBinMetadataKey) <= 1);
80*cc02d7e2SAndroid Build Coastguard Worker 
81*cc02d7e2SAndroid Build Coastguard Worker   auto iter = client_metadata.find(kEchoInitialMetadataKey);
82*cc02d7e2SAndroid Build Coastguard Worker   if (iter != client_metadata.end()) {
83*cc02d7e2SAndroid Build Coastguard Worker     context->AddInitialMetadata(
84*cc02d7e2SAndroid Build Coastguard Worker         kEchoInitialMetadataKey,
85*cc02d7e2SAndroid Build Coastguard Worker         std::string(iter->second.begin(), iter->second.end()));
86*cc02d7e2SAndroid Build Coastguard Worker   }
87*cc02d7e2SAndroid Build Coastguard Worker   iter = client_metadata.find(kEchoTrailingBinMetadataKey);
88*cc02d7e2SAndroid Build Coastguard Worker   if (iter != client_metadata.end()) {
89*cc02d7e2SAndroid Build Coastguard Worker     context->AddTrailingMetadata(
90*cc02d7e2SAndroid Build Coastguard Worker         kEchoTrailingBinMetadataKey,
91*cc02d7e2SAndroid Build Coastguard Worker         std::string(iter->second.begin(), iter->second.end()));
92*cc02d7e2SAndroid Build Coastguard Worker   }
93*cc02d7e2SAndroid Build Coastguard Worker   // Check if client sent a magic key in the header that makes us echo
94*cc02d7e2SAndroid Build Coastguard Worker   // back the user-agent (for testing purpose)
95*cc02d7e2SAndroid Build Coastguard Worker   iter = client_metadata.find(kEchoUserAgentKey);
96*cc02d7e2SAndroid Build Coastguard Worker   if (iter != client_metadata.end()) {
97*cc02d7e2SAndroid Build Coastguard Worker     iter = client_metadata.find("user-agent");
98*cc02d7e2SAndroid Build Coastguard Worker     if (iter != client_metadata.end()) {
99*cc02d7e2SAndroid Build Coastguard Worker       context->AddInitialMetadata(
100*cc02d7e2SAndroid Build Coastguard Worker           kEchoUserAgentKey,
101*cc02d7e2SAndroid Build Coastguard Worker           std::string(iter->second.begin(), iter->second.end()));
102*cc02d7e2SAndroid Build Coastguard Worker     }
103*cc02d7e2SAndroid Build Coastguard Worker   }
104*cc02d7e2SAndroid Build Coastguard Worker }
105*cc02d7e2SAndroid Build Coastguard Worker 
SetPayload(int size,Payload * payload)106*cc02d7e2SAndroid Build Coastguard Worker bool SetPayload(int size, Payload* payload) {
107*cc02d7e2SAndroid Build Coastguard Worker   std::unique_ptr<char[]> body(new char[size]());
108*cc02d7e2SAndroid Build Coastguard Worker   payload->set_body(body.get(), size);
109*cc02d7e2SAndroid Build Coastguard Worker   return true;
110*cc02d7e2SAndroid Build Coastguard Worker }
111*cc02d7e2SAndroid Build Coastguard Worker 
CheckExpectedCompression(const ServerContext & context,const bool compression_expected)112*cc02d7e2SAndroid Build Coastguard Worker bool CheckExpectedCompression(const ServerContext& context,
113*cc02d7e2SAndroid Build Coastguard Worker                               const bool compression_expected) {
114*cc02d7e2SAndroid Build Coastguard Worker   const InteropServerContextInspector inspector(context);
115*cc02d7e2SAndroid Build Coastguard Worker   const grpc_compression_algorithm received_compression =
116*cc02d7e2SAndroid Build Coastguard Worker       inspector.GetCallCompressionAlgorithm();
117*cc02d7e2SAndroid Build Coastguard Worker 
118*cc02d7e2SAndroid Build Coastguard Worker   if (compression_expected) {
119*cc02d7e2SAndroid Build Coastguard Worker     if (received_compression == GRPC_COMPRESS_NONE) {
120*cc02d7e2SAndroid Build Coastguard Worker       // Expected some compression, got NONE. This is an error.
121*cc02d7e2SAndroid Build Coastguard Worker       gpr_log(GPR_ERROR,
122*cc02d7e2SAndroid Build Coastguard Worker               "Expected compression but got uncompressed request from client.");
123*cc02d7e2SAndroid Build Coastguard Worker       return false;
124*cc02d7e2SAndroid Build Coastguard Worker     }
125*cc02d7e2SAndroid Build Coastguard Worker     if (!(inspector.WasCompressed())) {
126*cc02d7e2SAndroid Build Coastguard Worker       gpr_log(GPR_ERROR,
127*cc02d7e2SAndroid Build Coastguard Worker               "Failure: Requested compression in a compressable request, but "
128*cc02d7e2SAndroid Build Coastguard Worker               "compression bit in message flags not set.");
129*cc02d7e2SAndroid Build Coastguard Worker       return false;
130*cc02d7e2SAndroid Build Coastguard Worker     }
131*cc02d7e2SAndroid Build Coastguard Worker   } else {
132*cc02d7e2SAndroid Build Coastguard Worker     // Didn't expect compression -> make sure the request is uncompressed
133*cc02d7e2SAndroid Build Coastguard Worker     if (inspector.WasCompressed()) {
134*cc02d7e2SAndroid Build Coastguard Worker       gpr_log(GPR_ERROR,
135*cc02d7e2SAndroid Build Coastguard Worker               "Failure: Didn't requested compression, but compression bit in "
136*cc02d7e2SAndroid Build Coastguard Worker               "message flags set.");
137*cc02d7e2SAndroid Build Coastguard Worker       return false;
138*cc02d7e2SAndroid Build Coastguard Worker     }
139*cc02d7e2SAndroid Build Coastguard Worker   }
140*cc02d7e2SAndroid Build Coastguard Worker   return true;
141*cc02d7e2SAndroid Build Coastguard Worker }
142*cc02d7e2SAndroid Build Coastguard Worker 
RecordCallMetrics(ServerContext * context,const grpc::testing::TestOrcaReport & request_metrics)143*cc02d7e2SAndroid Build Coastguard Worker void RecordCallMetrics(ServerContext* context,
144*cc02d7e2SAndroid Build Coastguard Worker                        const grpc::testing::TestOrcaReport& request_metrics) {
145*cc02d7e2SAndroid Build Coastguard Worker   auto recorder = context->ExperimentalGetCallMetricRecorder();
146*cc02d7e2SAndroid Build Coastguard Worker   // Do not record when zero since it indicates no test per-call report.
147*cc02d7e2SAndroid Build Coastguard Worker   if (request_metrics.cpu_utilization() > 0) {
148*cc02d7e2SAndroid Build Coastguard Worker     recorder->RecordCpuUtilizationMetric(request_metrics.cpu_utilization());
149*cc02d7e2SAndroid Build Coastguard Worker   }
150*cc02d7e2SAndroid Build Coastguard Worker   if (request_metrics.memory_utilization() > 0) {
151*cc02d7e2SAndroid Build Coastguard Worker     recorder->RecordMemoryUtilizationMetric(
152*cc02d7e2SAndroid Build Coastguard Worker         request_metrics.memory_utilization());
153*cc02d7e2SAndroid Build Coastguard Worker   }
154*cc02d7e2SAndroid Build Coastguard Worker   for (const auto& p : request_metrics.request_cost()) {
155*cc02d7e2SAndroid Build Coastguard Worker     char* key = static_cast<char*>(
156*cc02d7e2SAndroid Build Coastguard Worker         grpc_call_arena_alloc(context->c_call(), p.first.size() + 1));
157*cc02d7e2SAndroid Build Coastguard Worker     strncpy(key, p.first.data(), p.first.size());
158*cc02d7e2SAndroid Build Coastguard Worker     key[p.first.size()] = '\0';
159*cc02d7e2SAndroid Build Coastguard Worker     recorder->RecordRequestCostMetric(key, p.second);
160*cc02d7e2SAndroid Build Coastguard Worker   }
161*cc02d7e2SAndroid Build Coastguard Worker   for (const auto& p : request_metrics.utilization()) {
162*cc02d7e2SAndroid Build Coastguard Worker     char* key = static_cast<char*>(
163*cc02d7e2SAndroid Build Coastguard Worker         grpc_call_arena_alloc(context->c_call(), p.first.size() + 1));
164*cc02d7e2SAndroid Build Coastguard Worker     strncpy(key, p.first.data(), p.first.size());
165*cc02d7e2SAndroid Build Coastguard Worker     key[p.first.size()] = '\0';
166*cc02d7e2SAndroid Build Coastguard Worker     recorder->RecordUtilizationMetric(key, p.second);
167*cc02d7e2SAndroid Build Coastguard Worker   }
168*cc02d7e2SAndroid Build Coastguard Worker }
169*cc02d7e2SAndroid Build Coastguard Worker 
170*cc02d7e2SAndroid Build Coastguard Worker class TestServiceImpl : public TestService::Service {
171*cc02d7e2SAndroid Build Coastguard Worker  public:
TestServiceImpl(grpc::experimental::ServerMetricRecorder * server_metric_recorder)172*cc02d7e2SAndroid Build Coastguard Worker   explicit TestServiceImpl(
173*cc02d7e2SAndroid Build Coastguard Worker       grpc::experimental::ServerMetricRecorder* server_metric_recorder)
174*cc02d7e2SAndroid Build Coastguard Worker       : server_metric_recorder_(server_metric_recorder) {}
175*cc02d7e2SAndroid Build Coastguard Worker 
EmptyCall(ServerContext * context,const grpc::testing::Empty *,grpc::testing::Empty *)176*cc02d7e2SAndroid Build Coastguard Worker   Status EmptyCall(ServerContext* context,
177*cc02d7e2SAndroid Build Coastguard Worker                    const grpc::testing::Empty* /*request*/,
178*cc02d7e2SAndroid Build Coastguard Worker                    grpc::testing::Empty* /*response*/) override {
179*cc02d7e2SAndroid Build Coastguard Worker     MaybeEchoMetadata(context);
180*cc02d7e2SAndroid Build Coastguard Worker     return Status::OK;
181*cc02d7e2SAndroid Build Coastguard Worker   }
182*cc02d7e2SAndroid Build Coastguard Worker 
183*cc02d7e2SAndroid Build Coastguard Worker   // Response contains current timestamp. We ignore everything in the request.
CacheableUnaryCall(ServerContext * context,const SimpleRequest *,SimpleResponse * response)184*cc02d7e2SAndroid Build Coastguard Worker   Status CacheableUnaryCall(ServerContext* context,
185*cc02d7e2SAndroid Build Coastguard Worker                             const SimpleRequest* /*request*/,
186*cc02d7e2SAndroid Build Coastguard Worker                             SimpleResponse* response) override {
187*cc02d7e2SAndroid Build Coastguard Worker     gpr_timespec ts = gpr_now(GPR_CLOCK_PRECISE);
188*cc02d7e2SAndroid Build Coastguard Worker     std::string timestamp = std::to_string(ts.tv_nsec);
189*cc02d7e2SAndroid Build Coastguard Worker     response->mutable_payload()->set_body(timestamp.c_str(), timestamp.size());
190*cc02d7e2SAndroid Build Coastguard Worker     context->AddInitialMetadata("cache-control", "max-age=60, public");
191*cc02d7e2SAndroid Build Coastguard Worker     return Status::OK;
192*cc02d7e2SAndroid Build Coastguard Worker   }
193*cc02d7e2SAndroid Build Coastguard Worker 
UnaryCall(ServerContext * context,const SimpleRequest * request,SimpleResponse * response)194*cc02d7e2SAndroid Build Coastguard Worker   Status UnaryCall(ServerContext* context, const SimpleRequest* request,
195*cc02d7e2SAndroid Build Coastguard Worker                    SimpleResponse* response) override {
196*cc02d7e2SAndroid Build Coastguard Worker     MaybeEchoMetadata(context);
197*cc02d7e2SAndroid Build Coastguard Worker     if (request->has_response_compressed()) {
198*cc02d7e2SAndroid Build Coastguard Worker       const bool compression_requested = request->response_compressed().value();
199*cc02d7e2SAndroid Build Coastguard Worker       gpr_log(GPR_DEBUG, "Request for compression (%s) present for %s",
200*cc02d7e2SAndroid Build Coastguard Worker               compression_requested ? "enabled" : "disabled", __func__);
201*cc02d7e2SAndroid Build Coastguard Worker       if (compression_requested) {
202*cc02d7e2SAndroid Build Coastguard Worker         // Any level would do, let's go for HIGH because we are overachievers.
203*cc02d7e2SAndroid Build Coastguard Worker         context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH);
204*cc02d7e2SAndroid Build Coastguard Worker       } else {
205*cc02d7e2SAndroid Build Coastguard Worker         context->set_compression_level(GRPC_COMPRESS_LEVEL_NONE);
206*cc02d7e2SAndroid Build Coastguard Worker       }
207*cc02d7e2SAndroid Build Coastguard Worker     }
208*cc02d7e2SAndroid Build Coastguard Worker     if (!CheckExpectedCompression(*context,
209*cc02d7e2SAndroid Build Coastguard Worker                                   request->expect_compressed().value())) {
210*cc02d7e2SAndroid Build Coastguard Worker       return Status(grpc::StatusCode::INVALID_ARGUMENT,
211*cc02d7e2SAndroid Build Coastguard Worker                     "Compressed request expectation not met.");
212*cc02d7e2SAndroid Build Coastguard Worker     }
213*cc02d7e2SAndroid Build Coastguard Worker     if (request->response_size() > 0) {
214*cc02d7e2SAndroid Build Coastguard Worker       if (!SetPayload(request->response_size(), response->mutable_payload())) {
215*cc02d7e2SAndroid Build Coastguard Worker         return Status(grpc::StatusCode::INVALID_ARGUMENT,
216*cc02d7e2SAndroid Build Coastguard Worker                       "Error creating payload.");
217*cc02d7e2SAndroid Build Coastguard Worker       }
218*cc02d7e2SAndroid Build Coastguard Worker     }
219*cc02d7e2SAndroid Build Coastguard Worker 
220*cc02d7e2SAndroid Build Coastguard Worker     if (request->has_response_status()) {
221*cc02d7e2SAndroid Build Coastguard Worker       return Status(
222*cc02d7e2SAndroid Build Coastguard Worker           static_cast<grpc::StatusCode>(request->response_status().code()),
223*cc02d7e2SAndroid Build Coastguard Worker           request->response_status().message());
224*cc02d7e2SAndroid Build Coastguard Worker     }
225*cc02d7e2SAndroid Build Coastguard Worker     if (request->has_orca_per_query_report()) {
226*cc02d7e2SAndroid Build Coastguard Worker       RecordCallMetrics(context, request->orca_per_query_report());
227*cc02d7e2SAndroid Build Coastguard Worker     }
228*cc02d7e2SAndroid Build Coastguard Worker     return Status::OK;
229*cc02d7e2SAndroid Build Coastguard Worker   }
230*cc02d7e2SAndroid Build Coastguard Worker 
StreamingOutputCall(ServerContext * context,const StreamingOutputCallRequest * request,ServerWriter<StreamingOutputCallResponse> * writer)231*cc02d7e2SAndroid Build Coastguard Worker   Status StreamingOutputCall(
232*cc02d7e2SAndroid Build Coastguard Worker       ServerContext* context, const StreamingOutputCallRequest* request,
233*cc02d7e2SAndroid Build Coastguard Worker       ServerWriter<StreamingOutputCallResponse>* writer) override {
234*cc02d7e2SAndroid Build Coastguard Worker     StreamingOutputCallResponse response;
235*cc02d7e2SAndroid Build Coastguard Worker     bool write_success = true;
236*cc02d7e2SAndroid Build Coastguard Worker     for (int i = 0; write_success && i < request->response_parameters_size();
237*cc02d7e2SAndroid Build Coastguard Worker          i++) {
238*cc02d7e2SAndroid Build Coastguard Worker       if (!SetPayload(request->response_parameters(i).size(),
239*cc02d7e2SAndroid Build Coastguard Worker                       response.mutable_payload())) {
240*cc02d7e2SAndroid Build Coastguard Worker         return Status(grpc::StatusCode::INVALID_ARGUMENT,
241*cc02d7e2SAndroid Build Coastguard Worker                       "Error creating payload.");
242*cc02d7e2SAndroid Build Coastguard Worker       }
243*cc02d7e2SAndroid Build Coastguard Worker       WriteOptions wopts;
244*cc02d7e2SAndroid Build Coastguard Worker       if (request->response_parameters(i).has_compressed()) {
245*cc02d7e2SAndroid Build Coastguard Worker         // Compress by default. Disabled on a per-message basis.
246*cc02d7e2SAndroid Build Coastguard Worker         context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH);
247*cc02d7e2SAndroid Build Coastguard Worker         const bool compression_requested =
248*cc02d7e2SAndroid Build Coastguard Worker             request->response_parameters(i).compressed().value();
249*cc02d7e2SAndroid Build Coastguard Worker         gpr_log(GPR_DEBUG, "Request for compression (%s) present for %s",
250*cc02d7e2SAndroid Build Coastguard Worker                 compression_requested ? "enabled" : "disabled", __func__);
251*cc02d7e2SAndroid Build Coastguard Worker         if (!compression_requested) {
252*cc02d7e2SAndroid Build Coastguard Worker           wopts.set_no_compression();
253*cc02d7e2SAndroid Build Coastguard Worker         }  // else, compression is already enabled via the context.
254*cc02d7e2SAndroid Build Coastguard Worker       }
255*cc02d7e2SAndroid Build Coastguard Worker       int time_us;
256*cc02d7e2SAndroid Build Coastguard Worker       if ((time_us = request->response_parameters(i).interval_us()) > 0) {
257*cc02d7e2SAndroid Build Coastguard Worker         // Sleep before response if needed
258*cc02d7e2SAndroid Build Coastguard Worker         gpr_timespec sleep_time =
259*cc02d7e2SAndroid Build Coastguard Worker             gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
260*cc02d7e2SAndroid Build Coastguard Worker                          gpr_time_from_micros(time_us, GPR_TIMESPAN));
261*cc02d7e2SAndroid Build Coastguard Worker         gpr_sleep_until(sleep_time);
262*cc02d7e2SAndroid Build Coastguard Worker       }
263*cc02d7e2SAndroid Build Coastguard Worker       write_success = writer->Write(response, wopts);
264*cc02d7e2SAndroid Build Coastguard Worker     }
265*cc02d7e2SAndroid Build Coastguard Worker     if (write_success) {
266*cc02d7e2SAndroid Build Coastguard Worker       return Status::OK;
267*cc02d7e2SAndroid Build Coastguard Worker     } else {
268*cc02d7e2SAndroid Build Coastguard Worker       return Status(grpc::StatusCode::INTERNAL, "Error writing response.");
269*cc02d7e2SAndroid Build Coastguard Worker     }
270*cc02d7e2SAndroid Build Coastguard Worker   }
271*cc02d7e2SAndroid Build Coastguard Worker 
StreamingInputCall(ServerContext * context,ServerReader<StreamingInputCallRequest> * reader,StreamingInputCallResponse * response)272*cc02d7e2SAndroid Build Coastguard Worker   Status StreamingInputCall(ServerContext* context,
273*cc02d7e2SAndroid Build Coastguard Worker                             ServerReader<StreamingInputCallRequest>* reader,
274*cc02d7e2SAndroid Build Coastguard Worker                             StreamingInputCallResponse* response) override {
275*cc02d7e2SAndroid Build Coastguard Worker     StreamingInputCallRequest request;
276*cc02d7e2SAndroid Build Coastguard Worker     int aggregated_payload_size = 0;
277*cc02d7e2SAndroid Build Coastguard Worker     while (reader->Read(&request)) {
278*cc02d7e2SAndroid Build Coastguard Worker       if (!CheckExpectedCompression(*context,
279*cc02d7e2SAndroid Build Coastguard Worker                                     request.expect_compressed().value())) {
280*cc02d7e2SAndroid Build Coastguard Worker         return Status(grpc::StatusCode::INVALID_ARGUMENT,
281*cc02d7e2SAndroid Build Coastguard Worker                       "Compressed request expectation not met.");
282*cc02d7e2SAndroid Build Coastguard Worker       }
283*cc02d7e2SAndroid Build Coastguard Worker       if (request.has_payload()) {
284*cc02d7e2SAndroid Build Coastguard Worker         aggregated_payload_size += request.payload().body().size();
285*cc02d7e2SAndroid Build Coastguard Worker       }
286*cc02d7e2SAndroid Build Coastguard Worker     }
287*cc02d7e2SAndroid Build Coastguard Worker     response->set_aggregated_payload_size(aggregated_payload_size);
288*cc02d7e2SAndroid Build Coastguard Worker     return Status::OK;
289*cc02d7e2SAndroid Build Coastguard Worker   }
290*cc02d7e2SAndroid Build Coastguard Worker 
FullDuplexCall(ServerContext * context,ServerReaderWriter<StreamingOutputCallResponse,StreamingOutputCallRequest> * stream)291*cc02d7e2SAndroid Build Coastguard Worker   Status FullDuplexCall(
292*cc02d7e2SAndroid Build Coastguard Worker       ServerContext* context,
293*cc02d7e2SAndroid Build Coastguard Worker       ServerReaderWriter<StreamingOutputCallResponse,
294*cc02d7e2SAndroid Build Coastguard Worker                          StreamingOutputCallRequest>* stream) override {
295*cc02d7e2SAndroid Build Coastguard Worker     MaybeEchoMetadata(context);
296*cc02d7e2SAndroid Build Coastguard Worker     StreamingOutputCallRequest request;
297*cc02d7e2SAndroid Build Coastguard Worker     StreamingOutputCallResponse response;
298*cc02d7e2SAndroid Build Coastguard Worker     bool write_success = true;
299*cc02d7e2SAndroid Build Coastguard Worker     std::unique_ptr<grpc_core::MutexLock> orca_oob_lock;
300*cc02d7e2SAndroid Build Coastguard Worker     while (write_success && stream->Read(&request)) {
301*cc02d7e2SAndroid Build Coastguard Worker       if (request.has_response_status()) {
302*cc02d7e2SAndroid Build Coastguard Worker         return Status(
303*cc02d7e2SAndroid Build Coastguard Worker             static_cast<grpc::StatusCode>(request.response_status().code()),
304*cc02d7e2SAndroid Build Coastguard Worker             request.response_status().message());
305*cc02d7e2SAndroid Build Coastguard Worker       }
306*cc02d7e2SAndroid Build Coastguard Worker       if (request.response_parameters_size() != 0) {
307*cc02d7e2SAndroid Build Coastguard Worker         response.mutable_payload()->set_type(request.payload().type());
308*cc02d7e2SAndroid Build Coastguard Worker         response.mutable_payload()->set_body(
309*cc02d7e2SAndroid Build Coastguard Worker             std::string(request.response_parameters(0).size(), '\0'));
310*cc02d7e2SAndroid Build Coastguard Worker         int time_us;
311*cc02d7e2SAndroid Build Coastguard Worker         if ((time_us = request.response_parameters(0).interval_us()) > 0) {
312*cc02d7e2SAndroid Build Coastguard Worker           // Sleep before response if needed
313*cc02d7e2SAndroid Build Coastguard Worker           gpr_timespec sleep_time =
314*cc02d7e2SAndroid Build Coastguard Worker               gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
315*cc02d7e2SAndroid Build Coastguard Worker                            gpr_time_from_micros(time_us, GPR_TIMESPAN));
316*cc02d7e2SAndroid Build Coastguard Worker           gpr_sleep_until(sleep_time);
317*cc02d7e2SAndroid Build Coastguard Worker         }
318*cc02d7e2SAndroid Build Coastguard Worker         write_success = stream->Write(response);
319*cc02d7e2SAndroid Build Coastguard Worker       }
320*cc02d7e2SAndroid Build Coastguard Worker       if (request.has_orca_oob_report()) {
321*cc02d7e2SAndroid Build Coastguard Worker         if (orca_oob_lock == nullptr) {
322*cc02d7e2SAndroid Build Coastguard Worker           orca_oob_lock =
323*cc02d7e2SAndroid Build Coastguard Worker               std::make_unique<grpc_core::MutexLock>(&orca_oob_server_mu_);
324*cc02d7e2SAndroid Build Coastguard Worker           server_metric_recorder_->ClearCpuUtilization();
325*cc02d7e2SAndroid Build Coastguard Worker           server_metric_recorder_->ClearEps();
326*cc02d7e2SAndroid Build Coastguard Worker           server_metric_recorder_->ClearMemoryUtilization();
327*cc02d7e2SAndroid Build Coastguard Worker           server_metric_recorder_->SetAllNamedUtilization({});
328*cc02d7e2SAndroid Build Coastguard Worker           server_metric_recorder_->ClearQps();
329*cc02d7e2SAndroid Build Coastguard Worker         }
330*cc02d7e2SAndroid Build Coastguard Worker         RecordServerMetrics(request.orca_oob_report());
331*cc02d7e2SAndroid Build Coastguard Worker       }
332*cc02d7e2SAndroid Build Coastguard Worker     }
333*cc02d7e2SAndroid Build Coastguard Worker     if (write_success) {
334*cc02d7e2SAndroid Build Coastguard Worker       return Status::OK;
335*cc02d7e2SAndroid Build Coastguard Worker     } else {
336*cc02d7e2SAndroid Build Coastguard Worker       return Status(grpc::StatusCode::INTERNAL, "Error writing response.");
337*cc02d7e2SAndroid Build Coastguard Worker     }
338*cc02d7e2SAndroid Build Coastguard Worker   }
339*cc02d7e2SAndroid Build Coastguard Worker 
HalfDuplexCall(ServerContext *,ServerReaderWriter<StreamingOutputCallResponse,StreamingOutputCallRequest> * stream)340*cc02d7e2SAndroid Build Coastguard Worker   Status HalfDuplexCall(
341*cc02d7e2SAndroid Build Coastguard Worker       ServerContext* /*context*/,
342*cc02d7e2SAndroid Build Coastguard Worker       ServerReaderWriter<StreamingOutputCallResponse,
343*cc02d7e2SAndroid Build Coastguard Worker                          StreamingOutputCallRequest>* stream) override {
344*cc02d7e2SAndroid Build Coastguard Worker     std::vector<StreamingOutputCallRequest> requests;
345*cc02d7e2SAndroid Build Coastguard Worker     StreamingOutputCallRequest request;
346*cc02d7e2SAndroid Build Coastguard Worker     while (stream->Read(&request)) {
347*cc02d7e2SAndroid Build Coastguard Worker       requests.push_back(request);
348*cc02d7e2SAndroid Build Coastguard Worker     }
349*cc02d7e2SAndroid Build Coastguard Worker 
350*cc02d7e2SAndroid Build Coastguard Worker     StreamingOutputCallResponse response;
351*cc02d7e2SAndroid Build Coastguard Worker     bool write_success = true;
352*cc02d7e2SAndroid Build Coastguard Worker     for (unsigned int i = 0; write_success && i < requests.size(); i++) {
353*cc02d7e2SAndroid Build Coastguard Worker       response.mutable_payload()->set_type(requests[i].payload().type());
354*cc02d7e2SAndroid Build Coastguard Worker       if (requests[i].response_parameters_size() == 0) {
355*cc02d7e2SAndroid Build Coastguard Worker         return Status(grpc::StatusCode::INTERNAL,
356*cc02d7e2SAndroid Build Coastguard Worker                       "Request does not have response parameters.");
357*cc02d7e2SAndroid Build Coastguard Worker       }
358*cc02d7e2SAndroid Build Coastguard Worker       response.mutable_payload()->set_body(
359*cc02d7e2SAndroid Build Coastguard Worker           std::string(requests[i].response_parameters(0).size(), '\0'));
360*cc02d7e2SAndroid Build Coastguard Worker       write_success = stream->Write(response);
361*cc02d7e2SAndroid Build Coastguard Worker     }
362*cc02d7e2SAndroid Build Coastguard Worker     if (write_success) {
363*cc02d7e2SAndroid Build Coastguard Worker       return Status::OK;
364*cc02d7e2SAndroid Build Coastguard Worker     } else {
365*cc02d7e2SAndroid Build Coastguard Worker       return Status(grpc::StatusCode::INTERNAL, "Error writing response.");
366*cc02d7e2SAndroid Build Coastguard Worker     }
367*cc02d7e2SAndroid Build Coastguard Worker   }
368*cc02d7e2SAndroid Build Coastguard Worker 
369*cc02d7e2SAndroid Build Coastguard Worker  private:
RecordServerMetrics(const grpc::testing::TestOrcaReport & request_metrics)370*cc02d7e2SAndroid Build Coastguard Worker   void RecordServerMetrics(
371*cc02d7e2SAndroid Build Coastguard Worker       const grpc::testing::TestOrcaReport& request_metrics) {
372*cc02d7e2SAndroid Build Coastguard Worker     // Do not record when zero since it indicates no test per-call report.
373*cc02d7e2SAndroid Build Coastguard Worker     if (request_metrics.cpu_utilization() > 0) {
374*cc02d7e2SAndroid Build Coastguard Worker       server_metric_recorder_->SetCpuUtilization(
375*cc02d7e2SAndroid Build Coastguard Worker           request_metrics.cpu_utilization());
376*cc02d7e2SAndroid Build Coastguard Worker     }
377*cc02d7e2SAndroid Build Coastguard Worker     if (request_metrics.memory_utilization() > 0) {
378*cc02d7e2SAndroid Build Coastguard Worker       server_metric_recorder_->SetMemoryUtilization(
379*cc02d7e2SAndroid Build Coastguard Worker           request_metrics.memory_utilization());
380*cc02d7e2SAndroid Build Coastguard Worker     }
381*cc02d7e2SAndroid Build Coastguard Worker     grpc_core::MutexLock lock(&retained_utilization_names_mu_);
382*cc02d7e2SAndroid Build Coastguard Worker     std::map<grpc::string_ref, double> named_utilizations;
383*cc02d7e2SAndroid Build Coastguard Worker     for (const auto& p : request_metrics.utilization()) {
384*cc02d7e2SAndroid Build Coastguard Worker       const auto& key = *retained_utilization_names_.insert(p.first).first;
385*cc02d7e2SAndroid Build Coastguard Worker       named_utilizations.emplace(key, p.second);
386*cc02d7e2SAndroid Build Coastguard Worker     }
387*cc02d7e2SAndroid Build Coastguard Worker     server_metric_recorder_->SetAllNamedUtilization(named_utilizations);
388*cc02d7e2SAndroid Build Coastguard Worker   }
389*cc02d7e2SAndroid Build Coastguard Worker 
390*cc02d7e2SAndroid Build Coastguard Worker   grpc::experimental::ServerMetricRecorder* server_metric_recorder_;
391*cc02d7e2SAndroid Build Coastguard Worker   std::set<std::string> retained_utilization_names_
392*cc02d7e2SAndroid Build Coastguard Worker       ABSL_GUARDED_BY(retained_utilization_names_mu_);
393*cc02d7e2SAndroid Build Coastguard Worker   grpc_core::Mutex retained_utilization_names_mu_;
394*cc02d7e2SAndroid Build Coastguard Worker   // Only a single client requesting Orca OOB reports is allowed at a time
395*cc02d7e2SAndroid Build Coastguard Worker   grpc_core::Mutex orca_oob_server_mu_;
396*cc02d7e2SAndroid Build Coastguard Worker };
397*cc02d7e2SAndroid Build Coastguard Worker 
RunServer(const std::shared_ptr<ServerCredentials> & creds)398*cc02d7e2SAndroid Build Coastguard Worker void grpc::testing::interop::RunServer(
399*cc02d7e2SAndroid Build Coastguard Worker     const std::shared_ptr<ServerCredentials>& creds) {
400*cc02d7e2SAndroid Build Coastguard Worker   RunServer(creds, absl::GetFlag(FLAGS_port), nullptr, nullptr);
401*cc02d7e2SAndroid Build Coastguard Worker }
402*cc02d7e2SAndroid Build Coastguard Worker 
RunServer(const std::shared_ptr<ServerCredentials> & creds,std::unique_ptr<std::vector<std::unique_ptr<ServerBuilderOption>>> server_options)403*cc02d7e2SAndroid Build Coastguard Worker void grpc::testing::interop::RunServer(
404*cc02d7e2SAndroid Build Coastguard Worker     const std::shared_ptr<ServerCredentials>& creds,
405*cc02d7e2SAndroid Build Coastguard Worker     std::unique_ptr<std::vector<std::unique_ptr<ServerBuilderOption>>>
406*cc02d7e2SAndroid Build Coastguard Worker         server_options) {
407*cc02d7e2SAndroid Build Coastguard Worker   RunServer(creds, absl::GetFlag(FLAGS_port), nullptr,
408*cc02d7e2SAndroid Build Coastguard Worker             std::move(server_options));
409*cc02d7e2SAndroid Build Coastguard Worker }
410*cc02d7e2SAndroid Build Coastguard Worker 
RunServer(const std::shared_ptr<ServerCredentials> & creds,const int port,ServerStartedCondition * server_started_condition)411*cc02d7e2SAndroid Build Coastguard Worker void grpc::testing::interop::RunServer(
412*cc02d7e2SAndroid Build Coastguard Worker     const std::shared_ptr<ServerCredentials>& creds, const int port,
413*cc02d7e2SAndroid Build Coastguard Worker     ServerStartedCondition* server_started_condition) {
414*cc02d7e2SAndroid Build Coastguard Worker   RunServer(creds, port, server_started_condition, nullptr);
415*cc02d7e2SAndroid Build Coastguard Worker }
416*cc02d7e2SAndroid Build Coastguard Worker 
RunServer(const std::shared_ptr<ServerCredentials> & creds,const int port,ServerStartedCondition * server_started_condition,std::unique_ptr<std::vector<std::unique_ptr<ServerBuilderOption>>> server_options)417*cc02d7e2SAndroid Build Coastguard Worker void grpc::testing::interop::RunServer(
418*cc02d7e2SAndroid Build Coastguard Worker     const std::shared_ptr<ServerCredentials>& creds, const int port,
419*cc02d7e2SAndroid Build Coastguard Worker     ServerStartedCondition* server_started_condition,
420*cc02d7e2SAndroid Build Coastguard Worker     std::unique_ptr<std::vector<std::unique_ptr<ServerBuilderOption>>>
421*cc02d7e2SAndroid Build Coastguard Worker         server_options) {
422*cc02d7e2SAndroid Build Coastguard Worker   GPR_ASSERT(port != 0);
423*cc02d7e2SAndroid Build Coastguard Worker   std::ostringstream server_address;
424*cc02d7e2SAndroid Build Coastguard Worker   server_address << "0.0.0.0:" << port;
425*cc02d7e2SAndroid Build Coastguard Worker   auto server_metric_recorder =
426*cc02d7e2SAndroid Build Coastguard Worker       grpc::experimental::ServerMetricRecorder::Create();
427*cc02d7e2SAndroid Build Coastguard Worker   TestServiceImpl service(server_metric_recorder.get());
428*cc02d7e2SAndroid Build Coastguard Worker   grpc::experimental::OrcaService orca_service(
429*cc02d7e2SAndroid Build Coastguard Worker       server_metric_recorder.get(),
430*cc02d7e2SAndroid Build Coastguard Worker       experimental::OrcaService::Options().set_min_report_duration(
431*cc02d7e2SAndroid Build Coastguard Worker           absl::Seconds(0.1)));
432*cc02d7e2SAndroid Build Coastguard Worker   ServerBuilder builder;
433*cc02d7e2SAndroid Build Coastguard Worker   builder.RegisterService(&service);
434*cc02d7e2SAndroid Build Coastguard Worker   builder.RegisterService(&orca_service);
435*cc02d7e2SAndroid Build Coastguard Worker   builder.AddListeningPort(server_address.str(), creds);
436*cc02d7e2SAndroid Build Coastguard Worker   if (server_options != nullptr) {
437*cc02d7e2SAndroid Build Coastguard Worker     for (size_t i = 0; i < server_options->size(); i++) {
438*cc02d7e2SAndroid Build Coastguard Worker       builder.SetOption(std::move((*server_options)[i]));
439*cc02d7e2SAndroid Build Coastguard Worker     }
440*cc02d7e2SAndroid Build Coastguard Worker   }
441*cc02d7e2SAndroid Build Coastguard Worker   if (absl::GetFlag(FLAGS_max_send_message_size) >= 0) {
442*cc02d7e2SAndroid Build Coastguard Worker     builder.SetMaxSendMessageSize(absl::GetFlag(FLAGS_max_send_message_size));
443*cc02d7e2SAndroid Build Coastguard Worker   }
444*cc02d7e2SAndroid Build Coastguard Worker   grpc::ServerBuilder::experimental_type(&builder).EnableCallMetricRecording(
445*cc02d7e2SAndroid Build Coastguard Worker       nullptr);
446*cc02d7e2SAndroid Build Coastguard Worker   std::unique_ptr<Server> server(builder.BuildAndStart());
447*cc02d7e2SAndroid Build Coastguard Worker   gpr_log(GPR_INFO, "Server listening on %s", server_address.str().c_str());
448*cc02d7e2SAndroid Build Coastguard Worker 
449*cc02d7e2SAndroid Build Coastguard Worker   // Signal that the server has started.
450*cc02d7e2SAndroid Build Coastguard Worker   if (server_started_condition) {
451*cc02d7e2SAndroid Build Coastguard Worker     std::unique_lock<std::mutex> lock(server_started_condition->mutex);
452*cc02d7e2SAndroid Build Coastguard Worker     server_started_condition->server_started = true;
453*cc02d7e2SAndroid Build Coastguard Worker     server_started_condition->condition.notify_all();
454*cc02d7e2SAndroid Build Coastguard Worker   }
455*cc02d7e2SAndroid Build Coastguard Worker 
456*cc02d7e2SAndroid Build Coastguard Worker   while (!gpr_atm_no_barrier_load(&g_got_sigint)) {
457*cc02d7e2SAndroid Build Coastguard Worker     gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
458*cc02d7e2SAndroid Build Coastguard Worker                                  gpr_time_from_seconds(5, GPR_TIMESPAN)));
459*cc02d7e2SAndroid Build Coastguard Worker   }
460*cc02d7e2SAndroid Build Coastguard Worker }
461