1 /*
2 * Copyright (C) 2024 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

#include <grpcpp/server_context.h>
#include <grpcpp/support/status.h>

#include "perfetto/base/time.h"
#include "perfetto/ext/trace_processor/rpc/query_result_serializer.h"
#include "perfetto/trace_processor/trace_processor.h"
#include "src/bigtrace/worker/worker_impl.h"
27
28 namespace perfetto::bigtrace {
29
QueryTrace(grpc::ServerContext * server_context,const protos::BigtraceQueryTraceArgs * args,protos::BigtraceQueryTraceResponse * response)30 grpc::Status WorkerImpl::QueryTrace(
31 grpc::ServerContext* server_context,
32 const protos::BigtraceQueryTraceArgs* args,
33 protos::BigtraceQueryTraceResponse* response) {
34 std::mutex mutex;
35 bool is_thread_done = false;
36
37 std::string args_trace = args->trace();
38
39 if (args_trace.empty()) {
40 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
41 "Empty trace name is not valid");
42 }
43
44 if (args_trace[0] != '/') {
45 return grpc::Status(
46 grpc::StatusCode::INVALID_ARGUMENT,
47 "Trace path must contain and begin with / for the prefix");
48 }
49
50 std::string prefix = args_trace.substr(0, args_trace.find("/", 1));
51 if (registry_.find(prefix) == registry_.end()) {
52 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
53 "Path prefix does not exist in registry");
54 }
55
56 if (prefix.length() == args_trace.length()) {
57 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
58 "Empty path is invalid");
59 }
60
61 std::string path = args_trace.substr(prefix.length() + 1);
62
63 base::StatusOr<std::unique_ptr<trace_processor::TraceProcessor>> tp_or =
64 registry_[prefix]->LoadTraceProcessor(path);
65
66 if (!tp_or.ok()) {
67 const std::string& error_message = tp_or.status().message();
68 return grpc::Status(grpc::StatusCode::INTERNAL, error_message);
69 }
70
71 std::unique_ptr<trace_processor::TraceProcessor> tp = std::move(*tp_or);
72 std::optional<trace_processor::Iterator> iterator;
73
74 std::thread execute_query_thread([&]() {
75 iterator = tp->ExecuteQuery(args->sql_query());
76 std::lock_guard<std::mutex> lk(mutex);
77 is_thread_done = true;
78 });
79
80 for (;;) {
81 if (server_context->IsCancelled()) {
82 // If the thread is cancelled, we need to propagate the information to the
83 // trace processor thread and we do this by attempting to interrupt the
84 // trace processor every 10ms until the trace processor thread returns.
85 //
86 // A loop is necessary here because, due to scheduling delay, it is
87 // possible we are cancelled before trace processor even started running.
88 // InterruptQuery is ignored if it happens before entering TraceProcessor
89 // which can cause the query to not be interrupted at all.
90 while (!execute_query_thread.joinable()) {
91 base::SleepMicroseconds(10000);
92 tp->InterruptQuery();
93 }
94 execute_query_thread.join();
95 return grpc::Status::CANCELLED;
96 }
97
98 std::lock_guard<std::mutex> lk(mutex);
99 if (is_thread_done) {
100 execute_query_thread.join();
101 trace_processor::QueryResultSerializer serializer =
102 trace_processor::QueryResultSerializer(*std::move(iterator));
103
104 std::vector<uint8_t> serialized;
105 for (bool has_more = true; has_more;) {
106 serialized.clear();
107 has_more = serializer.Serialize(&serialized);
108 response->add_result()->ParseFromArray(
109 serialized.data(), static_cast<int>(serialized.size()));
110 }
111 response->set_trace(args->trace());
112 return grpc::Status::OK;
113 }
114 }
115 }
116
117 } // namespace perfetto::bigtrace
118