xref: /aosp_15_r20/external/perfetto/src/trace_processor/perfetto_sql/intrinsics/functions/type_builders.cc (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/trace_processor/perfetto_sql/intrinsics/functions/type_builders.h"
18 
19 #include <algorithm>
20 #include <cstddef>
21 #include <cstdint>
22 #include <functional>
23 #include <limits>
24 #include <memory>
25 #include <optional>
26 #include <queue>
27 #include <string>
28 #include <utility>
29 #include <variant>
30 #include <vector>
31 
32 #include "perfetto/base/logging.h"
33 #include "perfetto/base/status.h"
34 #include "perfetto/ext/base/flat_hash_map.h"
35 #include "perfetto/ext/base/hash.h"
36 #include "perfetto/ext/base/small_vector.h"
37 #include "perfetto/public/compiler.h"
38 #include "perfetto/trace_processor/basic_types.h"
39 #include "src/trace_processor/containers/interval_intersector.h"
40 #include "src/trace_processor/perfetto_sql/engine/perfetto_sql_engine.h"
41 #include "src/trace_processor/perfetto_sql/intrinsics/types/array.h"
42 #include "src/trace_processor/perfetto_sql/intrinsics/types/counter.h"
43 #include "src/trace_processor/perfetto_sql/intrinsics/types/node.h"
44 #include "src/trace_processor/perfetto_sql/intrinsics/types/partitioned_intervals.h"
45 #include "src/trace_processor/perfetto_sql/intrinsics/types/row_dataframe.h"
46 #include "src/trace_processor/perfetto_sql/intrinsics/types/struct.h"
47 #include "src/trace_processor/sqlite/bindings/sqlite_aggregate_function.h"
48 #include "src/trace_processor/sqlite/bindings/sqlite_function.h"
49 #include "src/trace_processor/sqlite/bindings/sqlite_result.h"
50 #include "src/trace_processor/sqlite/bindings/sqlite_type.h"
51 #include "src/trace_processor/sqlite/bindings/sqlite_value.h"
52 #include "src/trace_processor/sqlite/sqlite_utils.h"
53 #include "src/trace_processor/util/status_macros.h"
54 
55 namespace perfetto::trace_processor {
56 namespace {
57 
HashSqlValue(base::Hasher & h,const SqlValue & v)58 inline void HashSqlValue(base::Hasher& h, const SqlValue& v) {
59   switch (v.type) {
60     case SqlValue::Type::kString:
61       h.Update(v.AsString());
62       break;
63     case SqlValue::Type::kDouble:
64       h.Update(v.AsDouble());
65       break;
66     case SqlValue::Type::kLong:
67       h.Update(v.AsLong());
68       break;
69     case SqlValue::Type::kBytes:
70       PERFETTO_FATAL("Wrong type");
71       break;
72     case SqlValue::Type::kNull:
73       h.Update(nullptr);
74       break;
75   }
76   return;
77 }
78 
79 using Array = std::variant<perfetto_sql::IntArray,
80                            perfetto_sql::DoubleArray,
81                            perfetto_sql::StringArray>;
82 
83 // An SQL aggregate-function which creates an array.
84 struct ArrayAgg : public SqliteAggregateFunction<ArrayAgg> {
85   static constexpr char kName[] = "__intrinsic_array_agg";
86   static constexpr int kArgCount = 1;
87   struct AggCtx : SqliteAggregateContext<AggCtx> {
88     template <typename T>
Pushperfetto::trace_processor::__anon22a301260111::ArrayAgg::AggCtx89     void Push(sqlite3_context* ctx, T value) {
90       if (PERFETTO_UNLIKELY(!array)) {
91         array = std::vector<T>{std::move(value)};
92         return;
93       }
94       auto* a = std::get_if<std::vector<T>>(&*array);
95       if (!a) {
96         return sqlite::result::Error(
97             ctx, "ARRAY_AGG: all values must have the same type");
98       }
99       a->emplace_back(std::move(value));
100     }
101     template <typename T>
Resultperfetto::trace_processor::__anon22a301260111::ArrayAgg::AggCtx102     void Result(sqlite3_context* ctx, const char* type) {
103       auto res = std::make_unique<std::vector<T>>(
104           std::get<std::vector<T>>(std::move(*array)));
105       return sqlite::result::UniquePointer(ctx, std::move(res), type);
106     }
107 
108     std::optional<Array> array;
109   };
110 
Stepperfetto::trace_processor::__anon22a301260111::ArrayAgg111   static void Step(sqlite3_context* ctx, int argc, sqlite3_value** argv) {
112     PERFETTO_DCHECK(argc == kArgCount);
113 
114     auto& agg_ctx = AggCtx::GetOrCreateContextForStep(ctx);
115     switch (sqlite::value::Type(argv[0])) {
116       case sqlite::Type::kInteger:
117         return agg_ctx.Push(ctx, sqlite::value::Int64(argv[0]));
118       case sqlite::Type::kText:
119         return agg_ctx.Push<std::string>(ctx, sqlite::value::Text(argv[0]));
120       case sqlite::Type::kFloat:
121         return agg_ctx.Push(ctx, sqlite::value::Double(argv[0]));
122       case sqlite::Type::kNull:
123         return sqlite::result::Error(
124             ctx,
125             "ARRAY_AGG: nulls are not supported. They should be filtered out "
126             "before calling ARRAY_AGG.");
127       case sqlite::Type::kBlob:
128         return sqlite::result::Error(ctx,
129                                      "ARRAY_AGG: blobs are not supported.");
130     }
131   }
Finalperfetto::trace_processor::__anon22a301260111::ArrayAgg132   static void Final(sqlite3_context* ctx) {
133     auto raw_agg_ctx = AggCtx::GetContextOrNullForFinal(ctx);
134     if (!raw_agg_ctx) {
135       return sqlite::result::Null(ctx);
136     }
137 
138     auto& array = *raw_agg_ctx.get()->array;
139     switch (array.index()) {
140       case 0 /* int64_t */:
141         return raw_agg_ctx.get()->Result<int64_t>(ctx, "ARRAY<LONG>");
142       case 1 /* double */:
143         return raw_agg_ctx.get()->Result<double>(ctx, "ARRAY<DOUBLE>");
144       case 2 /* std::string */:
145         return raw_agg_ctx.get()->Result<std::string>(ctx, "ARRAY<STRING>");
146       default:
147         PERFETTO_FATAL("%zu is not a valid index", array.index());
148     }
149   }
150 };
151 
152 // An SQL aggregate function which creates a graph.
153 struct NodeAgg : public SqliteAggregateFunction<NodeAgg> {
154   static constexpr char kName[] = "__intrinsic_graph_agg";
155   static constexpr int kArgCount = 2;
156   struct AggCtx : SqliteAggregateContext<AggCtx> {
157     perfetto_sql::Graph graph;
158   };
159 
Stepperfetto::trace_processor::__anon22a301260111::NodeAgg160   static void Step(sqlite3_context* ctx, int argc, sqlite3_value** argv) {
161     PERFETTO_DCHECK(argc == kArgCount);
162 
163     auto source_id = static_cast<uint32_t>(sqlite::value::Int64(argv[0]));
164     auto target_id = static_cast<uint32_t>(sqlite::value::Int64(argv[1]));
165     uint32_t max_id = std::max(source_id, target_id);
166     auto& agg_ctx = AggCtx::GetOrCreateContextForStep(ctx);
167     if (max_id >= agg_ctx.graph.size()) {
168       agg_ctx.graph.resize(max_id + 1);
169     }
170     agg_ctx.graph[source_id].outgoing_edges.push_back(target_id);
171   }
Finalperfetto::trace_processor::__anon22a301260111::NodeAgg172   static void Final(sqlite3_context* ctx) {
173     auto raw_agg_ctx = AggCtx::GetContextOrNullForFinal(ctx);
174     if (!raw_agg_ctx.get()) {
175       return;
176     }
177     auto nodes = std::make_unique<perfetto_sql::Graph>(
178         std::move(raw_agg_ctx.get()->graph));
179     return sqlite::result::UniquePointer(ctx, std::move(nodes), "GRAPH");
180   }
181 };
182 
183 // An SQL scalar function which creates an struct.
184 struct Struct : public SqliteFunction<Struct> {
185   static constexpr char kName[] = "__intrinsic_struct";
186   static constexpr int kArgCount = -1;
187 
Stepperfetto::trace_processor::__anon22a301260111::Struct188   static void Step(sqlite3_context* ctx, int rargc, sqlite3_value** argv) {
189     auto argc = static_cast<uint32_t>(rargc);
190     if (argc % 2 != 0) {
191       return sqlite::result::Error(
192           ctx, "STRUCT: must have an even number of arguments");
193     }
194     if (argc / 2 > perfetto_sql::Struct::kMaxFields) {
195       return sqlite::utils::SetError(
196           ctx, base::ErrStatus("STRUCT: only at most %d fields are supported",
197                                perfetto_sql::Struct::kMaxFields));
198     }
199 
200     auto s = std::make_unique<perfetto_sql::Struct>();
201     s->field_count = argc / 2;
202     for (uint32_t i = 0; i < s->field_count; ++i) {
203       if (sqlite::value::Type(argv[i]) != sqlite::Type::kText) {
204         return sqlite::result::Error(ctx,
205                                      "STRUCT: field names must be strings");
206       }
207       auto& field = s->fields[i];
208       field.first = sqlite::value::Text(argv[i]);
209       switch (sqlite::value::Type(argv[s->field_count + i])) {
210         case sqlite::Type::kText:
211           field.second = sqlite::value::Text(argv[s->field_count + i]);
212           break;
213         case sqlite::Type::kInteger:
214           field.second = sqlite::value::Int64(argv[s->field_count + i]);
215           break;
216         case sqlite::Type::kFloat:
217           field.second = sqlite::value::Double(argv[s->field_count + i]);
218           break;
219         case sqlite::Type::kNull:
220           field.second = std::monostate();
221           break;
222         case sqlite::Type::kBlob:
223           return sqlite::result::Error(ctx,
224                                        "STRUCT: blob fields not supported");
225       }
226     }
227     return sqlite::result::UniquePointer(ctx, std::move(s), "STRUCT");
228   }
229 };
230 
231 // An SQL aggregate function which creates a RowDataframe.
232 struct RowDataframeAgg : public SqliteAggregateFunction<Struct> {
233   static constexpr char kName[] = "__intrinsic_row_dataframe_agg";
234   static constexpr int kArgCount = -1;
235   struct AggCtx : SqliteAggregateContext<AggCtx> {
236     perfetto_sql::RowDataframe dataframe;
237     std::optional<uint32_t> argc_index;
238   };
239 
Stepperfetto::trace_processor::__anon22a301260111::RowDataframeAgg240   static void Step(sqlite3_context* ctx, int rargc, sqlite3_value** argv) {
241     auto argc = static_cast<uint32_t>(rargc);
242     if (argc % 2 != 0) {
243       return sqlite::result::Error(
244           ctx, "ROW_DATAFRAME_AGG: must have an even number of arguments");
245     }
246 
247     auto& agg_ctx = AggCtx::GetOrCreateContextForStep(ctx);
248     auto& df = agg_ctx.dataframe;
249     if (df.column_names.empty()) {
250       for (uint32_t i = 0; i < argc; i += 2) {
251         df.column_names.emplace_back(sqlite::value::Text(argv[i]));
252         if (df.column_names.back() == "id") {
253           df.id_column_index = i / 2;
254           agg_ctx.argc_index = i + 1;
255         }
256       }
257     }
258 
259     if (agg_ctx.argc_index) {
260       auto id = static_cast<uint32_t>(
261           sqlite::value::Int64(argv[*agg_ctx.argc_index]));
262       if (id >= df.id_to_cell_index.size()) {
263         df.id_to_cell_index.resize(id + 1,
264                                    std::numeric_limits<uint32_t>::max());
265       }
266       df.id_to_cell_index[id] = static_cast<uint32_t>(df.cells.size());
267     }
268 
269     for (uint32_t i = 1; i < argc; i += 2) {
270       switch (sqlite::value::Type(argv[i])) {
271         case sqlite::Type::kText:
272           df.cells.emplace_back(sqlite::value::Text(argv[i]));
273           break;
274         case sqlite::Type::kInteger:
275           df.cells.emplace_back(sqlite::value::Int64(argv[i]));
276           break;
277         case sqlite::Type::kFloat:
278           df.cells.emplace_back(sqlite::value::Double(argv[i]));
279           break;
280         case sqlite::Type::kNull:
281           df.cells.emplace_back(std::monostate());
282           break;
283         case sqlite::Type::kBlob:
284           return sqlite::result::Error(
285               ctx, "ROW_DATAFRAME_AGG: blob fields not supported");
286       }
287     }
288   }
289 
Finalperfetto::trace_processor::__anon22a301260111::RowDataframeAgg290   static void Final(sqlite3_context* ctx) {
291     auto raw_agg_ctx = AggCtx::GetContextOrNullForFinal(ctx);
292     if (!raw_agg_ctx) {
293       return sqlite::result::Null(ctx);
294     }
295     return sqlite::result::UniquePointer(
296         ctx,
297         std::make_unique<perfetto_sql::RowDataframe>(
298             std::move(raw_agg_ctx.get()->dataframe)),
299         "ROW_DATAFRAME");
300   }
301 };
302 
303 struct IntervalTreeIntervalsAgg
304     : public SqliteAggregateFunction<perfetto_sql::PartitionedTable> {
305   static constexpr char kName[] = "__intrinsic_interval_tree_intervals_agg";
306   static constexpr int kArgCount = -1;
307   static constexpr int kMinArgCount = 3;
308   struct AggCtx : SqliteAggregateContext<AggCtx> {
309     perfetto_sql::PartitionedTable partitions;
310     std::vector<SqlValue> tmp_vals;
311     uint64_t last_interval_start = 0;
312   };
313 
Stepperfetto::trace_processor::__anon22a301260111::IntervalTreeIntervalsAgg314   static void Step(sqlite3_context* ctx, int rargc, sqlite3_value** argv) {
315     auto argc = static_cast<uint32_t>(rargc);
316     PERFETTO_DCHECK(argc >= kMinArgCount);
317     auto& agg_ctx = AggCtx::GetOrCreateContextForStep(ctx);
318 
319     // Fetch and validate the interval.
320     Interval interval;
321     interval.id = static_cast<uint32_t>(sqlite::value::Int64(argv[0]));
322     interval.start = static_cast<uint64_t>(sqlite::value::Int64(argv[1]));
323     if (interval.start < agg_ctx.last_interval_start) {
324       if (sqlite::value::Int64(argv[1]) < 0) {
325         sqlite::result::Error(
326             ctx, "Interval intersect only accepts positive `ts` values.");
327         return;
328       }
329       sqlite::result::Error(
330           ctx, "Interval intersect requires intervals to be sorted by ts.");
331       return;
332     }
333     agg_ctx.last_interval_start = interval.start;
334     int64_t dur = sqlite::value::Int64(argv[2]);
335     if (dur < 1) {
336       sqlite::result::Error(
337           ctx, "Interval intersect only works on intervals with dur > 0");
338       return;
339     }
340     interval.end = interval.start + static_cast<uint64_t>(dur);
341 
342     // Fast path for no partitions.
343     auto& parts = agg_ctx.partitions;
344     if (argc == kMinArgCount) {
345       auto& part = parts.partitions_map[0];
346       part.intervals.push_back(interval);
347       if (part.is_nonoverlapping) {
348         if (interval.start < part.last_interval) {
349           part.is_nonoverlapping = false;
350         } else {
351           part.last_interval = interval.end;
352         }
353       }
354       return;
355     }
356 
357     // On the first |Step()| we need to fetch the names of the partitioned
358     // columns.
359     if (parts.partition_column_names.empty()) {
360       for (uint32_t i = 3; i < argc; i += 2) {
361         parts.partition_column_names.push_back(
362             sqlite::utils::SqliteValueToSqlValue(argv[i]).AsString());
363       }
364       agg_ctx.tmp_vals.resize(parts.partition_column_names.size());
365     }
366 
367     // Create a partition key and save SqlValues of the partition.
368     base::Hasher h;
369     uint32_t j = 0;
370     for (uint32_t i = kMinArgCount + 1; i < argc; i += 2) {
371       SqlValue new_val = sqlite::utils::SqliteValueToSqlValue(argv[i]);
372       agg_ctx.tmp_vals[j] = new_val;
373       HashSqlValue(h, new_val);
374       j++;
375     }
376 
377     uint64_t key = h.digest();
378     auto* part = parts.partitions_map.Find(key);
379 
380     // If we encountered this partition before we only have to push the interval
381     // into it.
382     if (part) {
383       part->intervals.push_back(interval);
384       if (part->is_nonoverlapping) {
385         if (interval.start < part->last_interval) {
386           part->is_nonoverlapping = false;
387         } else {
388           part->last_interval = interval.end;
389         }
390       }
391       return;
392     }
393 
394     std::vector<SqlValue> part_values;
395     for (uint32_t i = kMinArgCount + 1; i < argc; i += 2) {
396       part_values.push_back(sqlite::utils::SqliteValueToSqlValue(argv[i]));
397     }
398     perfetto_sql::Partition new_partition;
399     new_partition.sql_values = agg_ctx.tmp_vals;
400     new_partition.last_interval = interval.end;
401     new_partition.intervals = {interval};
402 
403     parts.partitions_map[key] = std::move(new_partition);
404   }
405 
Finalperfetto::trace_processor::__anon22a301260111::IntervalTreeIntervalsAgg406   static void Final(sqlite3_context* ctx) {
407     auto raw_agg_ctx = AggCtx::GetContextOrNullForFinal(ctx);
408     if (!raw_agg_ctx) {
409       return sqlite::result::Null(ctx);
410     }
411     return sqlite::result::UniquePointer(
412         ctx,
413         std::make_unique<perfetto_sql::PartitionedTable>(
414             std::move(raw_agg_ctx.get()->partitions)),
415         perfetto_sql::PartitionedTable::kName);
416   }
417 };
418 
419 struct CounterPerTrackAgg
420     : public SqliteAggregateFunction<perfetto_sql::PartitionedCounter> {
421   static constexpr char kName[] = "__intrinsic_counter_per_track_agg";
422   static constexpr int kArgCount = 4;
423   struct AggCtx : SqliteAggregateContext<AggCtx> {
424     perfetto_sql::PartitionedCounter tracks;
425   };
426 
Stepperfetto::trace_processor::__anon22a301260111::CounterPerTrackAgg427   static void Step(sqlite3_context* ctx, int rargc, sqlite3_value** argv) {
428     auto argc = static_cast<uint32_t>(rargc);
429     PERFETTO_DCHECK(argc == kArgCount);
430     auto& tracks = AggCtx::GetOrCreateContextForStep(ctx).tracks;
431 
432     // Fetch columns.
433     int64_t id = sqlite::value::Int64(argv[0]);
434     int64_t ts = sqlite::value::Int64(argv[1]);
435     int64_t track_id = static_cast<uint32_t>(sqlite::value::Int64(argv[2]));
436     double val = sqlite::value::Double(argv[3]);
437 
438     auto* new_rows_track = tracks.partitions_map.Find(track_id);
439     if (!new_rows_track) {
440       new_rows_track = tracks.partitions_map.Insert(track_id, {}).first;
441     } else if (std::equal_to<double>()(new_rows_track->val.back(), val)) {
442       // TODO(mayzner): This algorithm is focused on "leading" counters - if the
443       // counter before had the same value we can safely remove the new one as
444       // it adds no value. In the future we should also support "lagging" - if
445       // the next one has the same value as the previous, we should remove the
446       // previous.
447       return;
448     }
449 
450     new_rows_track->id.push_back(id);
451     new_rows_track->ts.push_back(ts);
452     new_rows_track->val.push_back(val);
453   }
454 
Finalperfetto::trace_processor::__anon22a301260111::CounterPerTrackAgg455   static void Final(sqlite3_context* ctx) {
456     auto raw_agg_ctx = AggCtx::GetContextOrNullForFinal(ctx);
457     if (!raw_agg_ctx) {
458       return sqlite::result::Null(ctx);
459     }
460     return sqlite::result::UniquePointer(
461         ctx,
462         std::make_unique<perfetto_sql::PartitionedCounter>(
463             std::move(raw_agg_ctx.get()->tracks)),
464         perfetto_sql::PartitionedCounter::kName);
465   }
466 };
467 
468 }  // namespace
469 
RegisterTypeBuilderFunctions(PerfettoSqlEngine & engine)470 base::Status RegisterTypeBuilderFunctions(PerfettoSqlEngine& engine) {
471   RETURN_IF_ERROR(engine.RegisterSqliteAggregateFunction<ArrayAgg>(nullptr));
472   RETURN_IF_ERROR(engine.RegisterSqliteFunction<Struct>(nullptr));
473   RETURN_IF_ERROR(
474       engine.RegisterSqliteAggregateFunction<RowDataframeAgg>(nullptr));
475   RETURN_IF_ERROR(
476       engine.RegisterSqliteAggregateFunction<IntervalTreeIntervalsAgg>(
477           nullptr));
478   RETURN_IF_ERROR(
479       engine.RegisterSqliteAggregateFunction<CounterPerTrackAgg>(nullptr));
480   return engine.RegisterSqliteAggregateFunction<NodeAgg>(nullptr);
481 }
482 
483 }  // namespace perfetto::trace_processor
484