1 /*
2 * Copyright (C) 2024 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <chrono>
18 #include <limits>
19 #include <memory>
20 #include <mutex>
21
22 #include <grpcpp/channel.h>
23 #include <grpcpp/client_context.h>
24 #include <grpcpp/grpcpp.h>
25 #include <grpcpp/impl/service_type.h>
26 #include <grpcpp/security/credentials.h>
27 #include <grpcpp/support/channel_arguments.h>
28 #include <grpcpp/support/status.h>
29
30 #include "perfetto/base/status.h"
31 #include "perfetto/ext/base/getopt.h"
32 #include "perfetto/ext/base/status_or.h"
33 #include "perfetto/ext/base/string_utils.h"
34 #include "protos/perfetto/bigtrace/worker.grpc.pb.h"
35 #include "src/bigtrace/orchestrator/orchestrator_impl.h"
36 #include "src/trace_processor/util/status_macros.h"
37
38 namespace perfetto::bigtrace {
39 namespace {
40
41 struct CommandLineOptions {
42 std::string server_socket;
43 std::string worker_address;
44 uint16_t worker_port = 0;
45 uint64_t worker_count = 0;
46 std::string worker_address_list;
47 std::string name_resolution_scheme;
48 uint32_t pool_size = 0;
49 };
50
PrintUsage(char ** argv)51 void PrintUsage(char** argv) {
52 PERFETTO_ELOG(R"(
53 Orchestrator main executable.
54 Usage: %s [OPTIONS]
55 Options:
56 -h, --help Prints this guide.
57 -s, --server_socket ADDRESS:PORT Input the socket for the
58 gRPC service to run on
59 -w, --worker_address ADDRESS Input the address of the workers
60 (for single address and
61 incrementing port)
62 -p --worker_port PORT Input the starting port of
63 the workers
64 -n --worker_count NUM_WORKERS Input the number of workers
65 (this will specify workers
66 starting from the worker_port
67 and counting up)
68 -l --worker_list SOCKET1,SOCKET2,... Input a string of comma a separated
69 worker sockets
70 (use either -l or
71 -w -p -n EXCLUSIVELY)
72 -r --name_resolution_scheme SCHEME Specify the name resolution
73 scheme for gRPC (e.g. ipv4:, dns://)
74 -t -max_query_concurrency Specify the number of concurrent
75 MAX_QUERY_CONCURRENCY queries/gRPCs from the Orchestrator
76 )",
77 argv[0]);
78 }
79
ParseCommandLineOptions(int argc,char ** argv)80 base::StatusOr<CommandLineOptions> ParseCommandLineOptions(int argc,
81 char** argv) {
82 CommandLineOptions command_line_options;
83
84 static option long_options[] = {
85 {"help", optional_argument, nullptr, 'h'},
86 {"server_socket", optional_argument, nullptr, 's'},
87 {"worker_address", optional_argument, nullptr, 'w'},
88 {"worker_port", optional_argument, nullptr, 'p'},
89 {"worker_count", optional_argument, nullptr, 'n'},
90 {"worker_list", optional_argument, nullptr, 'l'},
91 {"name_resolution_scheme", optional_argument, nullptr, 'r'},
92 {"thread_pool_size", optional_argument, nullptr, 't'},
93 {nullptr, 0, nullptr, 0}};
94 int c;
95 while ((c = getopt_long(argc, argv, "s:p:w:q:n:l:r:t:h", long_options,
96 nullptr)) != -1) {
97 switch (c) {
98 case 's':
99 command_line_options.server_socket = optarg;
100 break;
101 case 'w':
102 command_line_options.worker_address = optarg;
103 break;
104 case 'p':
105 command_line_options.worker_port = static_cast<uint16_t>(atoi(optarg));
106 break;
107 case 'n':
108 command_line_options.worker_count = static_cast<uint64_t>(atoi(optarg));
109 break;
110 case 'l':
111 command_line_options.worker_address_list = optarg;
112 break;
113 case 'r':
114 command_line_options.name_resolution_scheme = optarg;
115 break;
116 case 't':
117 command_line_options.pool_size = static_cast<uint32_t>(atoi(optarg));
118 break;
119 default:
120 PrintUsage(argv);
121 exit(c == 'h' ? 0 : 1);
122 }
123 }
124
125 bool has_worker_address_port_and_count =
126 command_line_options.worker_count && command_line_options.worker_port &&
127 !command_line_options.worker_address.empty();
128
129 bool has_worker_list = !command_line_options.worker_address_list.empty();
130
131 if (has_worker_address_port_and_count == has_worker_list) {
132 return base::ErrStatus(
133 "Error: You must specify a worker address, port and count OR a worker "
134 "list");
135 }
136
137 if (command_line_options.worker_count <= 0 && !has_worker_list) {
138 return base::ErrStatus(
139 "Error: You must specify a worker count greater than 0 OR a worker "
140 "list");
141 }
142
143 return command_line_options;
144 }
145
OrchestratorMain(int argc,char ** argv)146 base::Status OrchestratorMain(int argc, char** argv) {
147 ASSIGN_OR_RETURN(base::StatusOr<CommandLineOptions> options,
148 ParseCommandLineOptions(argc, argv));
149 std::string server_socket = options->server_socket.empty()
150 ? "127.0.0.1:5051"
151 : options->server_socket;
152 std::string worker_address =
153 options->worker_address.empty() ? "127.0.0.1" : options->worker_address;
154 uint16_t worker_port =
155 options->worker_port == 0 ? 5052 : options->worker_port;
156 std::string worker_address_list = options->worker_address_list;
157 uint64_t worker_count = options->worker_count;
158
159 std::string target_address = options->name_resolution_scheme.empty()
160 ? "ipv4:"
161 : options->name_resolution_scheme;
162
163 uint32_t pool_size = options->pool_size == 0
164 ? std::thread::hardware_concurrency()
165 : options->pool_size;
166
167 PERFETTO_DCHECK(pool_size);
168
169 if (worker_address_list.empty()) {
170 // Use a set of n workers incrementing from a starting port
171 PERFETTO_DCHECK(worker_count > 0 && !worker_address.empty());
172 std::vector<std::string> worker_addresses;
173 for (uint64_t i = 0; i < worker_count; ++i) {
174 std::string address =
175 worker_address + ":" + std::to_string(worker_port + i);
176 worker_addresses.push_back(address);
177 }
178 target_address += base::Join(worker_addresses, ",");
179 } else {
180 // Use a list of workers passed as an option
181 target_address += worker_address_list;
182 }
183 grpc::ChannelArguments args;
184 args.SetLoadBalancingPolicyName("round_robin");
185 args.SetMaxReceiveMessageSize(std::numeric_limits<int32_t>::max());
186 auto channel = grpc::CreateCustomChannel(
187 target_address, grpc::InsecureChannelCredentials(), args);
188 auto stub = protos::BigtraceWorker::NewStub(channel);
189 auto service = std::make_unique<OrchestratorImpl>(std::move(stub), pool_size);
190
191 // Setup the Orchestrator Server
192 grpc::ServerBuilder builder;
193 builder.SetMaxReceiveMessageSize(std::numeric_limits<int32_t>::max());
194 builder.SetMaxMessageSize(std::numeric_limits<int32_t>::max());
195 builder.AddListeningPort(server_socket, grpc::InsecureServerCredentials());
196 builder.RegisterService(service.get());
197 std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
198 PERFETTO_LOG("Orchestrator server listening on %s", server_socket.c_str());
199
200 server->Wait();
201
202 return base::OkStatus();
203 }
204
205 } // namespace
206 } // namespace perfetto::bigtrace
207
main(int argc,char ** argv)208 int main(int argc, char** argv) {
209 auto status = perfetto::bigtrace::OrchestratorMain(argc, argv);
210 if (!status.ok()) {
211 fprintf(stderr, "%s\n", status.c_message());
212 return 1;
213 }
214 return 0;
215 }
216