1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "test/cpp/util/grpc_tool.h"
20
21 #include <cstdio>
22 #include <fstream>
23 #include <iostream>
24 #include <memory>
25 #include <sstream>
26 #include <string>
27 #include <thread>
28
29 #include "absl/flags/flag.h"
30 #include "absl/memory/memory.h"
31
32 #include <grpc/grpc.h>
33 #include <grpc/support/port_platform.h>
34 #include <grpcpp/channel.h>
35 #include <grpcpp/create_channel.h>
36 #include <grpcpp/grpcpp.h>
37 #include <grpcpp/security/credentials.h>
38 #include <grpcpp/support/string_ref.h>
39
40 #include "test/cpp/util/cli_call.h"
41 #include "test/cpp/util/proto_file_parser.h"
42 #include "test/cpp/util/proto_reflection_descriptor_database.h"
43 #include "test/cpp/util/service_describer.h"
44
45 #if GPR_WINDOWS
46 #include <io.h>
47 #else
48 #include <unistd.h>
49 #endif
50
51 ABSL_FLAG(bool, l, false, "Use a long listing format");
52 ABSL_FLAG(bool, remotedb, true,
53 "Use server types to parse and format messages");
54 ABSL_FLAG(std::string, metadata, "",
55 "Metadata to send to server, in the form of key1:val1:key2:val2");
56 ABSL_FLAG(std::string, proto_path, ".",
57 "Path to look for the proto file. "
58 "Multiple paths can be separated by " GRPC_CLI_PATH_SEPARATOR);
59 ABSL_FLAG(std::string, protofiles, "", "Name of the proto file.");
60 ABSL_FLAG(bool, binary_input, false, "Input in binary format");
61 ABSL_FLAG(bool, binary_output, false, "Output in binary format");
62 ABSL_FLAG(std::string, default_service_config, "",
63 "Default service config to use on the channel, if non-empty. Note "
64 "that this will be ignored if the name resolver returns a service "
65 "config.");
66 ABSL_FLAG(bool, display_peer_address, false,
67 "Log the peer socket address of the connection that each RPC is made "
68 "on to stderr.");
69 ABSL_FLAG(bool, json_input, false, "Input in json format");
70 ABSL_FLAG(bool, json_output, false, "Output in json format");
71 ABSL_FLAG(std::string, infile, "", "Input file (default is stdin)");
72 ABSL_FLAG(bool, batch, false,
73 "Input contains multiple requests. Please do not use this to send "
74 "more than a few RPCs. gRPC CLI has very different performance "
75 "characteristics compared with normal RPC calls which make it "
76 "unsuitable for loadtesting or significant production traffic.");
77 // TODO(Capstan): Consider using absl::Duration
78 ABSL_FLAG(double, timeout, -1,
79 "Specify timeout in seconds, used to set the deadline for all "
80 "RPCs. The default value of -1 means no deadline has been set.");
81 ABSL_FLAG(
82 int, max_recv_msg_size, 0,
83 "Specify the max receive message size in bytes for all RPCs. -1 indicates "
84 "unlimited. The default value of 0 means to use the gRPC default.");
85
86 namespace grpc {
87 namespace testing {
88 namespace {
89
90 class GrpcTool {
91 public:
92 explicit GrpcTool();
~GrpcTool()93 virtual ~GrpcTool() {}
94
95 bool Help(int argc, const char** argv, const CliCredentials& cred,
96 const GrpcToolOutputCallback& callback);
97 bool CallMethod(int argc, const char** argv, const CliCredentials& cred,
98 const GrpcToolOutputCallback& callback);
99 bool ListServices(int argc, const char** argv, const CliCredentials& cred,
100 const GrpcToolOutputCallback& callback);
101 bool PrintType(int argc, const char** argv, const CliCredentials& cred,
102 const GrpcToolOutputCallback& callback);
103 // TODO(zyc): implement the following methods
104 // bool ListServices(int argc, const char** argv, GrpcToolOutputCallback
105 // callback);
106 // bool PrintTypeId(int argc, const char** argv, GrpcToolOutputCallback
107 // callback);
108 bool ParseMessage(int argc, const char** argv, const CliCredentials& cred,
109 const GrpcToolOutputCallback& callback);
110 bool ToText(int argc, const char** argv, const CliCredentials& cred,
111 const GrpcToolOutputCallback& callback);
112 bool ToJson(int argc, const char** argv, const CliCredentials& cred,
113 const GrpcToolOutputCallback& callback);
114 bool ToBinary(int argc, const char** argv, const CliCredentials& cred,
115 const GrpcToolOutputCallback& callback);
116
SetPrintCommandMode(int exit_status)117 void SetPrintCommandMode(int exit_status) {
118 print_command_usage_ = true;
119 usage_exit_status_ = exit_status;
120 }
121
122 private:
123 void CommandUsage(const std::string& usage) const;
124 bool print_command_usage_;
125 int usage_exit_status_;
126 const std::string cred_usage_;
127 };
128
129 template <typename T>
130 std::function<bool(GrpcTool*, int, const char**, const CliCredentials&,
131 GrpcToolOutputCallback)>
BindWith5Args(T && func)132 BindWith5Args(T&& func) {
133 return std::bind(std::forward<T>(func), std::placeholders::_1,
134 std::placeholders::_2, std::placeholders::_3,
135 std::placeholders::_4, std::placeholders::_5);
136 }
137
138 template <typename T>
ArraySize(T & a)139 size_t ArraySize(T& a) {
140 return ((sizeof(a) / sizeof(*(a))) /
141 static_cast<size_t>(!(sizeof(a) % sizeof(*(a)))));
142 }
143
ParseMetadataFlag(std::multimap<std::string,std::string> * client_metadata)144 void ParseMetadataFlag(
145 std::multimap<std::string, std::string>* client_metadata) {
146 if (absl::GetFlag(FLAGS_metadata).empty()) {
147 return;
148 }
149 std::vector<std::string> fields;
150 const char delim = ':';
151 const char escape = '\\';
152 size_t cur = -1;
153 std::stringstream ss;
154 while (++cur < absl::GetFlag(FLAGS_metadata).length()) {
155 switch (absl::GetFlag(FLAGS_metadata).at(cur)) {
156 case escape:
157 if (cur < absl::GetFlag(FLAGS_metadata).length() - 1) {
158 char c = absl::GetFlag(FLAGS_metadata).at(++cur);
159 if (c == delim || c == escape) {
160 ss << c;
161 continue;
162 }
163 }
164 fprintf(stderr, "Failed to parse metadata flag.\n");
165 exit(1);
166 case delim:
167 fields.push_back(ss.str());
168 ss.str("");
169 ss.clear();
170 break;
171 default:
172 ss << absl::GetFlag(FLAGS_metadata).at(cur);
173 }
174 }
175 fields.push_back(ss.str());
176 if (fields.size() % 2) {
177 fprintf(stderr, "Failed to parse metadata flag.\n");
178 exit(1);
179 }
180 for (size_t i = 0; i < fields.size(); i += 2) {
181 client_metadata->insert(
182 std::pair<std::string, std::string>(fields[i], fields[i + 1]));
183 }
184 }
185
186 template <typename T>
PrintMetadata(const T & m,const std::string & message)187 void PrintMetadata(const T& m, const std::string& message) {
188 if (m.empty()) {
189 return;
190 }
191 fprintf(stderr, "%s\n", message.c_str());
192 std::string pair;
193 for (typename T::const_iterator iter = m.begin(); iter != m.end(); ++iter) {
194 pair.clear();
195 pair.append(iter->first.data(), iter->first.size());
196 pair.append(" : ");
197 pair.append(iter->second.data(), iter->second.size());
198 fprintf(stderr, "%s\n", pair.c_str());
199 }
200 }
201
ReadResponse(CliCall * call,const std::string & method_name,const GrpcToolOutputCallback & callback,ProtoFileParser * parser,gpr_mu * parser_mu,bool print_mode)202 void ReadResponse(CliCall* call, const std::string& method_name,
203 const GrpcToolOutputCallback& callback,
204 ProtoFileParser* parser, gpr_mu* parser_mu, bool print_mode) {
205 std::string serialized_response_proto;
206 std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata;
207
208 for (bool receive_initial_metadata = true; call->ReadAndMaybeNotifyWrite(
209 &serialized_response_proto,
210 receive_initial_metadata ? &server_initial_metadata : nullptr);
211 receive_initial_metadata = false) {
212 fprintf(stderr, "got response.\n");
213 if (!absl::GetFlag(FLAGS_binary_output)) {
214 gpr_mu_lock(parser_mu);
215 serialized_response_proto = parser->GetFormattedStringFromMethod(
216 method_name, serialized_response_proto, false /* is_request */,
217 absl::GetFlag(FLAGS_json_output));
218 if (parser->HasError() && print_mode) {
219 fprintf(stderr, "Failed to parse response.\n");
220 }
221 gpr_mu_unlock(parser_mu);
222 }
223 if (receive_initial_metadata) {
224 PrintMetadata(server_initial_metadata,
225 "Received initial metadata from server:");
226 }
227 if (!callback(serialized_response_proto) && print_mode) {
228 fprintf(stderr, "Failed to output response.\n");
229 }
230 }
231 }
232
CreateCliChannel(const std::string & server_address,const CliCredentials & cred,const grpc::ChannelArguments & extra_args)233 std::shared_ptr<grpc::Channel> CreateCliChannel(
234 const std::string& server_address, const CliCredentials& cred,
235 const grpc::ChannelArguments& extra_args) {
236 grpc::ChannelArguments args(extra_args);
237 if (!cred.GetSslTargetNameOverride().empty()) {
238 args.SetSslTargetNameOverride(cred.GetSslTargetNameOverride());
239 }
240 if (!absl::GetFlag(FLAGS_default_service_config).empty()) {
241 args.SetString(GRPC_ARG_SERVICE_CONFIG,
242 absl::GetFlag(FLAGS_default_service_config));
243 }
244 // See |GRPC_ARG_MAX_METADATA_SIZE| in |grpc_types.h|.
245 // Set to large enough size (10M) that should work for most use cases.
246 args.SetInt(GRPC_ARG_MAX_METADATA_SIZE, 10 * 1024 * 1024);
247 return grpc::CreateCustomChannel(server_address, cred.GetCredentials(), args);
248 }
249
250 struct Command {
251 const char* command;
252 std::function<bool(GrpcTool*, int, const char**, const CliCredentials&,
253 GrpcToolOutputCallback)>
254 function;
255 int min_args;
256 int max_args;
257 };
258
259 const Command ops[] = {
260 {"help", BindWith5Args(&GrpcTool::Help), 0, INT_MAX},
261 {"ls", BindWith5Args(&GrpcTool::ListServices), 1, 3},
262 {"list", BindWith5Args(&GrpcTool::ListServices), 1, 3},
263 {"call", BindWith5Args(&GrpcTool::CallMethod), 2, 3},
264 {"type", BindWith5Args(&GrpcTool::PrintType), 2, 2},
265 {"parse", BindWith5Args(&GrpcTool::ParseMessage), 2, 3},
266 {"totext", BindWith5Args(&GrpcTool::ToText), 2, 3},
267 {"tobinary", BindWith5Args(&GrpcTool::ToBinary), 2, 3},
268 {"tojson", BindWith5Args(&GrpcTool::ToJson), 2, 3},
269 };
270
Usage(const std::string & msg)271 void Usage(const std::string& msg) {
272 fprintf(
273 stderr,
274 "%s\n"
275 " grpc_cli ls ... ; List services\n"
276 " grpc_cli call ... ; Call method\n"
277 " grpc_cli type ... ; Print type\n"
278 " grpc_cli parse ... ; Parse message\n"
279 " grpc_cli totext ... ; Convert binary message to text\n"
280 " grpc_cli tojson ... ; Convert binary message to json\n"
281 " grpc_cli tobinary ... ; Convert text message to binary\n"
282 " grpc_cli help ... ; Print this message, or per-command usage\n"
283 "\n",
284 msg.c_str());
285
286 exit(1);
287 }
288
FindCommand(const std::string & name)289 const Command* FindCommand(const std::string& name) {
290 for (int i = 0; i < static_cast<int>(ArraySize(ops)); i++) {
291 if (name == ops[i].command) {
292 return &ops[i];
293 }
294 }
295 return nullptr;
296 }
297 } // namespace
298
GrpcToolMainLib(int argc,const char ** argv,const CliCredentials & cred,const GrpcToolOutputCallback & callback)299 int GrpcToolMainLib(int argc, const char** argv, const CliCredentials& cred,
300 const GrpcToolOutputCallback& callback) {
301 if (argc < 2) {
302 Usage("No command specified");
303 }
304
305 std::string command = argv[1];
306 argc -= 2;
307 argv += 2;
308
309 const Command* cmd = FindCommand(command);
310 if (cmd != nullptr) {
311 GrpcTool grpc_tool;
312 if (argc < cmd->min_args || argc > cmd->max_args) {
313 // Force the command to print its usage message
314 fprintf(stderr, "\nWrong number of arguments for %s\n", command.c_str());
315 grpc_tool.SetPrintCommandMode(1);
316 return cmd->function(&grpc_tool, -1, nullptr, cred, callback);
317 }
318 const bool ok = cmd->function(&grpc_tool, argc, argv, cred, callback);
319 return ok ? 0 : 1;
320 } else {
321 Usage("Invalid command '" + command + "'");
322 }
323 return 1;
324 }
325
GrpcTool()326 GrpcTool::GrpcTool() : print_command_usage_(false), usage_exit_status_(0) {}
327
CommandUsage(const std::string & usage) const328 void GrpcTool::CommandUsage(const std::string& usage) const {
329 if (print_command_usage_) {
330 fprintf(stderr, "\n%s%s\n", usage.c_str(),
331 (usage.empty() || usage[usage.size() - 1] != '\n') ? "\n" : "");
332 exit(usage_exit_status_);
333 }
334 }
335
Help(int argc,const char ** argv,const CliCredentials & cred,const GrpcToolOutputCallback & callback)336 bool GrpcTool::Help(int argc, const char** argv, const CliCredentials& cred,
337 const GrpcToolOutputCallback& callback) {
338 CommandUsage(
339 "Print help\n"
340 " grpc_cli help [subcommand]\n");
341
342 if (argc == 0) {
343 Usage("");
344 } else {
345 const Command* cmd = FindCommand(argv[0]);
346 if (cmd == nullptr) {
347 Usage("Unknown command '" + std::string(argv[0]) + "'");
348 }
349 SetPrintCommandMode(0);
350 cmd->function(this, -1, nullptr, cred, callback);
351 }
352 return true;
353 }
354
ListServices(int argc,const char ** argv,const CliCredentials & cred,const GrpcToolOutputCallback & callback)355 bool GrpcTool::ListServices(int argc, const char** argv,
356 const CliCredentials& cred,
357 const GrpcToolOutputCallback& callback) {
358 CommandUsage(
359 "List services\n"
360 " grpc_cli ls <address> [<service>[/<method>]]\n"
361 " <address> ; host:port\n"
362 " <service> ; Exported service name\n"
363 " <method> ; Method name\n"
364 " --l ; Use a long listing format\n"
365 " --outfile ; Output filename (defaults to stdout)\n" +
366 cred.GetCredentialUsage());
367
368 std::string server_address(argv[0]);
369 std::shared_ptr<grpc::Channel> channel =
370 CreateCliChannel(server_address, cred, grpc::ChannelArguments());
371 grpc::ProtoReflectionDescriptorDatabase desc_db(channel);
372 grpc::protobuf::DescriptorPool desc_pool(&desc_db);
373
374 std::vector<std::string> service_list;
375 if (!desc_db.GetServices(&service_list)) {
376 fprintf(stderr, "Received an error when querying services endpoint.\n");
377 return false;
378 }
379
380 // If no service is specified, dump the list of services.
381 std::string output;
382 if (argc < 2) {
383 // List all services, if --l is passed, then include full description,
384 // otherwise include a summarized list only.
385 if (absl::GetFlag(FLAGS_l)) {
386 output = DescribeServiceList(service_list, desc_pool);
387 } else {
388 for (auto it = service_list.begin(); it != service_list.end(); it++) {
389 auto const& service = *it;
390 output.append(service);
391 output.append("\n");
392 }
393 }
394 } else {
395 std::string service_name;
396 std::string method_name;
397 std::stringstream ss(argv[1]);
398
399 // Remove leading slashes.
400 while (ss.peek() == '/') {
401 ss.get();
402 }
403
404 // Parse service and method names. Support the following patterns:
405 // Service
406 // Service Method
407 // Service.Method
408 // Service/Method
409 if (argc == 3) {
410 std::getline(ss, service_name, '/');
411 method_name = argv[2];
412 } else {
413 if (std::getline(ss, service_name, '/')) {
414 std::getline(ss, method_name);
415 }
416 }
417
418 const grpc::protobuf::ServiceDescriptor* service =
419 desc_pool.FindServiceByName(service_name);
420 if (service != nullptr) {
421 if (method_name.empty()) {
422 output = absl::GetFlag(FLAGS_l) ? DescribeService(service)
423 : SummarizeService(service);
424 } else {
425 method_name.insert(0, 1, '.');
426 method_name.insert(0, service_name);
427 const grpc::protobuf::MethodDescriptor* method =
428 desc_pool.FindMethodByName(method_name);
429 if (method != nullptr) {
430 output = absl::GetFlag(FLAGS_l) ? DescribeMethod(method)
431 : SummarizeMethod(method);
432 } else {
433 fprintf(stderr, "Method %s not found in service %s.\n",
434 method_name.c_str(), service_name.c_str());
435 return false;
436 }
437 }
438 } else {
439 if (!method_name.empty()) {
440 fprintf(stderr, "Service %s not found.\n", service_name.c_str());
441 return false;
442 } else {
443 const grpc::protobuf::MethodDescriptor* method =
444 desc_pool.FindMethodByName(service_name);
445 if (method != nullptr) {
446 output = absl::GetFlag(FLAGS_l) ? DescribeMethod(method)
447 : SummarizeMethod(method);
448 } else {
449 fprintf(stderr, "Service or method %s not found.\n",
450 service_name.c_str());
451 return false;
452 }
453 }
454 }
455 }
456 return callback(output);
457 }
458
PrintType(int,const char ** argv,const CliCredentials & cred,const GrpcToolOutputCallback & callback)459 bool GrpcTool::PrintType(int /*argc*/, const char** argv,
460 const CliCredentials& cred,
461 const GrpcToolOutputCallback& callback) {
462 CommandUsage(
463 "Print type\n"
464 " grpc_cli type <address> <type>\n"
465 " <address> ; host:port\n"
466 " <type> ; Protocol buffer type name\n" +
467 cred.GetCredentialUsage());
468
469 std::string server_address(argv[0]);
470 std::shared_ptr<grpc::Channel> channel =
471 CreateCliChannel(server_address, cred, grpc::ChannelArguments());
472 grpc::ProtoReflectionDescriptorDatabase desc_db(channel);
473 grpc::protobuf::DescriptorPool desc_pool(&desc_db);
474
475 std::string output;
476 const grpc::protobuf::Descriptor* descriptor =
477 desc_pool.FindMessageTypeByName(argv[1]);
478 if (descriptor != nullptr) {
479 output = descriptor->DebugString();
480 } else {
481 fprintf(stderr, "Type %s not found.\n", argv[1]);
482 return false;
483 }
484 return callback(output);
485 }
486
CallMethod(int argc,const char ** argv,const CliCredentials & cred,const GrpcToolOutputCallback & callback)487 bool GrpcTool::CallMethod(int argc, const char** argv,
488 const CliCredentials& cred,
489 const GrpcToolOutputCallback& callback) {
490 CommandUsage(
491 "Call method\n"
492 " grpc_cli call <address> <service>[.<method>] <request>\n"
493 " <address> ; host:port\n"
494 " <service> ; Exported service name\n"
495 " <method> ; Method name\n"
496 " <request> ; Text protobuffer (overrides infile)\n"
497 " --protofiles ; Comma separated proto files used as a"
498 " fallback when parsing request/response\n"
499 " --proto_path ; The search paths of proto files"
500 " (" GRPC_CLI_PATH_SEPARATOR
501 " separated), valid only when --protofiles is given\n"
502 " --noremotedb ; Don't attempt to use reflection service"
503 " at all\n"
504 " --metadata ; The metadata to be sent to the server\n"
505 " --infile ; Input filename (defaults to stdin)\n"
506 " --outfile ; Output filename (defaults to stdout)\n"
507 " --binary_input ; Input in binary format\n"
508 " --binary_output ; Output in binary format\n"
509 " --json_input ; Input in json format\n"
510 " --json_output ; Output in json format\n"
511 " --max_recv_msg_size ; Specify max receive message size in "
512 "bytes. -1 indicates unlimited. The default value of 0 means to use the "
513 "gRPC default.\n"
514 " --timeout ; Specify timeout (in seconds), used to "
515 "set the deadline for RPCs. The default value of -1 means no "
516 "deadline has been set.\n" +
517 cred.GetCredentialUsage());
518
519 std::stringstream output_ss;
520 std::string request_text;
521 std::string server_address(argv[0]);
522 std::string method_name(argv[1]);
523 std::string formatted_method_name;
524 std::unique_ptr<ProtoFileParser> parser;
525 std::string serialized_request_proto;
526 CliArgs cli_args;
527 cli_args.timeout = absl::GetFlag(FLAGS_timeout);
528 bool print_mode = false;
529
530 grpc::ChannelArguments args;
531 if (absl::GetFlag(FLAGS_max_recv_msg_size) != 0) {
532 args.SetMaxReceiveMessageSize(absl::GetFlag(FLAGS_max_recv_msg_size));
533 }
534 std::shared_ptr<grpc::Channel> channel =
535 CreateCliChannel(server_address, cred, args);
536
537 if (!absl::GetFlag(FLAGS_binary_input) ||
538 !absl::GetFlag(FLAGS_binary_output)) {
539 parser = std::make_unique<grpc::testing::ProtoFileParser>(
540 absl::GetFlag(FLAGS_remotedb) ? channel : nullptr,
541 absl::GetFlag(FLAGS_proto_path), absl::GetFlag(FLAGS_protofiles));
542 if (parser->HasError()) {
543 fprintf(
544 stderr,
545 "Failed to find remote reflection service and local proto files.\n");
546 return false;
547 }
548 }
549
550 if (absl::GetFlag(FLAGS_binary_input)) {
551 formatted_method_name = method_name;
552 } else {
553 formatted_method_name = parser->GetFormattedMethodName(method_name);
554 if (parser->HasError()) {
555 fprintf(stderr, "Failed to find method %s in proto files.\n",
556 method_name.c_str());
557 }
558 }
559
560 if (argc == 3) {
561 request_text = argv[2];
562 }
563
564 if (parser != nullptr &&
565 parser->IsStreaming(method_name, true /* is_request */)) {
566 std::istream* input_stream;
567 std::ifstream input_file;
568
569 if (absl::GetFlag(FLAGS_batch)) {
570 fprintf(stderr, "Batch mode for streaming RPC is not supported.\n");
571 return false;
572 }
573
574 std::multimap<std::string, std::string> client_metadata;
575 ParseMetadataFlag(&client_metadata);
576 PrintMetadata(client_metadata, "Sending client initial metadata:");
577
578 CliCall call(channel, formatted_method_name, client_metadata, cli_args);
579 if (absl::GetFlag(FLAGS_display_peer_address)) {
580 fprintf(stderr, "New call for method_name:%s has peer address:|%s|\n",
581 formatted_method_name.c_str(), call.peer().c_str());
582 }
583
584 if (absl::GetFlag(FLAGS_infile).empty()) {
585 if (isatty(fileno(stdin))) {
586 print_mode = true;
587 fprintf(stderr, "reading streaming request message from stdin...\n");
588 }
589 input_stream = &std::cin;
590 } else {
591 input_file.open(absl::GetFlag(FLAGS_infile),
592 std::ios::in | std::ios::binary);
593 if (!input_file) {
594 fprintf(stderr, "Failed to open infile %s.\n",
595 absl::GetFlag(FLAGS_infile).c_str());
596 return false;
597 }
598
599 input_stream = &input_file;
600 }
601
602 gpr_mu parser_mu;
603 gpr_mu_init(&parser_mu);
604 std::thread read_thread(ReadResponse, &call, method_name, callback,
605 parser.get(), &parser_mu, print_mode);
606
607 std::stringstream request_ss;
608 std::string line;
609 while (!request_text.empty() ||
610 (!input_stream->eof() && getline(*input_stream, line))) {
611 if (!request_text.empty()) {
612 if (absl::GetFlag(FLAGS_binary_input)) {
613 serialized_request_proto = request_text;
614 request_text.clear();
615 } else {
616 gpr_mu_lock(&parser_mu);
617 serialized_request_proto = parser->GetSerializedProtoFromMethod(
618 method_name, request_text, true /* is_request */,
619 absl::GetFlag(FLAGS_json_input));
620 request_text.clear();
621 if (parser->HasError()) {
622 if (print_mode) {
623 fprintf(stderr, "Failed to parse request.\n");
624 }
625 gpr_mu_unlock(&parser_mu);
626 continue;
627 }
628 gpr_mu_unlock(&parser_mu);
629 }
630
631 call.WriteAndWait(serialized_request_proto);
632 if (print_mode) {
633 fprintf(stderr, "Request sent.\n");
634 }
635 } else {
636 if (line.length() == 0) {
637 request_text = request_ss.str();
638 request_ss.str(std::string());
639 request_ss.clear();
640 } else {
641 request_ss << line << ' ';
642 }
643 }
644 }
645 if (input_file.is_open()) {
646 input_file.close();
647 }
648
649 call.WritesDoneAndWait();
650 read_thread.join();
651 gpr_mu_destroy(&parser_mu);
652
653 std::multimap<grpc::string_ref, grpc::string_ref> server_trailing_metadata;
654 Status status = call.Finish(&server_trailing_metadata);
655 PrintMetadata(server_trailing_metadata,
656 "Received trailing metadata from server:");
657
658 if (status.ok()) {
659 fprintf(stderr, "Stream RPC succeeded with OK status\n");
660 return true;
661 } else {
662 fprintf(stderr, "Rpc failed with status code %d, error message: %s\n",
663 status.error_code(), status.error_message().c_str());
664 return false;
665 }
666
667 } else { // parser->IsStreaming(method_name, true /* is_request */)
668 if (absl::GetFlag(FLAGS_batch)) {
669 if (parser != nullptr &&
670 parser->IsStreaming(method_name, false /* is_request */)) {
671 fprintf(stderr, "Batch mode for streaming RPC is not supported.\n");
672 return false;
673 }
674
675 std::istream* input_stream;
676 std::ifstream input_file;
677
678 if (absl::GetFlag(FLAGS_infile).empty()) {
679 if (isatty(fileno(stdin))) {
680 print_mode = true;
681 fprintf(stderr, "reading request messages from stdin...\n");
682 }
683 input_stream = &std::cin;
684 } else {
685 input_file.open(absl::GetFlag(FLAGS_infile),
686 std::ios::in | std::ios::binary);
687 input_stream = &input_file;
688 }
689
690 std::multimap<std::string, std::string> client_metadata;
691 ParseMetadataFlag(&client_metadata);
692 if (print_mode) {
693 PrintMetadata(client_metadata, "Sending client initial metadata:");
694 }
695
696 std::stringstream request_ss;
697 std::string line;
698 while (!request_text.empty() ||
699 (!input_stream->eof() && getline(*input_stream, line))) {
700 if (!request_text.empty()) {
701 if (absl::GetFlag(FLAGS_binary_input)) {
702 serialized_request_proto = request_text;
703 request_text.clear();
704 } else {
705 serialized_request_proto = parser->GetSerializedProtoFromMethod(
706 method_name, request_text, true /* is_request */,
707 absl::GetFlag(FLAGS_json_input));
708 request_text.clear();
709 if (parser->HasError()) {
710 if (print_mode) {
711 fprintf(stderr, "Failed to parse request.\n");
712 }
713 continue;
714 }
715 }
716
717 std::string serialized_response_proto;
718 std::multimap<grpc::string_ref, grpc::string_ref>
719 server_initial_metadata, server_trailing_metadata;
720 CliCall call(channel, formatted_method_name, client_metadata,
721 cli_args);
722 if (absl::GetFlag(FLAGS_display_peer_address)) {
723 fprintf(stderr,
724 "New call for method_name:%s has peer address:|%s|\n",
725 formatted_method_name.c_str(), call.peer().c_str());
726 }
727 call.Write(serialized_request_proto);
728 call.WritesDone();
729 if (!call.Read(&serialized_response_proto,
730 &server_initial_metadata)) {
731 fprintf(stderr, "Failed to read response.\n");
732 }
733 Status status = call.Finish(&server_trailing_metadata);
734
735 if (status.ok()) {
736 if (print_mode) {
737 fprintf(stderr, "Rpc succeeded with OK status.\n");
738 PrintMetadata(server_initial_metadata,
739 "Received initial metadata from server:");
740 PrintMetadata(server_trailing_metadata,
741 "Received trailing metadata from server:");
742 }
743
744 if (absl::GetFlag(FLAGS_binary_output)) {
745 if (!callback(serialized_response_proto)) {
746 break;
747 }
748 } else {
749 std::string response_text = parser->GetFormattedStringFromMethod(
750 method_name, serialized_response_proto,
751 false /* is_request */, absl::GetFlag(FLAGS_json_output));
752
753 if (parser->HasError() && print_mode) {
754 fprintf(stderr, "Failed to parse response.\n");
755 } else {
756 if (!callback(response_text)) {
757 break;
758 }
759 }
760 }
761 } else {
762 if (print_mode) {
763 fprintf(stderr,
764 "Rpc failed with status code %d, error message: %s\n",
765 status.error_code(), status.error_message().c_str());
766 }
767 }
768 } else {
769 if (line.length() == 0) {
770 request_text = request_ss.str();
771 request_ss.str(std::string());
772 request_ss.clear();
773 } else {
774 request_ss << line << ' ';
775 }
776 }
777 }
778
779 if (input_file.is_open()) {
780 input_file.close();
781 }
782
783 return true;
784 }
785
786 if (argc == 3) {
787 if (!absl::GetFlag(FLAGS_infile).empty()) {
788 fprintf(stderr, "warning: request given in argv, ignoring --infile\n");
789 }
790 } else {
791 std::stringstream input_stream;
792 if (absl::GetFlag(FLAGS_infile).empty()) {
793 if (isatty(fileno(stdin))) {
794 fprintf(stderr, "reading request message from stdin...\n");
795 }
796 input_stream << std::cin.rdbuf();
797 } else {
798 std::ifstream input_file(absl::GetFlag(FLAGS_infile),
799 std::ios::in | std::ios::binary);
800 input_stream << input_file.rdbuf();
801 input_file.close();
802 }
803 request_text = input_stream.str();
804 }
805
806 if (absl::GetFlag(FLAGS_binary_input)) {
807 serialized_request_proto = request_text;
808 } else {
809 serialized_request_proto = parser->GetSerializedProtoFromMethod(
810 method_name, request_text, true /* is_request */,
811 absl::GetFlag(FLAGS_json_input));
812 if (parser->HasError()) {
813 fprintf(stderr, "Failed to parse request.\n");
814 return false;
815 }
816 }
817 fprintf(stderr, "connecting to %s\n", server_address.c_str());
818
819 std::string serialized_response_proto;
820 std::multimap<std::string, std::string> client_metadata;
821 std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata,
822 server_trailing_metadata;
823 ParseMetadataFlag(&client_metadata);
824 PrintMetadata(client_metadata, "Sending client initial metadata:");
825
826 CliCall call(channel, formatted_method_name, client_metadata, cli_args);
827 if (absl::GetFlag(FLAGS_display_peer_address)) {
828 fprintf(stderr, "New call for method_name:%s has peer address:|%s|\n",
829 formatted_method_name.c_str(), call.peer().c_str());
830 }
831 call.Write(serialized_request_proto);
832 call.WritesDone();
833
834 for (bool receive_initial_metadata = true; call.Read(
835 &serialized_response_proto,
836 receive_initial_metadata ? &server_initial_metadata : nullptr);
837 receive_initial_metadata = false) {
838 if (!absl::GetFlag(FLAGS_binary_output)) {
839 serialized_response_proto = parser->GetFormattedStringFromMethod(
840 method_name, serialized_response_proto, false /* is_request */,
841 absl::GetFlag(FLAGS_json_output));
842 if (parser->HasError()) {
843 fprintf(stderr, "Failed to parse response.\n");
844 return false;
845 }
846 }
847
848 if (receive_initial_metadata) {
849 PrintMetadata(server_initial_metadata,
850 "Received initial metadata from server:");
851 }
852 if (!callback(serialized_response_proto)) {
853 return false;
854 }
855 }
856 Status status = call.Finish(&server_trailing_metadata);
857 PrintMetadata(server_trailing_metadata,
858 "Received trailing metadata from server:");
859 if (status.ok()) {
860 fprintf(stderr, "Rpc succeeded with OK status\n");
861 return true;
862 } else {
863 fprintf(stderr, "Rpc failed with status code %d, error message: %s\n",
864 status.error_code(), status.error_message().c_str());
865 return false;
866 }
867 }
868 GPR_UNREACHABLE_CODE(return false);
869 }
870
ParseMessage(int argc,const char ** argv,const CliCredentials & cred,const GrpcToolOutputCallback & callback)871 bool GrpcTool::ParseMessage(int argc, const char** argv,
872 const CliCredentials& cred,
873 const GrpcToolOutputCallback& callback) {
874 CommandUsage(
875 "Parse message\n"
876 " grpc_cli parse <address> <type> [<message>]\n"
877 " <address> ; host:port\n"
878 " <type> ; Protocol buffer type name\n"
879 " <message> ; Text protobuffer (overrides --infile)\n"
880 " --protofiles ; Comma separated proto files used as a"
881 " fallback when parsing request/response\n"
882 " --proto_path ; The search paths of proto files"
883 " (" GRPC_CLI_PATH_SEPARATOR
884 " separated), valid only when --protofiles is given\n"
885 " --noremotedb ; Don't attempt to use reflection service"
886 " at all\n"
887 " --infile ; Input filename (defaults to stdin)\n"
888 " --outfile ; Output filename (defaults to stdout)\n"
889 " --binary_input ; Input in binary format\n"
890 " --binary_output ; Output in binary format\n"
891 " --json_input ; Input in json format\n"
892 " --json_output ; Output in json format\n" +
893 cred.GetCredentialUsage());
894
895 std::stringstream output_ss;
896 std::string message_text;
897 std::string server_address(argv[0]);
898 std::string type_name(argv[1]);
899 std::unique_ptr<grpc::testing::ProtoFileParser> parser;
900 std::string serialized_request_proto;
901
902 if (argc == 3) {
903 message_text = argv[2];
904 if (!absl::GetFlag(FLAGS_infile).empty()) {
905 fprintf(stderr, "warning: message given in argv, ignoring --infile.\n");
906 }
907 } else {
908 std::stringstream input_stream;
909 if (absl::GetFlag(FLAGS_infile).empty()) {
910 if (isatty(fileno(stdin))) {
911 fprintf(stderr, "reading request message from stdin...\n");
912 }
913 input_stream << std::cin.rdbuf();
914 } else {
915 std::ifstream input_file(absl::GetFlag(FLAGS_infile),
916 std::ios::in | std::ios::binary);
917 input_stream << input_file.rdbuf();
918 input_file.close();
919 }
920 message_text = input_stream.str();
921 }
922
923 if (!absl::GetFlag(FLAGS_binary_input) ||
924 !absl::GetFlag(FLAGS_binary_output)) {
925 std::shared_ptr<grpc::Channel> channel =
926 CreateCliChannel(server_address, cred, grpc::ChannelArguments());
927 parser = std::make_unique<grpc::testing::ProtoFileParser>(
928 absl::GetFlag(FLAGS_remotedb) ? channel : nullptr,
929 absl::GetFlag(FLAGS_proto_path), absl::GetFlag(FLAGS_protofiles));
930 if (parser->HasError()) {
931 fprintf(
932 stderr,
933 "Failed to find remote reflection service and local proto files.\n");
934 return false;
935 }
936 }
937
938 if (absl::GetFlag(FLAGS_binary_input)) {
939 serialized_request_proto = message_text;
940 } else {
941 serialized_request_proto = parser->GetSerializedProtoFromMessageType(
942 type_name, message_text, absl::GetFlag(FLAGS_json_input));
943 if (parser->HasError()) {
944 fprintf(stderr, "Failed to serialize the message.\n");
945 return false;
946 }
947 }
948
949 if (absl::GetFlag(FLAGS_binary_output)) {
950 output_ss << serialized_request_proto;
951 } else {
952 std::string output_text;
953 output_text = parser->GetFormattedStringFromMessageType(
954 type_name, serialized_request_proto, absl::GetFlag(FLAGS_json_output));
955 if (parser->HasError()) {
956 fprintf(stderr, "Failed to deserialize the message.\n");
957 return false;
958 }
959
960 output_ss << output_text << std::endl;
961 }
962
963 return callback(output_ss.str());
964 }
965
ToText(int argc,const char ** argv,const CliCredentials & cred,const GrpcToolOutputCallback & callback)966 bool GrpcTool::ToText(int argc, const char** argv, const CliCredentials& cred,
967 const GrpcToolOutputCallback& callback) {
968 CommandUsage(
969 "Convert binary message to text\n"
970 " grpc_cli totext <protofiles> <type>\n"
971 " <protofiles> ; Comma separated list of proto files\n"
972 " <type> ; Protocol buffer type name\n"
973 " --proto_path ; The search paths of proto files"
974 " (" GRPC_CLI_PATH_SEPARATOR
975 " separated)\n"
976 " --infile ; Input filename (defaults to stdin)\n"
977 " --outfile ; Output filename (defaults to stdout)\n");
978
979 absl::SetFlag(&FLAGS_protofiles, argv[0]);
980 absl::SetFlag(&FLAGS_remotedb, false);
981 absl::SetFlag(&FLAGS_binary_input, true);
982 absl::SetFlag(&FLAGS_binary_output, false);
983 return ParseMessage(argc, argv, cred, callback);
984 }
985
ToJson(int argc,const char ** argv,const CliCredentials & cred,const GrpcToolOutputCallback & callback)986 bool GrpcTool::ToJson(int argc, const char** argv, const CliCredentials& cred,
987 const GrpcToolOutputCallback& callback) {
988 CommandUsage(
989 "Convert binary message to json\n"
990 " grpc_cli tojson <protofiles> <type>\n"
991 " <protofiles> ; Comma separated list of proto files\n"
992 " <type> ; Protocol buffer type name\n"
993 " --proto_path ; The search paths of proto files"
994 " (" GRPC_CLI_PATH_SEPARATOR
995 " separated)\n"
996 " --infile ; Input filename (defaults to stdin)\n"
997 " --outfile ; Output filename (defaults to stdout)\n");
998
999 absl::SetFlag(&FLAGS_protofiles, argv[0]);
1000 absl::SetFlag(&FLAGS_remotedb, false);
1001 absl::SetFlag(&FLAGS_binary_input, true);
1002 absl::SetFlag(&FLAGS_binary_output, false);
1003 absl::SetFlag(&FLAGS_json_output, true);
1004 return ParseMessage(argc, argv, cred, callback);
1005 }
1006
ToBinary(int argc,const char ** argv,const CliCredentials & cred,const GrpcToolOutputCallback & callback)1007 bool GrpcTool::ToBinary(int argc, const char** argv, const CliCredentials& cred,
1008 const GrpcToolOutputCallback& callback) {
1009 CommandUsage(
1010 "Convert text message to binary\n"
1011 " grpc_cli tobinary <protofiles> <type> [<message>]\n"
1012 " <protofiles> ; Comma separated list of proto files\n"
1013 " <type> ; Protocol buffer type name\n"
1014 " --proto_path ; The search paths of proto files"
1015 " (" GRPC_CLI_PATH_SEPARATOR
1016 " separated)\n"
1017 " --infile ; Input filename (defaults to stdin)\n"
1018 " --outfile ; Output filename (defaults to stdout)\n");
1019
1020 absl::SetFlag(&FLAGS_protofiles, argv[0]);
1021 absl::SetFlag(&FLAGS_remotedb, false);
1022 absl::SetFlag(&FLAGS_binary_input, false);
1023 absl::SetFlag(&FLAGS_binary_output, true);
1024 return ParseMessage(argc, argv, cred, callback);
1025 }
1026
1027 } // namespace testing
1028 } // namespace grpc
1029