xref: /aosp_15_r20/external/perfetto/src/bigtrace/worker/worker_impl.cc (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <mutex>
18 #include <thread>
19 
20 #include <grpcpp/server_context.h>
21 #include <grpcpp/support/status.h>
22 
23 #include "perfetto/base/time.h"
24 #include "perfetto/ext/trace_processor/rpc/query_result_serializer.h"
25 #include "perfetto/trace_processor/trace_processor.h"
26 #include "src/bigtrace/worker/worker_impl.h"
27 
28 namespace perfetto::bigtrace {
29 
QueryTrace(grpc::ServerContext * server_context,const protos::BigtraceQueryTraceArgs * args,protos::BigtraceQueryTraceResponse * response)30 grpc::Status WorkerImpl::QueryTrace(
31     grpc::ServerContext* server_context,
32     const protos::BigtraceQueryTraceArgs* args,
33     protos::BigtraceQueryTraceResponse* response) {
34   std::mutex mutex;
35   bool is_thread_done = false;
36 
37   std::string args_trace = args->trace();
38 
39   if (args_trace.empty()) {
40     return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
41                         "Empty trace name is not valid");
42   }
43 
44   if (args_trace[0] != '/') {
45     return grpc::Status(
46         grpc::StatusCode::INVALID_ARGUMENT,
47         "Trace path must contain and begin with / for the prefix");
48   }
49 
50   std::string prefix = args_trace.substr(0, args_trace.find("/", 1));
51   if (registry_.find(prefix) == registry_.end()) {
52     return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
53                         "Path prefix does not exist in registry");
54   }
55 
56   if (prefix.length() == args_trace.length()) {
57     return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
58                         "Empty path is invalid");
59   }
60 
61   std::string path = args_trace.substr(prefix.length() + 1);
62 
63   base::StatusOr<std::unique_ptr<trace_processor::TraceProcessor>> tp_or =
64       registry_[prefix]->LoadTraceProcessor(path);
65 
66   if (!tp_or.ok()) {
67     const std::string& error_message = tp_or.status().message();
68     return grpc::Status(grpc::StatusCode::INTERNAL, error_message);
69   }
70 
71   std::unique_ptr<trace_processor::TraceProcessor> tp = std::move(*tp_or);
72   std::optional<trace_processor::Iterator> iterator;
73 
74   std::thread execute_query_thread([&]() {
75     iterator = tp->ExecuteQuery(args->sql_query());
76     std::lock_guard<std::mutex> lk(mutex);
77     is_thread_done = true;
78   });
79 
80   for (;;) {
81     if (server_context->IsCancelled()) {
82       // If the thread is cancelled, we need to propagate the information to the
83       // trace processor thread and we do this by attempting to interrupt the
84       // trace processor every 10ms until the trace processor thread returns.
85       //
86       // A loop is necessary here because, due to scheduling delay, it is
87       // possible we are cancelled before trace processor even started running.
88       // InterruptQuery is ignored if it happens before entering TraceProcessor
89       // which can cause the query to not be interrupted at all.
90       while (!execute_query_thread.joinable()) {
91         base::SleepMicroseconds(10000);
92         tp->InterruptQuery();
93       }
94       execute_query_thread.join();
95       return grpc::Status::CANCELLED;
96     }
97 
98     std::lock_guard<std::mutex> lk(mutex);
99     if (is_thread_done) {
100       execute_query_thread.join();
101       trace_processor::QueryResultSerializer serializer =
102           trace_processor::QueryResultSerializer(*std::move(iterator));
103 
104       std::vector<uint8_t> serialized;
105       for (bool has_more = true; has_more;) {
106         serialized.clear();
107         has_more = serializer.Serialize(&serialized);
108         response->add_result()->ParseFromArray(
109             serialized.data(), static_cast<int>(serialized.size()));
110       }
111       response->set_trace(args->trace());
112       return grpc::Status::OK;
113     }
114   }
115 }
116 
117 }  // namespace perfetto::bigtrace
118