xref: /aosp_15_r20/external/perfetto/src/bigtrace/orchestrator/orchestrator_impl.cc (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <chrono>
18 #include <memory>
19 #include <mutex>
20 #include <thread>
21 
22 #include <grpcpp/client_context.h>
23 #include <grpcpp/support/status.h>
24 
25 #include "perfetto/base/logging.h"
26 #include "perfetto/base/time.h"
27 #include "perfetto/ext/base/utils.h"
28 #include "protos/perfetto/bigtrace/orchestrator.pb.h"
29 #include "src/bigtrace/orchestrator/orchestrator_impl.h"
30 #include "src/bigtrace/orchestrator/resizable_task_pool.h"
31 #include "src/bigtrace/orchestrator/trace_address_pool.h"
32 
33 namespace perfetto::bigtrace {
34 namespace {
// Time, in microseconds, that Query() sleeps between successive attempts to
// drain the shared response buffer to the client.
constexpr uint32_t kBufferPushDelayMicroseconds = 100;
36 
ExecuteQueryOnTrace(std::string sql_query,std::string trace,grpc::Status & query_status,std::mutex & worker_lock,std::vector<protos::BigtraceQueryResponse> & response_buffer,std::unique_ptr<protos::BigtraceWorker::Stub> & stub,ThreadWithContext * contextual_thread)37 grpc::Status ExecuteQueryOnTrace(
38     std::string sql_query,
39     std::string trace,
40     grpc::Status& query_status,
41     std::mutex& worker_lock,
42     std::vector<protos::BigtraceQueryResponse>& response_buffer,
43     std::unique_ptr<protos::BigtraceWorker::Stub>& stub,
44     ThreadWithContext* contextual_thread) {
45   protos::BigtraceQueryTraceArgs trace_args;
46   protos::BigtraceQueryTraceResponse trace_response;
47 
48   trace_args.set_sql_query(sql_query);
49   trace_args.set_trace(trace);
50   grpc::Status status = stub->QueryTrace(
51       contextual_thread->client_context.get(), trace_args, &trace_response);
52 
53   if (!status.ok()) {
54     {
55       std::lock_guard<std::mutex> status_guard(worker_lock);
56       // We check and only update the query status if it was not already errored
57       // to avoid unnecessary updates.
58       if (query_status.ok()) {
59         query_status = status;
60       }
61     }
62 
63     return status;
64   }
65 
66   protos::BigtraceQueryResponse response;
67   response.set_trace(trace_response.trace());
68   for (const protos::QueryResult& query_result : trace_response.result()) {
69     response.add_result()->CopyFrom(query_result);
70     if (query_result.has_error()) {
71       // TODO(b/366410502) Add a mode of operation where some traces are allowed
72       // to be dropped and a corresponding message is displayed to the user
73       // alongside partial results
74       std::lock_guard<std::mutex> status_guard(worker_lock);
75       query_status = grpc::Status(grpc::StatusCode::INTERNAL,
76                                   "[" + trace + "]: " + query_result.error());
77       break;
78     }
79   }
80   std::lock_guard<std::mutex> buffer_guard(worker_lock);
81   response_buffer.emplace_back(std::move(response));
82 
83   return grpc::Status::OK;
84 }
85 
ThreadRunLoop(ThreadWithContext * contextual_thread,TraceAddressPool & address_pool,std::string sql_query,grpc::Status & query_status,std::mutex & worker_lock,std::vector<protos::BigtraceQueryResponse> & response_buffer,std::unique_ptr<protos::BigtraceWorker::Stub> & stub)86 void ThreadRunLoop(ThreadWithContext* contextual_thread,
87                    TraceAddressPool& address_pool,
88                    std::string sql_query,
89                    grpc::Status& query_status,
90                    std::mutex& worker_lock,
91                    std::vector<protos::BigtraceQueryResponse>& response_buffer,
92                    std::unique_ptr<protos::BigtraceWorker::Stub>& stub) {
93   for (;;) {
94     auto maybe_trace_address = address_pool.Pop();
95     if (!maybe_trace_address) {
96       return;
97     }
98 
99     // The ordering of this context swap followed by the check on thread
100     // cancellation is essential and should not be changed to avoid a race where
101     // a request to cancel a thread is sent, followed by a context swap, causing
102     // the cancel to not be caught and the execution of the loop body to
103     // continue.
104     contextual_thread->client_context = std::make_unique<grpc::ClientContext>();
105 
106     if (contextual_thread->IsCancelled()) {
107       address_pool.MarkCancelled(std::move(*maybe_trace_address));
108       return;
109     }
110 
111     grpc::Status status = ExecuteQueryOnTrace(
112         sql_query, *maybe_trace_address, query_status, worker_lock,
113         response_buffer, stub, contextual_thread);
114 
115     if (!status.ok()) {
116       if (status.error_code() == grpc::StatusCode::CANCELLED) {
117         address_pool.MarkCancelled(std::move(*maybe_trace_address));
118       }
119       return;
120     }
121   }
122 }
123 
124 }  // namespace
125 
OrchestratorImpl(std::unique_ptr<protos::BigtraceWorker::Stub> stub,uint32_t max_query_concurrency)126 OrchestratorImpl::OrchestratorImpl(
127     std::unique_ptr<protos::BigtraceWorker::Stub> stub,
128     uint32_t max_query_concurrency)
129     : stub_(std::move(stub)), max_query_concurrency_(max_query_concurrency) {}
130 
Query(grpc::ServerContext *,const protos::BigtraceQueryArgs * args,grpc::ServerWriter<protos::BigtraceQueryResponse> * writer)131 grpc::Status OrchestratorImpl::Query(
132     grpc::ServerContext*,
133     const protos::BigtraceQueryArgs* args,
134     grpc::ServerWriter<protos::BigtraceQueryResponse>* writer) {
135   grpc::Status query_status;
136   std::mutex worker_lock;
137   const std::string& sql_query = args->sql_query();
138   std::vector<std::string> traces(args->traces().begin(), args->traces().end());
139 
140   std::vector<protos::BigtraceQueryResponse> response_buffer;
141   uint64_t trace_count = static_cast<uint64_t>(args->traces_size());
142 
143   TraceAddressPool address_pool(std::move(traces));
144 
145   // Update the query count on start and end ensuring that the query count is
146   // always decremented whenever the function is exited.
147   {
148     std::lock_guard<std::mutex> lk(query_count_mutex_);
149     query_count_++;
150   }
151   auto query_count_decrement = base::OnScopeExit([&]() {
152     std::lock_guard<std::mutex> lk(query_count_mutex_);
153     query_count_--;
154   });
155 
156   ResizableTaskPool task_pool([&](ThreadWithContext* new_contextual_thread) {
157     ThreadRunLoop(new_contextual_thread, address_pool, sql_query, query_status,
158                   worker_lock, response_buffer, stub_);
159   });
160 
161   uint64_t pushed_response_count = 0;
162   uint32_t last_query_count = 0;
163   uint32_t current_query_count = 0;
164 
165   for (;;) {
166     {
167       std::lock_guard<std::mutex> lk(query_count_mutex_);
168       current_query_count = query_count_;
169     }
170 
171     PERFETTO_CHECK(current_query_count != 0);
172 
173     // Update the number of threads to the lower of {the remaining number of
174     // traces} and the {maximum concurrency divided by the number of active
175     // queries}. This ensures that at most |max_query_concurrency_| calls to the
176     // backend are outstanding at any one point.
177     if (last_query_count != current_query_count) {
178       auto new_size =
179           std::min(std::max<uint32_t>(address_pool.RemainingCount(), 1u),
180                    max_query_concurrency_ / current_query_count);
181       task_pool.Resize(new_size);
182       last_query_count = current_query_count;
183     }
184 
185     // Exit the loop when either all responses have been successfully completed
186     // or if there is an error.
187     {
188       std::lock_guard<std::mutex> status_guard(worker_lock);
189       if (pushed_response_count == trace_count || !query_status.ok()) {
190         break;
191       }
192     }
193 
194     // A buffer is used to periodically make writes to the client instead of
195     // writing every individual response in order to reduce contention on the
196     // writer.
197     base::SleepMicroseconds(kBufferPushDelayMicroseconds);
198     if (response_buffer.empty()) {
199       continue;
200     }
201     std::vector<protos::BigtraceQueryResponse> buffer;
202     {
203       std::lock_guard<std::mutex> buffer_guard(worker_lock);
204       buffer = std::move(response_buffer);
205       response_buffer.clear();
206     }
207     for (protos::BigtraceQueryResponse& response : buffer) {
208       writer->Write(std::move(response));
209     }
210     pushed_response_count += buffer.size();
211   }
212 
213   task_pool.JoinAll();
214 
215   return query_status;
216 }
217 
218 }  // namespace perfetto::bigtrace
219