xref: /aosp_15_r20/external/perfetto/src/bigtrace/orchestrator/orchestrator_main.cc (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <chrono>
18 #include <limits>
19 #include <memory>
20 #include <mutex>
21 
22 #include <grpcpp/channel.h>
23 #include <grpcpp/client_context.h>
24 #include <grpcpp/grpcpp.h>
25 #include <grpcpp/impl/service_type.h>
26 #include <grpcpp/security/credentials.h>
27 #include <grpcpp/support/channel_arguments.h>
28 #include <grpcpp/support/status.h>
29 
30 #include "perfetto/base/status.h"
31 #include "perfetto/ext/base/getopt.h"
32 #include "perfetto/ext/base/status_or.h"
33 #include "perfetto/ext/base/string_utils.h"
34 #include "protos/perfetto/bigtrace/worker.grpc.pb.h"
35 #include "src/bigtrace/orchestrator/orchestrator_impl.h"
36 #include "src/trace_processor/util/status_macros.h"
37 
38 namespace perfetto::bigtrace {
39 namespace {
40 
41 struct CommandLineOptions {
42   std::string server_socket;
43   std::string worker_address;
44   uint16_t worker_port = 0;
45   uint64_t worker_count = 0;
46   std::string worker_address_list;
47   std::string name_resolution_scheme;
48   uint32_t pool_size = 0;
49 };
50 
PrintUsage(char ** argv)51 void PrintUsage(char** argv) {
52   PERFETTO_ELOG(R"(
53 Orchestrator main executable.
54 Usage: %s [OPTIONS]
55 Options:
56  -h, --help                             Prints this guide.
57  -s, --server_socket ADDRESS:PORT       Input the socket for the
58                                         gRPC service to run on
59  -w, --worker_address ADDRESS           Input the address of the workers
60                                         (for single address and
61                                         incrementing port)
62  -p  --worker_port PORT                 Input the starting port of
63                                         the workers
64  -n  --worker_count NUM_WORKERS         Input the number of workers
65                                         (this will specify workers
66                                         starting from the worker_port
67                                         and counting up)
68  -l  --worker_list SOCKET1,SOCKET2,...  Input a string of comma a separated
69                                         worker sockets
70                                         (use either -l or
71                                          -w -p -n EXCLUSIVELY)
72  -r --name_resolution_scheme SCHEME     Specify the name resolution
73                                         scheme for gRPC (e.g. ipv4:, dns://)
74  -t -max_query_concurrency              Specify the number of concurrent
75   MAX_QUERY_CONCURRENCY                 queries/gRPCs from the Orchestrator
76                                         )",
77                 argv[0]);
78 }
79 
ParseCommandLineOptions(int argc,char ** argv)80 base::StatusOr<CommandLineOptions> ParseCommandLineOptions(int argc,
81                                                            char** argv) {
82   CommandLineOptions command_line_options;
83 
84   static option long_options[] = {
85       {"help", optional_argument, nullptr, 'h'},
86       {"server_socket", optional_argument, nullptr, 's'},
87       {"worker_address", optional_argument, nullptr, 'w'},
88       {"worker_port", optional_argument, nullptr, 'p'},
89       {"worker_count", optional_argument, nullptr, 'n'},
90       {"worker_list", optional_argument, nullptr, 'l'},
91       {"name_resolution_scheme", optional_argument, nullptr, 'r'},
92       {"thread_pool_size", optional_argument, nullptr, 't'},
93       {nullptr, 0, nullptr, 0}};
94   int c;
95   while ((c = getopt_long(argc, argv, "s:p:w:q:n:l:r:t:h", long_options,
96                           nullptr)) != -1) {
97     switch (c) {
98       case 's':
99         command_line_options.server_socket = optarg;
100         break;
101       case 'w':
102         command_line_options.worker_address = optarg;
103         break;
104       case 'p':
105         command_line_options.worker_port = static_cast<uint16_t>(atoi(optarg));
106         break;
107       case 'n':
108         command_line_options.worker_count = static_cast<uint64_t>(atoi(optarg));
109         break;
110       case 'l':
111         command_line_options.worker_address_list = optarg;
112         break;
113       case 'r':
114         command_line_options.name_resolution_scheme = optarg;
115         break;
116       case 't':
117         command_line_options.pool_size = static_cast<uint32_t>(atoi(optarg));
118         break;
119       default:
120         PrintUsage(argv);
121         exit(c == 'h' ? 0 : 1);
122     }
123   }
124 
125   bool has_worker_address_port_and_count =
126       command_line_options.worker_count && command_line_options.worker_port &&
127       !command_line_options.worker_address.empty();
128 
129   bool has_worker_list = !command_line_options.worker_address_list.empty();
130 
131   if (has_worker_address_port_and_count == has_worker_list) {
132     return base::ErrStatus(
133         "Error: You must specify a worker address, port and count OR a worker "
134         "list");
135   }
136 
137   if (command_line_options.worker_count <= 0 && !has_worker_list) {
138     return base::ErrStatus(
139         "Error: You must specify a worker count greater than 0 OR a worker "
140         "list");
141   }
142 
143   return command_line_options;
144 }
145 
OrchestratorMain(int argc,char ** argv)146 base::Status OrchestratorMain(int argc, char** argv) {
147   ASSIGN_OR_RETURN(base::StatusOr<CommandLineOptions> options,
148                    ParseCommandLineOptions(argc, argv));
149   std::string server_socket = options->server_socket.empty()
150                                   ? "127.0.0.1:5051"
151                                   : options->server_socket;
152   std::string worker_address =
153       options->worker_address.empty() ? "127.0.0.1" : options->worker_address;
154   uint16_t worker_port =
155       options->worker_port == 0 ? 5052 : options->worker_port;
156   std::string worker_address_list = options->worker_address_list;
157   uint64_t worker_count = options->worker_count;
158 
159   std::string target_address = options->name_resolution_scheme.empty()
160                                    ? "ipv4:"
161                                    : options->name_resolution_scheme;
162 
163   uint32_t pool_size = options->pool_size == 0
164                            ? std::thread::hardware_concurrency()
165                            : options->pool_size;
166 
167   PERFETTO_DCHECK(pool_size);
168 
169   if (worker_address_list.empty()) {
170     // Use a set of n workers incrementing from a starting port
171     PERFETTO_DCHECK(worker_count > 0 && !worker_address.empty());
172     std::vector<std::string> worker_addresses;
173     for (uint64_t i = 0; i < worker_count; ++i) {
174       std::string address =
175           worker_address + ":" + std::to_string(worker_port + i);
176       worker_addresses.push_back(address);
177     }
178     target_address += base::Join(worker_addresses, ",");
179   } else {
180     // Use a list of workers passed as an option
181     target_address += worker_address_list;
182   }
183   grpc::ChannelArguments args;
184   args.SetLoadBalancingPolicyName("round_robin");
185   args.SetMaxReceiveMessageSize(std::numeric_limits<int32_t>::max());
186   auto channel = grpc::CreateCustomChannel(
187       target_address, grpc::InsecureChannelCredentials(), args);
188   auto stub = protos::BigtraceWorker::NewStub(channel);
189   auto service = std::make_unique<OrchestratorImpl>(std::move(stub), pool_size);
190 
191   // Setup the Orchestrator Server
192   grpc::ServerBuilder builder;
193   builder.SetMaxReceiveMessageSize(std::numeric_limits<int32_t>::max());
194   builder.SetMaxMessageSize(std::numeric_limits<int32_t>::max());
195   builder.AddListeningPort(server_socket, grpc::InsecureServerCredentials());
196   builder.RegisterService(service.get());
197   std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
198   PERFETTO_LOG("Orchestrator server listening on %s", server_socket.c_str());
199 
200   server->Wait();
201 
202   return base::OkStatus();
203 }
204 
205 }  // namespace
206 }  // namespace perfetto::bigtrace
207 
main(int argc,char ** argv)208 int main(int argc, char** argv) {
209   auto status = perfetto::bigtrace::OrchestratorMain(argc, argv);
210   if (!status.ok()) {
211     fprintf(stderr, "%s\n", status.c_message());
212     return 1;
213   }
214   return 0;
215 }
216