1 /*
2 * Copyright (C) 2024 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/trace_processor/perfetto_sql/intrinsics/functions/type_builders.h"
18
19 #include <algorithm>
20 #include <cstddef>
21 #include <cstdint>
22 #include <functional>
23 #include <limits>
24 #include <memory>
25 #include <optional>
26 #include <queue>
27 #include <string>
28 #include <utility>
29 #include <variant>
30 #include <vector>
31
32 #include "perfetto/base/logging.h"
33 #include "perfetto/base/status.h"
34 #include "perfetto/ext/base/flat_hash_map.h"
35 #include "perfetto/ext/base/hash.h"
36 #include "perfetto/ext/base/small_vector.h"
37 #include "perfetto/public/compiler.h"
38 #include "perfetto/trace_processor/basic_types.h"
39 #include "src/trace_processor/containers/interval_intersector.h"
40 #include "src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.h"
41 #include "src/trace_processor/perfetto_sql/intrinsics/types/array.h"
42 #include "src/trace_processor/perfetto_sql/intrinsics/types/counter.h"
43 #include "src/trace_processor/perfetto_sql/intrinsics/types/node.h"
44 #include "src/trace_processor/perfetto_sql/intrinsics/types/partitioned_intervals.h"
45 #include "src/trace_processor/perfetto_sql/intrinsics/types/row_dataframe.h"
46 #include "src/trace_processor/perfetto_sql/intrinsics/types/struct.h"
47 #include "src/trace_processor/sqlite/bindings/sqlite_aggregate_function.h"
48 #include "src/trace_processor/sqlite/bindings/sqlite_function.h"
49 #include "src/trace_processor/sqlite/bindings/sqlite_result.h"
50 #include "src/trace_processor/sqlite/bindings/sqlite_type.h"
51 #include "src/trace_processor/sqlite/bindings/sqlite_value.h"
52 #include "src/trace_processor/sqlite/sqlite_utils.h"
53 #include "src/trace_processor/util/status_macros.h"
54
55 namespace perfetto::trace_processor {
56 namespace {
57
HashSqlValue(base::Hasher & h,const SqlValue & v)58 inline void HashSqlValue(base::Hasher& h, const SqlValue& v) {
59 switch (v.type) {
60 case SqlValue::Type::kString:
61 h.Update(v.AsString());
62 break;
63 case SqlValue::Type::kDouble:
64 h.Update(v.AsDouble());
65 break;
66 case SqlValue::Type::kLong:
67 h.Update(v.AsLong());
68 break;
69 case SqlValue::Type::kBytes:
70 PERFETTO_FATAL("Wrong type");
71 break;
72 case SqlValue::Type::kNull:
73 h.Update(nullptr);
74 break;
75 }
76 return;
77 }
78
// Storage used by ArrayAgg while aggregating: exactly one of the supported
// typed arrays. The order of the alternatives matters because
// ArrayAgg::Final switches on the variant index (0 = int, 1 = double,
// 2 = string).
using Array = std::variant<perfetto_sql::IntArray,
                           perfetto_sql::DoubleArray,
                           perfetto_sql::StringArray>;
82
83 // An SQL aggregate-function which creates an array.
84 struct ArrayAgg : public SqliteAggregateFunction<ArrayAgg> {
85 static constexpr char kName[] = "__intrinsic_array_agg";
86 static constexpr int kArgCount = 1;
87 struct AggCtx : SqliteAggregateContext<AggCtx> {
88 template <typename T>
Pushperfetto::trace_processor::__anon22a301260111::ArrayAgg::AggCtx89 void Push(sqlite3_context* ctx, T value) {
90 if (PERFETTO_UNLIKELY(!array)) {
91 array = std::vector<T>{std::move(value)};
92 return;
93 }
94 auto* a = std::get_if<std::vector<T>>(&*array);
95 if (!a) {
96 return sqlite::result::Error(
97 ctx, "ARRAY_AGG: all values must have the same type");
98 }
99 a->emplace_back(std::move(value));
100 }
101 template <typename T>
Resultperfetto::trace_processor::__anon22a301260111::ArrayAgg::AggCtx102 void Result(sqlite3_context* ctx, const char* type) {
103 auto res = std::make_unique<std::vector<T>>(
104 std::get<std::vector<T>>(std::move(*array)));
105 return sqlite::result::UniquePointer(ctx, std::move(res), type);
106 }
107
108 std::optional<Array> array;
109 };
110
Stepperfetto::trace_processor::__anon22a301260111::ArrayAgg111 static void Step(sqlite3_context* ctx, int argc, sqlite3_value** argv) {
112 PERFETTO_DCHECK(argc == kArgCount);
113
114 auto& agg_ctx = AggCtx::GetOrCreateContextForStep(ctx);
115 switch (sqlite::value::Type(argv[0])) {
116 case sqlite::Type::kInteger:
117 return agg_ctx.Push(ctx, sqlite::value::Int64(argv[0]));
118 case sqlite::Type::kText:
119 return agg_ctx.Push<std::string>(ctx, sqlite::value::Text(argv[0]));
120 case sqlite::Type::kFloat:
121 return agg_ctx.Push(ctx, sqlite::value::Double(argv[0]));
122 case sqlite::Type::kNull:
123 return sqlite::result::Error(
124 ctx,
125 "ARRAY_AGG: nulls are not supported. They should be filtered out "
126 "before calling ARRAY_AGG.");
127 case sqlite::Type::kBlob:
128 return sqlite::result::Error(ctx,
129 "ARRAY_AGG: blobs are not supported.");
130 }
131 }
Finalperfetto::trace_processor::__anon22a301260111::ArrayAgg132 static void Final(sqlite3_context* ctx) {
133 auto raw_agg_ctx = AggCtx::GetContextOrNullForFinal(ctx);
134 if (!raw_agg_ctx) {
135 return sqlite::result::Null(ctx);
136 }
137
138 auto& array = *raw_agg_ctx.get()->array;
139 switch (array.index()) {
140 case 0 /* int64_t */:
141 return raw_agg_ctx.get()->Result<int64_t>(ctx, "ARRAY<LONG>");
142 case 1 /* double */:
143 return raw_agg_ctx.get()->Result<double>(ctx, "ARRAY<DOUBLE>");
144 case 2 /* std::string */:
145 return raw_agg_ctx.get()->Result<std::string>(ctx, "ARRAY<STRING>");
146 default:
147 PERFETTO_FATAL("%zu is not a valid index", array.index());
148 }
149 }
150 };
151
152 // An SQL aggregate function which creates a graph.
153 struct NodeAgg : public SqliteAggregateFunction<NodeAgg> {
154 static constexpr char kName[] = "__intrinsic_graph_agg";
155 static constexpr int kArgCount = 2;
156 struct AggCtx : SqliteAggregateContext<AggCtx> {
157 perfetto_sql::Graph graph;
158 };
159
Stepperfetto::trace_processor::__anon22a301260111::NodeAgg160 static void Step(sqlite3_context* ctx, int argc, sqlite3_value** argv) {
161 PERFETTO_DCHECK(argc == kArgCount);
162
163 auto source_id = static_cast<uint32_t>(sqlite::value::Int64(argv[0]));
164 auto target_id = static_cast<uint32_t>(sqlite::value::Int64(argv[1]));
165 uint32_t max_id = std::max(source_id, target_id);
166 auto& agg_ctx = AggCtx::GetOrCreateContextForStep(ctx);
167 if (max_id >= agg_ctx.graph.size()) {
168 agg_ctx.graph.resize(max_id + 1);
169 }
170 agg_ctx.graph[source_id].outgoing_edges.push_back(target_id);
171 }
Finalperfetto::trace_processor::__anon22a301260111::NodeAgg172 static void Final(sqlite3_context* ctx) {
173 auto raw_agg_ctx = AggCtx::GetContextOrNullForFinal(ctx);
174 if (!raw_agg_ctx.get()) {
175 return;
176 }
177 auto nodes = std::make_unique<perfetto_sql::Graph>(
178 std::move(raw_agg_ctx.get()->graph));
179 return sqlite::result::UniquePointer(ctx, std::move(nodes), "GRAPH");
180 }
181 };
182
183 // An SQL scalar function which creates an struct.
184 struct Struct : public SqliteFunction<Struct> {
185 static constexpr char kName[] = "__intrinsic_struct";
186 static constexpr int kArgCount = -1;
187
Stepperfetto::trace_processor::__anon22a301260111::Struct188 static void Step(sqlite3_context* ctx, int rargc, sqlite3_value** argv) {
189 auto argc = static_cast<uint32_t>(rargc);
190 if (argc % 2 != 0) {
191 return sqlite::result::Error(
192 ctx, "STRUCT: must have an even number of arguments");
193 }
194 if (argc / 2 > perfetto_sql::Struct::kMaxFields) {
195 return sqlite::utils::SetError(
196 ctx, base::ErrStatus("STRUCT: only at most %d fields are supported",
197 perfetto_sql::Struct::kMaxFields));
198 }
199
200 auto s = std::make_unique<perfetto_sql::Struct>();
201 s->field_count = argc / 2;
202 for (uint32_t i = 0; i < s->field_count; ++i) {
203 if (sqlite::value::Type(argv[i]) != sqlite::Type::kText) {
204 return sqlite::result::Error(ctx,
205 "STRUCT: field names must be strings");
206 }
207 auto& field = s->fields[i];
208 field.first = sqlite::value::Text(argv[i]);
209 switch (sqlite::value::Type(argv[s->field_count + i])) {
210 case sqlite::Type::kText:
211 field.second = sqlite::value::Text(argv[s->field_count + i]);
212 break;
213 case sqlite::Type::kInteger:
214 field.second = sqlite::value::Int64(argv[s->field_count + i]);
215 break;
216 case sqlite::Type::kFloat:
217 field.second = sqlite::value::Double(argv[s->field_count + i]);
218 break;
219 case sqlite::Type::kNull:
220 field.second = std::monostate();
221 break;
222 case sqlite::Type::kBlob:
223 return sqlite::result::Error(ctx,
224 "STRUCT: blob fields not supported");
225 }
226 }
227 return sqlite::result::UniquePointer(ctx, std::move(s), "STRUCT");
228 }
229 };
230
231 // An SQL aggregate function which creates a RowDataframe.
232 struct RowDataframeAgg : public SqliteAggregateFunction<Struct> {
233 static constexpr char kName[] = "__intrinsic_row_dataframe_agg";
234 static constexpr int kArgCount = -1;
235 struct AggCtx : SqliteAggregateContext<AggCtx> {
236 perfetto_sql::RowDataframe dataframe;
237 std::optional<uint32_t> argc_index;
238 };
239
Stepperfetto::trace_processor::__anon22a301260111::RowDataframeAgg240 static void Step(sqlite3_context* ctx, int rargc, sqlite3_value** argv) {
241 auto argc = static_cast<uint32_t>(rargc);
242 if (argc % 2 != 0) {
243 return sqlite::result::Error(
244 ctx, "ROW_DATAFRAME_AGG: must have an even number of arguments");
245 }
246
247 auto& agg_ctx = AggCtx::GetOrCreateContextForStep(ctx);
248 auto& df = agg_ctx.dataframe;
249 if (df.column_names.empty()) {
250 for (uint32_t i = 0; i < argc; i += 2) {
251 df.column_names.emplace_back(sqlite::value::Text(argv[i]));
252 if (df.column_names.back() == "id") {
253 df.id_column_index = i / 2;
254 agg_ctx.argc_index = i + 1;
255 }
256 }
257 }
258
259 if (agg_ctx.argc_index) {
260 auto id = static_cast<uint32_t>(
261 sqlite::value::Int64(argv[*agg_ctx.argc_index]));
262 if (id >= df.id_to_cell_index.size()) {
263 df.id_to_cell_index.resize(id + 1,
264 std::numeric_limits<uint32_t>::max());
265 }
266 df.id_to_cell_index[id] = static_cast<uint32_t>(df.cells.size());
267 }
268
269 for (uint32_t i = 1; i < argc; i += 2) {
270 switch (sqlite::value::Type(argv[i])) {
271 case sqlite::Type::kText:
272 df.cells.emplace_back(sqlite::value::Text(argv[i]));
273 break;
274 case sqlite::Type::kInteger:
275 df.cells.emplace_back(sqlite::value::Int64(argv[i]));
276 break;
277 case sqlite::Type::kFloat:
278 df.cells.emplace_back(sqlite::value::Double(argv[i]));
279 break;
280 case sqlite::Type::kNull:
281 df.cells.emplace_back(std::monostate());
282 break;
283 case sqlite::Type::kBlob:
284 return sqlite::result::Error(
285 ctx, "ROW_DATAFRAME_AGG: blob fields not supported");
286 }
287 }
288 }
289
Finalperfetto::trace_processor::__anon22a301260111::RowDataframeAgg290 static void Final(sqlite3_context* ctx) {
291 auto raw_agg_ctx = AggCtx::GetContextOrNullForFinal(ctx);
292 if (!raw_agg_ctx) {
293 return sqlite::result::Null(ctx);
294 }
295 return sqlite::result::UniquePointer(
296 ctx,
297 std::make_unique<perfetto_sql::RowDataframe>(
298 std::move(raw_agg_ctx.get()->dataframe)),
299 "ROW_DATAFRAME");
300 }
301 };
302
303 struct IntervalTreeIntervalsAgg
304 : public SqliteAggregateFunction<perfetto_sql::PartitionedTable> {
305 static constexpr char kName[] = "__intrinsic_interval_tree_intervals_agg";
306 static constexpr int kArgCount = -1;
307 static constexpr int kMinArgCount = 3;
308 struct AggCtx : SqliteAggregateContext<AggCtx> {
309 perfetto_sql::PartitionedTable partitions;
310 std::vector<SqlValue> tmp_vals;
311 uint64_t last_interval_start = 0;
312 };
313
Stepperfetto::trace_processor::__anon22a301260111::IntervalTreeIntervalsAgg314 static void Step(sqlite3_context* ctx, int rargc, sqlite3_value** argv) {
315 auto argc = static_cast<uint32_t>(rargc);
316 PERFETTO_DCHECK(argc >= kMinArgCount);
317 auto& agg_ctx = AggCtx::GetOrCreateContextForStep(ctx);
318
319 // Fetch and validate the interval.
320 Interval interval;
321 interval.id = static_cast<uint32_t>(sqlite::value::Int64(argv[0]));
322 interval.start = static_cast<uint64_t>(sqlite::value::Int64(argv[1]));
323 if (interval.start < agg_ctx.last_interval_start) {
324 if (sqlite::value::Int64(argv[1]) < 0) {
325 sqlite::result::Error(
326 ctx, "Interval intersect only accepts positive `ts` values.");
327 return;
328 }
329 sqlite::result::Error(
330 ctx, "Interval intersect requires intervals to be sorted by ts.");
331 return;
332 }
333 agg_ctx.last_interval_start = interval.start;
334 int64_t dur = sqlite::value::Int64(argv[2]);
335 if (dur < 1) {
336 sqlite::result::Error(
337 ctx, "Interval intersect only works on intervals with dur > 0");
338 return;
339 }
340 interval.end = interval.start + static_cast<uint64_t>(dur);
341
342 // Fast path for no partitions.
343 auto& parts = agg_ctx.partitions;
344 if (argc == kMinArgCount) {
345 auto& part = parts.partitions_map[0];
346 part.intervals.push_back(interval);
347 if (part.is_nonoverlapping) {
348 if (interval.start < part.last_interval) {
349 part.is_nonoverlapping = false;
350 } else {
351 part.last_interval = interval.end;
352 }
353 }
354 return;
355 }
356
357 // On the first |Step()| we need to fetch the names of the partitioned
358 // columns.
359 if (parts.partition_column_names.empty()) {
360 for (uint32_t i = 3; i < argc; i += 2) {
361 parts.partition_column_names.push_back(
362 sqlite::utils::SqliteValueToSqlValue(argv[i]).AsString());
363 }
364 agg_ctx.tmp_vals.resize(parts.partition_column_names.size());
365 }
366
367 // Create a partition key and save SqlValues of the partition.
368 base::Hasher h;
369 uint32_t j = 0;
370 for (uint32_t i = kMinArgCount + 1; i < argc; i += 2) {
371 SqlValue new_val = sqlite::utils::SqliteValueToSqlValue(argv[i]);
372 agg_ctx.tmp_vals[j] = new_val;
373 HashSqlValue(h, new_val);
374 j++;
375 }
376
377 uint64_t key = h.digest();
378 auto* part = parts.partitions_map.Find(key);
379
380 // If we encountered this partition before we only have to push the interval
381 // into it.
382 if (part) {
383 part->intervals.push_back(interval);
384 if (part->is_nonoverlapping) {
385 if (interval.start < part->last_interval) {
386 part->is_nonoverlapping = false;
387 } else {
388 part->last_interval = interval.end;
389 }
390 }
391 return;
392 }
393
394 std::vector<SqlValue> part_values;
395 for (uint32_t i = kMinArgCount + 1; i < argc; i += 2) {
396 part_values.push_back(sqlite::utils::SqliteValueToSqlValue(argv[i]));
397 }
398 perfetto_sql::Partition new_partition;
399 new_partition.sql_values = agg_ctx.tmp_vals;
400 new_partition.last_interval = interval.end;
401 new_partition.intervals = {interval};
402
403 parts.partitions_map[key] = std::move(new_partition);
404 }
405
Finalperfetto::trace_processor::__anon22a301260111::IntervalTreeIntervalsAgg406 static void Final(sqlite3_context* ctx) {
407 auto raw_agg_ctx = AggCtx::GetContextOrNullForFinal(ctx);
408 if (!raw_agg_ctx) {
409 return sqlite::result::Null(ctx);
410 }
411 return sqlite::result::UniquePointer(
412 ctx,
413 std::make_unique<perfetto_sql::PartitionedTable>(
414 std::move(raw_agg_ctx.get()->partitions)),
415 perfetto_sql::PartitionedTable::kName);
416 }
417 };
418
419 struct CounterPerTrackAgg
420 : public SqliteAggregateFunction<perfetto_sql::PartitionedCounter> {
421 static constexpr char kName[] = "__intrinsic_counter_per_track_agg";
422 static constexpr int kArgCount = 4;
423 struct AggCtx : SqliteAggregateContext<AggCtx> {
424 perfetto_sql::PartitionedCounter tracks;
425 };
426
Stepperfetto::trace_processor::__anon22a301260111::CounterPerTrackAgg427 static void Step(sqlite3_context* ctx, int rargc, sqlite3_value** argv) {
428 auto argc = static_cast<uint32_t>(rargc);
429 PERFETTO_DCHECK(argc == kArgCount);
430 auto& tracks = AggCtx::GetOrCreateContextForStep(ctx).tracks;
431
432 // Fetch columns.
433 int64_t id = sqlite::value::Int64(argv[0]);
434 int64_t ts = sqlite::value::Int64(argv[1]);
435 int64_t track_id = static_cast<uint32_t>(sqlite::value::Int64(argv[2]));
436 double val = sqlite::value::Double(argv[3]);
437
438 auto* new_rows_track = tracks.partitions_map.Find(track_id);
439 if (!new_rows_track) {
440 new_rows_track = tracks.partitions_map.Insert(track_id, {}).first;
441 } else if (std::equal_to<double>()(new_rows_track->val.back(), val)) {
442 // TODO(mayzner): This algorithm is focused on "leading" counters - if the
443 // counter before had the same value we can safely remove the new one as
444 // it adds no value. In the future we should also support "lagging" - if
445 // the next one has the same value as the previous, we should remove the
446 // previous.
447 return;
448 }
449
450 new_rows_track->id.push_back(id);
451 new_rows_track->ts.push_back(ts);
452 new_rows_track->val.push_back(val);
453 }
454
Finalperfetto::trace_processor::__anon22a301260111::CounterPerTrackAgg455 static void Final(sqlite3_context* ctx) {
456 auto raw_agg_ctx = AggCtx::GetContextOrNullForFinal(ctx);
457 if (!raw_agg_ctx) {
458 return sqlite::result::Null(ctx);
459 }
460 return sqlite::result::UniquePointer(
461 ctx,
462 std::make_unique<perfetto_sql::PartitionedCounter>(
463 std::move(raw_agg_ctx.get()->tracks)),
464 perfetto_sql::PartitionedCounter::kName);
465 }
466 };
467
468 } // namespace
469
RegisterTypeBuilderFunctions(PerfettoSqlEngine & engine)470 base::Status RegisterTypeBuilderFunctions(PerfettoSqlEngine& engine) {
471 RETURN_IF_ERROR(engine.RegisterSqliteAggregateFunction<ArrayAgg>(nullptr));
472 RETURN_IF_ERROR(engine.RegisterSqliteFunction<Struct>(nullptr));
473 RETURN_IF_ERROR(
474 engine.RegisterSqliteAggregateFunction<RowDataframeAgg>(nullptr));
475 RETURN_IF_ERROR(
476 engine.RegisterSqliteAggregateFunction<IntervalTreeIntervalsAgg>(
477 nullptr));
478 RETURN_IF_ERROR(
479 engine.RegisterSqliteAggregateFunction<CounterPerTrackAgg>(nullptr));
480 return engine.RegisterSqliteAggregateFunction<NodeAgg>(nullptr);
481 }
482
483 } // namespace perfetto::trace_processor
484