xref: /aosp_15_r20/external/grpc-grpc/test/core/memory_usage/memory_usage_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <stdint.h>
20 #include <stdio.h>
21 #include <string.h>
22 
23 #include <algorithm>
24 #include <iterator>
25 #include <limits>
26 #include <map>
27 #include <memory>
28 #include <string>
29 #include <utility>
30 #include <vector>
31 
32 #include "absl/algorithm/container.h"
33 #include "absl/flags/flag.h"
34 #include "absl/flags/parse.h"
35 #include "absl/strings/str_cat.h"
36 #include "absl/strings/str_join.h"
37 #include "absl/strings/str_split.h"
38 #include "absl/strings/string_view.h"
39 #include "google/protobuf/wrappers.pb.h"
40 
41 #include <grpc/grpc.h>
42 #include <grpc/support/log.h>
43 #include <grpc/support/time.h>
44 #include <grpcpp/security/server_credentials.h>
45 #include <grpcpp/server.h>
46 #include <grpcpp/server_builder.h>
47 
48 #include "src/core/lib/config/config_vars.h"
49 #include "src/core/lib/gpr/subprocess.h"
50 #include "src/core/lib/gprpp/env.h"
51 #include "src/proto/grpc/testing/xds/v3/cluster.pb.h"
52 #include "src/proto/grpc/testing/xds/v3/health_check.pb.h"
53 #include "test/core/util/port.h"
54 #include "test/core/util/resolve_localhost_ip46.h"
55 #include "test/core/util/test_config.h"
56 #include "test/cpp/end2end/xds/xds_server.h"
57 #include "test/cpp/end2end/xds/xds_utils.h"
58 
59 using grpc::testing::XdsResourceUtils;
60 
61 // Default all benchmarks in order to trigger CI testing for each one
62 ABSL_FLAG(std::string, benchmark_names, "",
63           "Which benchmark to run.  If empty, defaults to 'call,channel' "
64           "if --use_xds is false, or 'call,channel,channel_multi_address' "
65           "if --use_xds is true.");
66 
67 ABSL_FLAG(int, size, 1000, "Number of channels/calls");
68 ABSL_FLAG(std::string, scenario_config, "insecure",
69           "Possible Values: minstack (Use minimal stack), resource_quota, "
70           "secure (Use SSL credentials on server)");
71 ABSL_FLAG(bool, memory_profiling, false,
72           "Run memory profiling");  // TODO (chennancy) Connect this flag
73 ABSL_FLAG(bool, use_xds, false, "Use xDS");
74 
75 // TODO(roth, ctiller): Add support for multiple addresses per channel.
76 
77 class Subprocess {
78  public:
Subprocess(std::vector<std::string> args)79   explicit Subprocess(std::vector<std::string> args) {
80     std::vector<const char*> args_c;
81     args_c.reserve(args.size());
82     for (const auto& arg : args) {
83       args_c.push_back(arg.c_str());
84     }
85     process_ = gpr_subprocess_create(args_c.size(), args_c.data());
86   }
87 
GetPID()88   int GetPID() { return gpr_subprocess_get_process_id(process_); }
Join()89   int Join() { return gpr_subprocess_join(process_); }
Interrupt()90   void Interrupt() { gpr_subprocess_interrupt(process_); }
91 
~Subprocess()92   ~Subprocess() { gpr_subprocess_destroy(process_); }
93 
94  private:
95   gpr_subprocess* process_;
96 };
97 
98 // per-call memory usage benchmark
RunCallBenchmark(int port,char * root,std::vector<std::string> server_scenario_flags,std::vector<std::string> client_scenario_flags)99 int RunCallBenchmark(int port, char* root,
100                      std::vector<std::string> server_scenario_flags,
101                      std::vector<std::string> client_scenario_flags) {
102   int status;
103 
104   // start the server
105   gpr_log(GPR_INFO, "starting server");
106   std::vector<std::string> server_flags = {
107       absl::StrCat(root, "/memory_usage_server",
108                    gpr_subprocess_binary_extension()),
109       "--grpc_experiments",
110       std::string(grpc_core::ConfigVars::Get().Experiments()), "--bind",
111       grpc_core::LocalIpAndPort(port)};
112   if (absl::GetFlag(FLAGS_use_xds)) server_flags.emplace_back("--use_xds");
113   // Add scenario-specific server flags to the end of the server_flags
114   absl::c_move(server_scenario_flags, std::back_inserter(server_flags));
115   Subprocess svr(server_flags);
116   gpr_log(GPR_INFO, "server started, pid %d", svr.GetPID());
117 
118   // Wait one second before starting client to give the server a chance
119   // to start up.
120   gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
121 
122   // start the client
123   gpr_log(GPR_INFO, "starting client");
124   std::vector<std::string> client_flags = {
125       absl::StrCat(root, "/memory_usage_client",
126                    gpr_subprocess_binary_extension()),
127       "--target",
128       absl::GetFlag(FLAGS_use_xds)
129           ? absl::StrCat("xds:", XdsResourceUtils::kServerName)
130           : grpc_core::LocalIpAndPort(port),
131       "--grpc_experiments",
132       std::string(grpc_core::ConfigVars::Get().Experiments()),
133       absl::StrCat("--warmup=", 10000),
134       absl::StrCat("--benchmark=", absl::GetFlag(FLAGS_size))};
135   // Add scenario-specific client flags to the end of the client_flags
136   absl::c_move(client_scenario_flags, std::back_inserter(client_flags));
137   Subprocess cli(client_flags);
138   gpr_log(GPR_INFO, "client started, pid %d", cli.GetPID());
139   // wait for completion
140   if ((status = cli.Join()) != 0) {
141     printf("client failed with: %d", status);
142     return 1;
143   }
144 
145   svr.Interrupt();
146   return svr.Join() == 0 ? 0 : 2;
147 }
148 
149 // Per-channel benchmark
RunChannelBenchmark(const std::vector<int> & server_ports,char * root)150 int RunChannelBenchmark(const std::vector<int>& server_ports, char* root) {
151   // TODO(chennancy) Add the scenario specific flags
152 
153   // start the servers
154   std::vector<Subprocess> servers;
155   servers.reserve(server_ports.size());
156   for (int port : server_ports) {
157     gpr_log(GPR_INFO, "starting server on port %d", port);
158     std::vector<std::string> server_flags = {
159         absl::StrCat(root, "/memory_usage_callback_server",
160                      gpr_subprocess_binary_extension()),
161         "--bind", grpc_core::LocalIpAndPort(port)};
162     if (absl::GetFlag(FLAGS_use_xds)) server_flags.emplace_back("--use_xds");
163     servers.emplace_back(server_flags);
164     gpr_log(GPR_INFO, "server started, pid %d", servers.back().GetPID());
165   }
166 
167   // Wait one second before starting client to avoid possible race condition
168   // of client sending an RPC before the server is set up
169   gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
170 
171   // start the client
172   gpr_log(GPR_INFO, "starting client");
173   std::vector<std::string> client_flags = {
174       absl::StrCat(root, "/memory_usage_callback_client",
175                    gpr_subprocess_binary_extension()),
176       "--target",
177       absl::GetFlag(FLAGS_use_xds)
178           ? absl::StrCat("xds:", XdsResourceUtils::kServerName)
179           : grpc_core::LocalIpAndPort(server_ports[0]),
180       "--nosecure", absl::StrCat("--size=", absl::GetFlag(FLAGS_size))};
181   if (server_ports.size() == 1) {
182     client_flags.emplace_back(
183         absl::StrCat("--server_pid=", servers[0].GetPID()));
184   }
185   Subprocess cli(client_flags);
186   gpr_log(GPR_INFO, "client started, pid %d", cli.GetPID());
187   // wait for completion
188   int retval = cli.Join();
189   if (retval != 0) {
190     printf("client failed with: %d", retval);
191     return 1;
192   }
193   for (auto& server : servers) {
194     server.Interrupt();
195     if (server.Join() != 0) retval = 2;
196   }
197   return retval;
198 }
199 
// Bundles the in-process xDS server that is started when --use_xds is set.
// Both members are null in a default-constructed instance.
struct XdsServer {
  // ADS service implementation that gets registered with `server`.
  std::shared_ptr<grpc::testing::AdsServiceImpl> ads_service;
  // gRPC server hosting `ads_service`. It is declared after `ads_service`, so
  // it is destroyed first.
  std::unique_ptr<grpc::Server> server;
};
204 
StartXdsServerAndConfigureBootstrap(const std::vector<int> & server_ports)205 XdsServer StartXdsServerAndConfigureBootstrap(
206     const std::vector<int>& server_ports) {
207   XdsServer xds_server;
208   int xds_server_port = grpc_pick_unused_port_or_die();
209   gpr_log(GPR_INFO, "xDS server port: %d", xds_server_port);
210   // Generate xDS bootstrap and set the env var.
211   std::string bootstrap =
212       grpc::testing::XdsBootstrapBuilder()
213           .SetServers({absl::StrCat("localhost:", xds_server_port)})
214           .SetXdsChannelCredentials("insecure")
215           .Build();
216   grpc_core::SetEnv("GRPC_XDS_BOOTSTRAP_CONFIG", bootstrap);
217   gpr_log(GPR_INFO, "xDS bootstrap: %s", bootstrap.c_str());
218   // Create ADS service.
219   xds_server.ads_service = std::make_shared<grpc::testing::AdsServiceImpl>();
220   xds_server.ads_service->Start();
221   // Populate xDS resources.
222   std::vector<XdsResourceUtils::EdsResourceArgs::Endpoint> endpoints;
223   endpoints.reserve(server_ports.size());
224   for (int port : server_ports) {
225     endpoints.emplace_back(port);
226     XdsResourceUtils::SetServerListenerNameAndRouteConfiguration(
227         xds_server.ads_service.get(), XdsResourceUtils::DefaultServerListener(),
228         port, XdsResourceUtils::DefaultServerRouteConfig());
229   }
230   XdsResourceUtils::SetListenerAndRouteConfiguration(
231       xds_server.ads_service.get(), XdsResourceUtils::DefaultListener(),
232       XdsResourceUtils::DefaultRouteConfig());
233   auto cluster = XdsResourceUtils::DefaultCluster();
234   cluster.mutable_circuit_breakers()
235       ->add_thresholds()
236       ->mutable_max_requests()
237       ->set_value(std::numeric_limits<uint32_t>::max());
238   xds_server.ads_service->SetCdsResource(cluster);
239   xds_server.ads_service->SetEdsResource(
240       XdsResourceUtils::BuildEdsResource(XdsResourceUtils::EdsResourceArgs(
241           {XdsResourceUtils::EdsResourceArgs::Locality(
242               "here", std::move(endpoints))})));
243   // Create and start server.
244   gpr_log(GPR_INFO, "starting xDS server...");
245   grpc::ServerBuilder builder;
246   builder.RegisterService(xds_server.ads_service.get());
247   builder.AddListeningPort(absl::StrCat("localhost:", xds_server_port),
248                            grpc::InsecureServerCredentials());
249   xds_server.server = builder.BuildAndStart();
250   gpr_log(GPR_INFO, "xDS server started");
251   return xds_server;
252 }
253 
RunBenchmark(char * root,absl::string_view benchmark,std::vector<std::string> server_scenario_flags,std::vector<std::string> client_scenario_flags)254 int RunBenchmark(char* root, absl::string_view benchmark,
255                  std::vector<std::string> server_scenario_flags,
256                  std::vector<std::string> client_scenario_flags) {
257   gpr_log(GPR_INFO, "running benchmark: %s", std::string(benchmark).c_str());
258   const size_t num_ports = benchmark == "channel_multi_address" ? 10 : 1;
259   std::vector<int> server_ports;
260   server_ports.reserve(num_ports);
261   for (size_t i = 0; i < num_ports; ++i) {
262     server_ports.push_back(grpc_pick_unused_port_or_die());
263   }
264   gpr_log(GPR_INFO, "server ports: %s",
265           absl::StrJoin(server_ports, ",").c_str());
266   XdsServer xds_server;
267   if (absl::GetFlag(FLAGS_use_xds)) {
268     xds_server = StartXdsServerAndConfigureBootstrap(server_ports);
269   }
270   int retval;
271   if (benchmark == "call") {
272     retval = RunCallBenchmark(server_ports[0], root, server_scenario_flags,
273                               client_scenario_flags);
274   } else if (benchmark == "channel" || benchmark == "channel_multi_address") {
275     retval = RunChannelBenchmark(server_ports, root);
276   } else {
277     gpr_log(GPR_INFO, "Not a valid benchmark name");
278     retval = 4;
279   }
280   if (xds_server.server != nullptr) xds_server.server->Shutdown();
281   gpr_log(GPR_INFO, "done running benchmark");
282   return retval;
283 }
284 
main(int argc,char ** argv)285 int main(int argc, char** argv) {
286   absl::ParseCommandLine(argc, argv);
287 
288   char* me = argv[0];
289   char* lslash = strrchr(me, '/');
290   char root[1024];
291 
292   std::vector<const char*> args;
293   // figure out where we are
294   if (lslash) {
295     memcpy(root, me, static_cast<size_t>(lslash - me));
296     root[lslash - me] = 0;
297   } else {
298     strcpy(root, ".");
299   }
300 
301   // Set configurations based off scenario_config
302   struct ScenarioArgs {
303     std::vector<std::string> client;
304     std::vector<std::string> server;
305   };
306   // TODO(chennancy): add in resource quota parameter setting later
307   const std::map<std::string /*scenario*/, ScenarioArgs> scenarios = {
308       {"secure", {/*client=*/{}, /*server=*/{"--secure"}}},
309       {"resource_quota", {/*client=*/{}, /*server=*/{"--secure"}}},
310       {"minstack", {/*client=*/{"--minstack"}, /*server=*/{"--minstack"}}},
311       {"insecure", {{}, {}}},
312   };
313   auto it_scenario = scenarios.find(absl::GetFlag(FLAGS_scenario_config));
314   if (it_scenario == scenarios.end()) {
315     printf("No scenario matching the name could be found\n");
316     return 3;
317   }
318 
319   // Run all benchmarks listed (Multiple benchmarks usually only for default
320   // scenario)
321   std::string benchmark_names = absl::GetFlag(FLAGS_benchmark_names);
322   if (benchmark_names.empty()) {
323     benchmark_names = absl::GetFlag(FLAGS_use_xds)
324                           ? "call,channel,channel_multi_address"
325                           : "call,channel";
326   }
327   auto benchmarks = absl::StrSplit(benchmark_names, ',');
328   grpc_init();
329   for (const auto& benchmark : benchmarks) {
330     int r = RunBenchmark(root, benchmark, it_scenario->second.server,
331                          it_scenario->second.client);
332     if (r != 0) return r;
333   }
334   grpc_shutdown();
335   return 0;
336 }
337