1 /*
2 * Copyright (C) 2024 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <chrono>
18 #include <memory>
19 #include <mutex>
20 #include <thread>
21
22 #include <grpcpp/client_context.h>
23 #include <grpcpp/support/status.h>
24
25 #include "perfetto/base/logging.h"
26 #include "perfetto/base/time.h"
27 #include "perfetto/ext/base/utils.h"
28 #include "protos/perfetto/bigtrace/orchestrator.pb.h"
29 #include "src/bigtrace/orchestrator/orchestrator_impl.h"
30 #include "src/bigtrace/orchestrator/resizable_task_pool.h"
31 #include "src/bigtrace/orchestrator/trace_address_pool.h"
32
33 namespace perfetto::bigtrace {
34 namespace {
// Time, in microseconds, that Query() sleeps between successive attempts to
// flush buffered worker responses to the client.
constexpr uint32_t kBufferPushDelayMicroseconds = 100;
36
ExecuteQueryOnTrace(std::string sql_query,std::string trace,grpc::Status & query_status,std::mutex & worker_lock,std::vector<protos::BigtraceQueryResponse> & response_buffer,std::unique_ptr<protos::BigtraceWorker::Stub> & stub,ThreadWithContext * contextual_thread)37 grpc::Status ExecuteQueryOnTrace(
38 std::string sql_query,
39 std::string trace,
40 grpc::Status& query_status,
41 std::mutex& worker_lock,
42 std::vector<protos::BigtraceQueryResponse>& response_buffer,
43 std::unique_ptr<protos::BigtraceWorker::Stub>& stub,
44 ThreadWithContext* contextual_thread) {
45 protos::BigtraceQueryTraceArgs trace_args;
46 protos::BigtraceQueryTraceResponse trace_response;
47
48 trace_args.set_sql_query(sql_query);
49 trace_args.set_trace(trace);
50 grpc::Status status = stub->QueryTrace(
51 contextual_thread->client_context.get(), trace_args, &trace_response);
52
53 if (!status.ok()) {
54 {
55 std::lock_guard<std::mutex> status_guard(worker_lock);
56 // We check and only update the query status if it was not already errored
57 // to avoid unnecessary updates.
58 if (query_status.ok()) {
59 query_status = status;
60 }
61 }
62
63 return status;
64 }
65
66 protos::BigtraceQueryResponse response;
67 response.set_trace(trace_response.trace());
68 for (const protos::QueryResult& query_result : trace_response.result()) {
69 response.add_result()->CopyFrom(query_result);
70 if (query_result.has_error()) {
71 // TODO(b/366410502) Add a mode of operation where some traces are allowed
72 // to be dropped and a corresponding message is displayed to the user
73 // alongside partial results
74 std::lock_guard<std::mutex> status_guard(worker_lock);
75 query_status = grpc::Status(grpc::StatusCode::INTERNAL,
76 "[" + trace + "]: " + query_result.error());
77 break;
78 }
79 }
80 std::lock_guard<std::mutex> buffer_guard(worker_lock);
81 response_buffer.emplace_back(std::move(response));
82
83 return grpc::Status::OK;
84 }
85
ThreadRunLoop(ThreadWithContext * contextual_thread,TraceAddressPool & address_pool,std::string sql_query,grpc::Status & query_status,std::mutex & worker_lock,std::vector<protos::BigtraceQueryResponse> & response_buffer,std::unique_ptr<protos::BigtraceWorker::Stub> & stub)86 void ThreadRunLoop(ThreadWithContext* contextual_thread,
87 TraceAddressPool& address_pool,
88 std::string sql_query,
89 grpc::Status& query_status,
90 std::mutex& worker_lock,
91 std::vector<protos::BigtraceQueryResponse>& response_buffer,
92 std::unique_ptr<protos::BigtraceWorker::Stub>& stub) {
93 for (;;) {
94 auto maybe_trace_address = address_pool.Pop();
95 if (!maybe_trace_address) {
96 return;
97 }
98
99 // The ordering of this context swap followed by the check on thread
100 // cancellation is essential and should not be changed to avoid a race where
101 // a request to cancel a thread is sent, followed by a context swap, causing
102 // the cancel to not be caught and the execution of the loop body to
103 // continue.
104 contextual_thread->client_context = std::make_unique<grpc::ClientContext>();
105
106 if (contextual_thread->IsCancelled()) {
107 address_pool.MarkCancelled(std::move(*maybe_trace_address));
108 return;
109 }
110
111 grpc::Status status = ExecuteQueryOnTrace(
112 sql_query, *maybe_trace_address, query_status, worker_lock,
113 response_buffer, stub, contextual_thread);
114
115 if (!status.ok()) {
116 if (status.error_code() == grpc::StatusCode::CANCELLED) {
117 address_pool.MarkCancelled(std::move(*maybe_trace_address));
118 }
119 return;
120 }
121 }
122 }
123
124 } // namespace
125
OrchestratorImpl(std::unique_ptr<protos::BigtraceWorker::Stub> stub,uint32_t max_query_concurrency)126 OrchestratorImpl::OrchestratorImpl(
127 std::unique_ptr<protos::BigtraceWorker::Stub> stub,
128 uint32_t max_query_concurrency)
129 : stub_(std::move(stub)), max_query_concurrency_(max_query_concurrency) {}
130
Query(grpc::ServerContext *,const protos::BigtraceQueryArgs * args,grpc::ServerWriter<protos::BigtraceQueryResponse> * writer)131 grpc::Status OrchestratorImpl::Query(
132 grpc::ServerContext*,
133 const protos::BigtraceQueryArgs* args,
134 grpc::ServerWriter<protos::BigtraceQueryResponse>* writer) {
135 grpc::Status query_status;
136 std::mutex worker_lock;
137 const std::string& sql_query = args->sql_query();
138 std::vector<std::string> traces(args->traces().begin(), args->traces().end());
139
140 std::vector<protos::BigtraceQueryResponse> response_buffer;
141 uint64_t trace_count = static_cast<uint64_t>(args->traces_size());
142
143 TraceAddressPool address_pool(std::move(traces));
144
145 // Update the query count on start and end ensuring that the query count is
146 // always decremented whenever the function is exited.
147 {
148 std::lock_guard<std::mutex> lk(query_count_mutex_);
149 query_count_++;
150 }
151 auto query_count_decrement = base::OnScopeExit([&]() {
152 std::lock_guard<std::mutex> lk(query_count_mutex_);
153 query_count_--;
154 });
155
156 ResizableTaskPool task_pool([&](ThreadWithContext* new_contextual_thread) {
157 ThreadRunLoop(new_contextual_thread, address_pool, sql_query, query_status,
158 worker_lock, response_buffer, stub_);
159 });
160
161 uint64_t pushed_response_count = 0;
162 uint32_t last_query_count = 0;
163 uint32_t current_query_count = 0;
164
165 for (;;) {
166 {
167 std::lock_guard<std::mutex> lk(query_count_mutex_);
168 current_query_count = query_count_;
169 }
170
171 PERFETTO_CHECK(current_query_count != 0);
172
173 // Update the number of threads to the lower of {the remaining number of
174 // traces} and the {maximum concurrency divided by the number of active
175 // queries}. This ensures that at most |max_query_concurrency_| calls to the
176 // backend are outstanding at any one point.
177 if (last_query_count != current_query_count) {
178 auto new_size =
179 std::min(std::max<uint32_t>(address_pool.RemainingCount(), 1u),
180 max_query_concurrency_ / current_query_count);
181 task_pool.Resize(new_size);
182 last_query_count = current_query_count;
183 }
184
185 // Exit the loop when either all responses have been successfully completed
186 // or if there is an error.
187 {
188 std::lock_guard<std::mutex> status_guard(worker_lock);
189 if (pushed_response_count == trace_count || !query_status.ok()) {
190 break;
191 }
192 }
193
194 // A buffer is used to periodically make writes to the client instead of
195 // writing every individual response in order to reduce contention on the
196 // writer.
197 base::SleepMicroseconds(kBufferPushDelayMicroseconds);
198 if (response_buffer.empty()) {
199 continue;
200 }
201 std::vector<protos::BigtraceQueryResponse> buffer;
202 {
203 std::lock_guard<std::mutex> buffer_guard(worker_lock);
204 buffer = std::move(response_buffer);
205 response_buffer.clear();
206 }
207 for (protos::BigtraceQueryResponse& response : buffer) {
208 writer->Write(std::move(response));
209 }
210 pushed_response_count += buffer.size();
211 }
212
213 task_pool.JoinAll();
214
215 return query_status;
216 }
217
218 } // namespace perfetto::bigtrace
219