xref: /aosp_15_r20/external/perfetto/src/trace_processor/importers/proto/proto_trace_reader.cc (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/trace_processor/importers/proto/proto_trace_reader.h"
18 
19 #include <algorithm>
20 #include <cinttypes>
21 #include <cstddef>
22 #include <cstdint>
23 #include <map>
24 #include <numeric>
25 #include <optional>
26 #include <string>
27 #include <tuple>
28 #include <utility>
29 #include <vector>
30 
31 #include "perfetto/base/logging.h"
32 #include "perfetto/base/status.h"
33 #include "perfetto/ext/base/flat_hash_map.h"
34 #include "perfetto/ext/base/status_or.h"
35 #include "perfetto/ext/base/string_view.h"
36 #include "perfetto/protozero/field.h"
37 #include "perfetto/protozero/proto_decoder.h"
38 #include "perfetto/public/compiler.h"
39 #include "src/trace_processor/importers/common/clock_tracker.h"
40 #include "src/trace_processor/importers/common/event_tracker.h"
41 #include "src/trace_processor/importers/common/metadata_tracker.h"
42 #include "src/trace_processor/importers/proto/packet_analyzer.h"
43 #include "src/trace_processor/importers/proto/proto_importer_module.h"
44 #include "src/trace_processor/sorter/trace_sorter.h"
45 #include "src/trace_processor/storage/metadata.h"
46 #include "src/trace_processor/storage/stats.h"
47 #include "src/trace_processor/storage/trace_storage.h"
48 #include "src/trace_processor/tables/metadata_tables_py.h"
49 #include "src/trace_processor/types/variadic.h"
50 #include "src/trace_processor/util/descriptors.h"
51 
52 #include "protos/perfetto/common/builtin_clock.pbzero.h"
53 #include "protos/perfetto/common/trace_stats.pbzero.h"
54 #include "protos/perfetto/config/trace_config.pbzero.h"
55 #include "protos/perfetto/trace/clock_snapshot.pbzero.h"
56 #include "protos/perfetto/trace/extension_descriptor.pbzero.h"
57 #include "protos/perfetto/trace/perfetto/tracing_service_event.pbzero.h"
58 #include "protos/perfetto/trace/remote_clock_sync.pbzero.h"
59 #include "protos/perfetto/trace/trace.pbzero.h"
60 #include "protos/perfetto/trace/trace_packet.pbzero.h"
61 
62 namespace perfetto::trace_processor {
63 
ProtoTraceReader(TraceProcessorContext * ctx)64 ProtoTraceReader::ProtoTraceReader(TraceProcessorContext* ctx)
65     : context_(ctx),
66       skipped_packet_key_id_(ctx->storage->InternString("skipped_packet")),
67       invalid_incremental_state_key_id_(
68           ctx->storage->InternString("invalid_incremental_state")) {}
69 ProtoTraceReader::~ProtoTraceReader() = default;
70 
Parse(TraceBlobView blob)71 base::Status ProtoTraceReader::Parse(TraceBlobView blob) {
72   return tokenizer_.Tokenize(std::move(blob), [this](TraceBlobView packet) {
73     return ParsePacket(std::move(packet));
74   });
75 }
76 
ParseExtensionDescriptor(ConstBytes descriptor)77 base::Status ProtoTraceReader::ParseExtensionDescriptor(ConstBytes descriptor) {
78   protos::pbzero::ExtensionDescriptor::Decoder decoder(descriptor.data,
79                                                        descriptor.size);
80 
81   auto extension = decoder.extension_set();
82   return context_->descriptor_pool_->AddFromFileDescriptorSet(
83       extension.data, extension.size,
84       /*skip_prefixes*/ {},
85       /*merge_existing_messages=*/true);
86 }
87 
ParsePacket(TraceBlobView packet)88 base::Status ProtoTraceReader::ParsePacket(TraceBlobView packet) {
89   protos::pbzero::TracePacket::Decoder decoder(packet.data(), packet.length());
90   if (PERFETTO_UNLIKELY(decoder.bytes_left())) {
91     return base::ErrStatus(
92         "Failed to parse proto packet fully; the trace is probably corrupt.");
93   }
94 
95   // Any compressed packets should have been handled by the tokenizer.
96   PERFETTO_CHECK(!decoder.has_compressed_packets());
97 
98   // When the trace packet is emitted from a remote machine: parse the packet
99   // using a different ProtoTraceReader instance. The packet will be parsed
100   // in the context of the remote machine.
101   if (PERFETTO_UNLIKELY(decoder.has_machine_id())) {
102     if (!context_->machine_id()) {
103       // Default context: switch to another reader instance to parse the packet.
104       PERFETTO_DCHECK(context_->multi_machine_trace_manager);
105       auto* reader = context_->multi_machine_trace_manager->GetOrCreateReader(
106           decoder.machine_id());
107       return reader->ParsePacket(std::move(packet));
108     }
109   }
110   // Assert that the packet is parsed using the right instance of reader.
111   PERFETTO_DCHECK(decoder.has_machine_id() == !!context_->machine_id());
112 
113   uint32_t seq_id = decoder.trusted_packet_sequence_id();
114   auto [scoped_state, inserted] = sequence_state_.Insert(seq_id, {});
115   if (decoder.has_trusted_packet_sequence_id()) {
116     if (!inserted && decoder.previous_packet_dropped()) {
117       ++scoped_state->previous_packet_dropped_count;
118     }
119   }
120 
121   if (decoder.first_packet_on_sequence()) {
122     HandleFirstPacketOnSequence(seq_id);
123   }
124 
125   uint32_t sequence_flags = decoder.sequence_flags();
126   if (decoder.incremental_state_cleared() ||
127       sequence_flags &
128           protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED) {
129     HandleIncrementalStateCleared(decoder);
130   } else if (decoder.previous_packet_dropped()) {
131     HandlePreviousPacketDropped(decoder);
132   }
133 
134   // It is important that we parse defaults before parsing other fields such as
135   // the timestamp, since the defaults could affect them.
136   if (decoder.has_trace_packet_defaults()) {
137     auto field = decoder.trace_packet_defaults();
138     ParseTracePacketDefaults(decoder, packet.slice(field.data, field.size));
139   }
140 
141   if (decoder.has_interned_data()) {
142     auto field = decoder.interned_data();
143     ParseInternedData(decoder, packet.slice(field.data, field.size));
144   }
145 
146   if (decoder.has_clock_snapshot()) {
147     return ParseClockSnapshot(decoder.clock_snapshot(), seq_id);
148   }
149 
150   if (decoder.has_trace_stats()) {
151     ParseTraceStats(decoder.trace_stats());
152   }
153 
154   if (decoder.has_remote_clock_sync()) {
155     PERFETTO_DCHECK(context_->machine_id());
156     return ParseRemoteClockSync(decoder.remote_clock_sync());
157   }
158 
159   if (decoder.has_service_event()) {
160     PERFETTO_DCHECK(decoder.has_timestamp());
161     int64_t ts = static_cast<int64_t>(decoder.timestamp());
162     return ParseServiceEvent(ts, decoder.service_event());
163   }
164 
165   if (decoder.has_extension_descriptor()) {
166     return ParseExtensionDescriptor(decoder.extension_descriptor());
167   }
168 
169   auto* state = GetIncrementalStateForPacketSequence(seq_id);
170   if (decoder.sequence_flags() &
171       protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE) {
172     if (!seq_id) {
173       return base::ErrStatus(
174           "TracePacket specified SEQ_NEEDS_INCREMENTAL_STATE but the "
175           "TraceWriter's sequence_id is zero (the service is "
176           "probably too old)");
177     }
178     scoped_state->needs_incremental_state_total++;
179 
180     if (!state->IsIncrementalStateValid()) {
181       if (context_->content_analyzer) {
182         // Account for the skipped packet for trace proto content analysis,
183         // with a special annotation.
184         PacketAnalyzer::SampleAnnotation annotation;
185         annotation.emplace_back(skipped_packet_key_id_,
186                                 invalid_incremental_state_key_id_);
187         PacketAnalyzer::Get(context_)->ProcessPacket(packet, annotation);
188       }
189       scoped_state->needs_incremental_state_skipped++;
190       context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
191       return base::OkStatus();
192     }
193   }
194 
195   protos::pbzero::TracePacketDefaults::Decoder* defaults =
196       state->current_generation()->GetTracePacketDefaults();
197 
198   int64_t timestamp;
199   if (decoder.has_timestamp()) {
200     timestamp = static_cast<int64_t>(decoder.timestamp());
201 
202     uint32_t timestamp_clock_id =
203         decoder.has_timestamp_clock_id()
204             ? decoder.timestamp_clock_id()
205             : (defaults ? defaults->timestamp_clock_id() : 0);
206 
207     if ((decoder.has_chrome_events() || decoder.has_chrome_metadata()) &&
208         (!timestamp_clock_id ||
209          timestamp_clock_id == protos::pbzero::BUILTIN_CLOCK_MONOTONIC)) {
210       // Chrome event timestamps are in MONOTONIC domain, but may occur in
211       // traces where (a) no clock snapshots exist or (b) no clock_id is
212       // specified for their timestamps. Adjust to trace time if we have a clock
213       // snapshot.
214       // TODO(eseckler): Set timestamp_clock_id and emit ClockSnapshots in
215       // chrome and then remove this.
216       auto trace_ts = context_->clock_tracker->ToTraceTime(
217           protos::pbzero::BUILTIN_CLOCK_MONOTONIC, timestamp);
218       if (trace_ts.ok())
219         timestamp = trace_ts.value();
220     } else if (timestamp_clock_id) {
221       // If the TracePacket specifies a non-zero clock-id, translate the
222       // timestamp into the trace-time clock domain.
223       ClockTracker::ClockId converted_clock_id = timestamp_clock_id;
224       if (ClockTracker::IsSequenceClock(converted_clock_id)) {
225         if (!seq_id) {
226           return base::ErrStatus(
227               "TracePacket specified a sequence-local clock id (%" PRIu32
228               ") but the TraceWriter's sequence_id is zero (the service is "
229               "probably too old)",
230               timestamp_clock_id);
231         }
232         converted_clock_id =
233             ClockTracker::SequenceToGlobalClock(seq_id, timestamp_clock_id);
234       }
235       auto trace_ts =
236           context_->clock_tracker->ToTraceTime(converted_clock_id, timestamp);
237       if (!trace_ts.ok()) {
238         // ToTraceTime() will increase the |clock_sync_failure| stat on failure.
239         // We don't return an error here as it will cause the trace to stop
240         // parsing. Instead, we rely on the stat increment in ToTraceTime() to
241         // inform the user about the error.
242         return base::OkStatus();
243       }
244       timestamp = trace_ts.value();
245     }
246   } else {
247     timestamp = std::max(latest_timestamp_, context_->sorter->max_timestamp());
248   }
249   latest_timestamp_ = std::max(timestamp, latest_timestamp_);
250 
251   if (context_->content_analyzer && !decoder.has_track_event()) {
252     PacketAnalyzer::Get(context_)->ProcessPacket(packet, {});
253   }
254 
255   auto& modules = context_->modules_by_field;
256   for (uint32_t field_id = 1; field_id < modules.size(); ++field_id) {
257     if (!modules[field_id].empty() && decoder.Get(field_id).valid()) {
258       for (ProtoImporterModule* global_module :
259            context_->modules_for_all_fields) {
260         ModuleResult res = global_module->TokenizePacket(
261             decoder, &packet, timestamp, state->current_generation(), field_id);
262         if (!res.ignored())
263           return res.ToStatus();
264       }
265       for (ProtoImporterModule* module : modules[field_id]) {
266         ModuleResult res = module->TokenizePacket(
267             decoder, &packet, timestamp, state->current_generation(), field_id);
268         if (!res.ignored())
269           return res.ToStatus();
270       }
271     }
272   }
273 
274   if (decoder.has_trace_config()) {
275     ParseTraceConfig(decoder.trace_config());
276   }
277 
278   // Use parent data and length because we want to parse this again
279   // later to get the exact type of the packet.
280   context_->sorter->PushTracePacket(timestamp, state->current_generation(),
281                                     std::move(packet), context_->machine_id());
282 
283   return base::OkStatus();
284 }
285 
ParseTraceConfig(protozero::ConstBytes blob)286 void ProtoTraceReader::ParseTraceConfig(protozero::ConstBytes blob) {
287   using Config = protos::pbzero::TraceConfig;
288   Config::Decoder trace_config(blob);
289   if (trace_config.write_into_file()) {
290     if (!trace_config.flush_period_ms()) {
291       context_->storage->IncrementStats(stats::config_write_into_file_no_flush);
292     }
293     int i = 0;
294     for (auto it = trace_config.buffers(); it; ++it, ++i) {
295       Config::BufferConfig::Decoder buf(*it);
296       if (buf.fill_policy() == Config::BufferConfig::FillPolicy::DISCARD) {
297         context_->storage->IncrementIndexedStats(
298             stats::config_write_into_file_discard, i);
299       }
300     }
301   }
302 }
303 
HandleIncrementalStateCleared(const protos::pbzero::TracePacket::Decoder & packet_decoder)304 void ProtoTraceReader::HandleIncrementalStateCleared(
305     const protos::pbzero::TracePacket::Decoder& packet_decoder) {
306   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
307     PERFETTO_ELOG(
308         "incremental_state_cleared without trusted_packet_sequence_id");
309     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
310     return;
311   }
312   GetIncrementalStateForPacketSequence(
313       packet_decoder.trusted_packet_sequence_id())
314       ->OnIncrementalStateCleared();
315   for (auto& module : context_->modules) {
316     module->OnIncrementalStateCleared(
317         packet_decoder.trusted_packet_sequence_id());
318   }
319 }
320 
HandleFirstPacketOnSequence(uint32_t packet_sequence_id)321 void ProtoTraceReader::HandleFirstPacketOnSequence(
322     uint32_t packet_sequence_id) {
323   for (auto& module : context_->modules) {
324     module->OnFirstPacketOnSequence(packet_sequence_id);
325   }
326 }
327 
HandlePreviousPacketDropped(const protos::pbzero::TracePacket::Decoder & packet_decoder)328 void ProtoTraceReader::HandlePreviousPacketDropped(
329     const protos::pbzero::TracePacket::Decoder& packet_decoder) {
330   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
331     PERFETTO_ELOG("previous_packet_dropped without trusted_packet_sequence_id");
332     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
333     return;
334   }
335   GetIncrementalStateForPacketSequence(
336       packet_decoder.trusted_packet_sequence_id())
337       ->OnPacketLoss();
338 }
339 
ParseTracePacketDefaults(const protos::pbzero::TracePacket_Decoder & packet_decoder,TraceBlobView trace_packet_defaults)340 void ProtoTraceReader::ParseTracePacketDefaults(
341     const protos::pbzero::TracePacket_Decoder& packet_decoder,
342     TraceBlobView trace_packet_defaults) {
343   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
344     PERFETTO_ELOG(
345         "TracePacketDefaults packet without trusted_packet_sequence_id");
346     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
347     return;
348   }
349 
350   auto* state = GetIncrementalStateForPacketSequence(
351       packet_decoder.trusted_packet_sequence_id());
352   state->UpdateTracePacketDefaults(std::move(trace_packet_defaults));
353 }
354 
ParseInternedData(const protos::pbzero::TracePacket::Decoder & packet_decoder,TraceBlobView interned_data)355 void ProtoTraceReader::ParseInternedData(
356     const protos::pbzero::TracePacket::Decoder& packet_decoder,
357     TraceBlobView interned_data) {
358   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
359     PERFETTO_ELOG("InternedData packet without trusted_packet_sequence_id");
360     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
361     return;
362   }
363 
364   auto* state = GetIncrementalStateForPacketSequence(
365       packet_decoder.trusted_packet_sequence_id());
366 
367   // Don't parse interned data entries until incremental state is valid, because
368   // they could otherwise be associated with the wrong generation in the state.
369   if (!state->IsIncrementalStateValid()) {
370     context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
371     return;
372   }
373 
374   // Store references to interned data submessages into the sequence's state.
375   protozero::ProtoDecoder decoder(interned_data.data(), interned_data.length());
376   for (protozero::Field f = decoder.ReadField(); f.valid();
377        f = decoder.ReadField()) {
378     auto bytes = f.as_bytes();
379     state->InternMessage(f.id(), interned_data.slice(bytes.data, bytes.size));
380   }
381 }
382 
ParseClockSnapshot(ConstBytes blob,uint32_t seq_id)383 base::Status ProtoTraceReader::ParseClockSnapshot(ConstBytes blob,
384                                                   uint32_t seq_id) {
385   std::vector<ClockTracker::ClockTimestamp> clock_timestamps;
386   protos::pbzero::ClockSnapshot::Decoder evt(blob.data, blob.size);
387   if (evt.primary_trace_clock()) {
388     context_->clock_tracker->SetTraceTimeClock(
389         static_cast<ClockTracker::ClockId>(evt.primary_trace_clock()));
390   }
391   for (auto it = evt.clocks(); it; ++it) {
392     protos::pbzero::ClockSnapshot::Clock::Decoder clk(*it);
393     ClockTracker::ClockId clock_id = clk.clock_id();
394     if (ClockTracker::IsSequenceClock(clk.clock_id())) {
395       if (!seq_id) {
396         return base::ErrStatus(
397             "ClockSnapshot packet is specifying a sequence-scoped clock id "
398             "(%" PRIu64 ") but the TracePacket sequence_id is zero",
399             clock_id);
400       }
401       clock_id = ClockTracker::SequenceToGlobalClock(seq_id, clk.clock_id());
402     }
403     int64_t unit_multiplier_ns =
404         clk.unit_multiplier_ns()
405             ? static_cast<int64_t>(clk.unit_multiplier_ns())
406             : 1;
407     clock_timestamps.emplace_back(clock_id, clk.timestamp(), unit_multiplier_ns,
408                                   clk.is_incremental());
409   }
410 
411   base::StatusOr<uint32_t> snapshot_id =
412       context_->clock_tracker->AddSnapshot(clock_timestamps);
413   if (!snapshot_id.ok()) {
414     PERFETTO_ELOG("%s", snapshot_id.status().c_message());
415     return base::OkStatus();
416   }
417 
418   std::optional<int64_t> trace_time_from_snapshot =
419       context_->clock_tracker->ToTraceTimeFromSnapshot(clock_timestamps);
420 
421   // Add the all the clock snapshots to the clock snapshot table.
422   std::optional<int64_t> trace_ts_for_check;
423   for (const auto& clock_timestamp : clock_timestamps) {
424     // If the clock is incremental, we need to use 0 to map correctly to
425     // |absolute_timestamp|.
426     int64_t ts_to_convert =
427         clock_timestamp.clock.is_incremental ? 0 : clock_timestamp.timestamp;
428     // Even if we have trace time from snapshot, we still run ToTraceTime to
429     // optimise future conversions.
430     base::StatusOr<int64_t> opt_trace_ts = context_->clock_tracker->ToTraceTime(
431         clock_timestamp.clock.id, ts_to_convert);
432 
433     if (!opt_trace_ts.ok()) {
434       // This can happen if |AddSnapshot| failed to resolve this clock, e.g. if
435       // clock is not monotonic. Try to fetch trace time from snapshot.
436       if (!trace_time_from_snapshot) {
437         PERFETTO_DLOG("%s", opt_trace_ts.status().c_message());
438         continue;
439       }
440       opt_trace_ts = *trace_time_from_snapshot;
441     }
442 
443     // Double check that all the clocks in this snapshot resolve to the same
444     // trace timestamp value.
445     PERFETTO_DCHECK(!trace_ts_for_check ||
446                     opt_trace_ts.value() == trace_ts_for_check.value());
447     trace_ts_for_check = *opt_trace_ts;
448 
449     tables::ClockSnapshotTable::Row row;
450     row.ts = *opt_trace_ts;
451     row.clock_id = static_cast<int64_t>(clock_timestamp.clock.id);
452     row.clock_value =
453         clock_timestamp.timestamp * clock_timestamp.clock.unit_multiplier_ns;
454     row.clock_name = GetBuiltinClockNameOrNull(clock_timestamp.clock.id);
455     row.snapshot_id = *snapshot_id;
456     row.machine_id = context_->machine_id();
457 
458     context_->storage->mutable_clock_snapshot_table()->Insert(row);
459   }
460   return base::OkStatus();
461 }
462 
ParseRemoteClockSync(ConstBytes blob)463 base::Status ProtoTraceReader::ParseRemoteClockSync(ConstBytes blob) {
464   protos::pbzero::RemoteClockSync::Decoder evt(blob.data, blob.size);
465 
466   std::vector<SyncClockSnapshots> sync_clock_snapshots;
467   // Decode the RemoteClockSync message into a struct for calculating offsets.
468   for (auto it = evt.synced_clocks(); it; ++it) {
469     sync_clock_snapshots.emplace_back();
470     auto& sync_clocks = sync_clock_snapshots.back();
471 
472     protos::pbzero::RemoteClockSync::SyncedClocks::Decoder synced_clocks(*it);
473     protos::pbzero::ClockSnapshot::ClockSnapshot::Decoder host_clocks(
474         synced_clocks.host_clocks());
475     for (auto clock_it = host_clocks.clocks(); clock_it; clock_it++) {
476       protos::pbzero::ClockSnapshot::ClockSnapshot::Clock::Decoder clock(
477           *clock_it);
478       sync_clocks[clock.clock_id()].first = clock.timestamp();
479     }
480 
481     std::vector<ClockTracker::ClockTimestamp> clock_timestamps;
482     protos::pbzero::ClockSnapshot::ClockSnapshot::Decoder client_clocks(
483         synced_clocks.client_clocks());
484     for (auto clock_it = client_clocks.clocks(); clock_it; clock_it++) {
485       protos::pbzero::ClockSnapshot::ClockSnapshot::Clock::Decoder clock(
486           *clock_it);
487       sync_clocks[clock.clock_id()].second = clock.timestamp();
488       clock_timestamps.emplace_back(clock.clock_id(), clock.timestamp(), 1,
489                                     false);
490     }
491 
492     // In addition for calculating clock offsets, client clock snapshots are
493     // also added to clock tracker to emulate tracing service taking periodical
494     // clock snapshots. This builds a clock conversion path from a local trace
495     // time (e.g. Chrome trace time) to client builtin clock (CLOCK_MONOTONIC)
496     // which can be converted to host trace time (CLOCK_BOOTTIME).
497     context_->clock_tracker->AddSnapshot(clock_timestamps);
498   }
499 
500   // Calculate clock offsets and report to the ClockTracker.
501   auto clock_offsets = CalculateClockOffsets(sync_clock_snapshots);
502   for (auto it = clock_offsets.GetIterator(); it; ++it) {
503     context_->clock_tracker->SetClockOffset(it.key(), it.value());
504   }
505 
506   return base::OkStatus();
507 }
508 
509 base::FlatHashMap<int64_t /*Clock Id*/, int64_t /*Offset*/>
CalculateClockOffsets(std::vector<SyncClockSnapshots> & sync_clock_snapshots)510 ProtoTraceReader::CalculateClockOffsets(
511     std::vector<SyncClockSnapshots>& sync_clock_snapshots) {
512   base::FlatHashMap<int64_t /*Clock Id*/, int64_t /*Offset*/> clock_offsets;
513 
514   // The RemoteClockSync message contains a sequence of |synced_clocks|
515   // messages. Each |synced_clocks| message contains pairs of ClockSnapshots
516   // taken on both the client and host sides.
517   //
518   // The "synced_clocks" messages are emitted periodically. A single round of
519   // data collection involves four snapshots:
520   //   1. Client snapshot
521   //   2. Host snapshot (triggered by client's IPC message)
522   //   3. Client snapshot (triggered by host's IPC message)
523   //   4. Host snapshot
524   //
525   // These four snapshots are used to estimate the clock offset between the
526   // client and host for each default clock domain present in the ClockSnapshot.
527   std::map<int64_t, std::vector<int64_t>> raw_clock_offsets;
528   // Remote clock syncs happen in an interval of 30 sec. 2 adjacent clock
529   // snapshots belong to the same round if they happen within 30 secs.
530   constexpr uint64_t clock_sync_interval_ns = 30lu * 1000000000;
531   for (size_t i = 1; i < sync_clock_snapshots.size(); i++) {
532     // Synced clocks are taken by client snapshot -> host snapshot.
533     auto& ping_clocks = sync_clock_snapshots[i - 1];
534     auto& update_clocks = sync_clock_snapshots[i];
535 
536     auto ping_client =
537         ping_clocks[protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME]
538             .second;
539     auto update_client =
540         update_clocks[protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME]
541             .second;
542     // |ping_clocks| and |update_clocks| belong to 2 different rounds of remote
543     // clock sync rounds.
544     if (update_client - ping_client >= clock_sync_interval_ns)
545       continue;
546 
547     for (auto it = ping_clocks.GetIterator(); it; ++it) {
548       const auto clock_id = it.key();
549       const auto [t1h, t1c] = it.value();
550       const auto [t2h, t2c] = update_clocks[clock_id];
551 
552       if (!t1h || !t1c || !t2h || !t2c)
553         continue;
554 
555       int64_t offset1 =
556           static_cast<int64_t>(t1c + t2c) / 2 - static_cast<int64_t>(t1h);
557       int64_t offset2 =
558           static_cast<int64_t>(t2c) - static_cast<int64_t>(t1h + t2h) / 2;
559 
560       // Clock values are taken in the order of t1c, t1h, t2c, t2h. Offset
561       // calculation requires at least 3 timestamps as a round trip. We have 4,
562       // which can be treated as 2 round trips:
563       //   1. t1c, t1h, t2c as the round trip initiated by the client. Offset 1
564       //      = (t1c + t2c) / 2 - t1h
565       //   2. t1h, t2c, t2h as the round trip initiated by the host. Offset 2 =
566       //      t2c - (t1h + t2h) / 2
567       raw_clock_offsets[clock_id].push_back(offset1);
568       raw_clock_offsets[clock_id].push_back(offset2);
569     }
570 
571     // Use the average of estimated clock offsets in the clock tracker.
572     for (const auto& [clock_id, offsets] : raw_clock_offsets) {
573       int64_t avg_offset =
574           std::accumulate(offsets.begin(), offsets.end(), 0LL) /
575           static_cast<int64_t>(offsets.size());
576       clock_offsets[clock_id] = avg_offset;
577     }
578   }
579 
580   return clock_offsets;
581 }
582 
GetBuiltinClockNameOrNull(int64_t clock_id)583 std::optional<StringId> ProtoTraceReader::GetBuiltinClockNameOrNull(
584     int64_t clock_id) {
585   switch (clock_id) {
586     case protos::pbzero::ClockSnapshot::Clock::REALTIME:
587       return context_->storage->InternString("REALTIME");
588     case protos::pbzero::ClockSnapshot::Clock::REALTIME_COARSE:
589       return context_->storage->InternString("REALTIME_COARSE");
590     case protos::pbzero::ClockSnapshot::Clock::MONOTONIC:
591       return context_->storage->InternString("MONOTONIC");
592     case protos::pbzero::ClockSnapshot::Clock::MONOTONIC_COARSE:
593       return context_->storage->InternString("MONOTONIC_COARSE");
594     case protos::pbzero::ClockSnapshot::Clock::MONOTONIC_RAW:
595       return context_->storage->InternString("MONOTONIC_RAW");
596     case protos::pbzero::ClockSnapshot::Clock::BOOTTIME:
597       return context_->storage->InternString("BOOTTIME");
598     default:
599       return std::nullopt;
600   }
601 }
602 
ParseServiceEvent(int64_t ts,ConstBytes blob)603 base::Status ProtoTraceReader::ParseServiceEvent(int64_t ts, ConstBytes blob) {
604   protos::pbzero::TracingServiceEvent::Decoder tse(blob);
605   if (tse.tracing_started()) {
606     context_->metadata_tracker->SetMetadata(metadata::tracing_started_ns,
607                                             Variadic::Integer(ts));
608   }
609   if (tse.tracing_disabled()) {
610     context_->metadata_tracker->SetMetadata(metadata::tracing_disabled_ns,
611                                             Variadic::Integer(ts));
612   }
613   if (tse.all_data_sources_started()) {
614     context_->metadata_tracker->SetMetadata(
615         metadata::all_data_source_started_ns, Variadic::Integer(ts));
616   }
617   if (tse.all_data_sources_flushed()) {
618     context_->metadata_tracker->AppendMetadata(
619         metadata::all_data_source_flushed_ns, Variadic::Integer(ts));
620     context_->sorter->NotifyFlushEvent();
621   }
622   if (tse.read_tracing_buffers_completed()) {
623     context_->sorter->NotifyReadBufferEvent();
624   }
625   if (tse.has_slow_starting_data_sources()) {
626     protos::pbzero::TracingServiceEvent::DataSources::Decoder msg(
627         tse.slow_starting_data_sources());
628     for (auto it = msg.data_source(); it; it++) {
629       protos::pbzero::TracingServiceEvent::DataSources::DataSource::Decoder
630           data_source(*it);
631       std::string formatted = data_source.producer_name().ToStdString() + " " +
632                               data_source.data_source_name().ToStdString();
633       context_->metadata_tracker->AppendMetadata(
634           metadata::slow_start_data_source,
635           Variadic::String(
636               context_->storage->InternString(base::StringView(formatted))));
637     }
638   }
639   return base::OkStatus();
640 }
641 
ParseTraceStats(ConstBytes blob)642 void ProtoTraceReader::ParseTraceStats(ConstBytes blob) {
643   protos::pbzero::TraceStats::Decoder evt(blob.data, blob.size);
644   auto* storage = context_->storage.get();
645   storage->SetStats(stats::traced_producers_connected,
646                     static_cast<int64_t>(evt.producers_connected()));
647   storage->SetStats(stats::traced_producers_seen,
648                     static_cast<int64_t>(evt.producers_seen()));
649   storage->SetStats(stats::traced_data_sources_registered,
650                     static_cast<int64_t>(evt.data_sources_registered()));
651   storage->SetStats(stats::traced_data_sources_seen,
652                     static_cast<int64_t>(evt.data_sources_seen()));
653   storage->SetStats(stats::traced_tracing_sessions,
654                     static_cast<int64_t>(evt.tracing_sessions()));
655   storage->SetStats(stats::traced_total_buffers,
656                     static_cast<int64_t>(evt.total_buffers()));
657   storage->SetStats(stats::traced_chunks_discarded,
658                     static_cast<int64_t>(evt.chunks_discarded()));
659   storage->SetStats(stats::traced_patches_discarded,
660                     static_cast<int64_t>(evt.patches_discarded()));
661   storage->SetStats(stats::traced_flushes_requested,
662                     static_cast<int64_t>(evt.flushes_requested()));
663   storage->SetStats(stats::traced_flushes_succeeded,
664                     static_cast<int64_t>(evt.flushes_succeeded()));
665   storage->SetStats(stats::traced_flushes_failed,
666                     static_cast<int64_t>(evt.flushes_failed()));
667 
668   if (evt.has_filter_stats()) {
669     protos::pbzero::TraceStats::FilterStats::Decoder fstat(evt.filter_stats());
670     storage->SetStats(stats::filter_errors,
671                       static_cast<int64_t>(fstat.errors()));
672     storage->SetStats(stats::filter_input_bytes,
673                       static_cast<int64_t>(fstat.input_bytes()));
674     storage->SetStats(stats::filter_input_packets,
675                       static_cast<int64_t>(fstat.input_packets()));
676     storage->SetStats(stats::filter_output_bytes,
677                       static_cast<int64_t>(fstat.output_bytes()));
678     storage->SetStats(stats::filter_time_taken_ns,
679                       static_cast<int64_t>(fstat.time_taken_ns()));
680     for (auto [i, it] = std::tuple{0, fstat.bytes_discarded_per_buffer()}; it;
681          ++it, ++i) {
682       storage->SetIndexedStats(stats::traced_buf_bytes_filtered_out, i,
683                                static_cast<int64_t>(*it));
684     }
685   }
686 
687   switch (evt.final_flush_outcome()) {
688     case protos::pbzero::TraceStats::FINAL_FLUSH_SUCCEEDED:
689       storage->IncrementStats(stats::traced_final_flush_succeeded, 1);
690       break;
691     case protos::pbzero::TraceStats::FINAL_FLUSH_FAILED:
692       storage->IncrementStats(stats::traced_final_flush_failed, 1);
693       break;
694     case protos::pbzero::TraceStats::FINAL_FLUSH_UNSPECIFIED:
695       break;
696   }
697 
698   int buf_num = 0;
699   for (auto it = evt.buffer_stats(); it; ++it, ++buf_num) {
700     protos::pbzero::TraceStats::BufferStats::Decoder buf(*it);
701     storage->SetIndexedStats(stats::traced_buf_buffer_size, buf_num,
702                              static_cast<int64_t>(buf.buffer_size()));
703     storage->SetIndexedStats(stats::traced_buf_bytes_written, buf_num,
704                              static_cast<int64_t>(buf.bytes_written()));
705     storage->SetIndexedStats(stats::traced_buf_bytes_overwritten, buf_num,
706                              static_cast<int64_t>(buf.bytes_overwritten()));
707     storage->SetIndexedStats(stats::traced_buf_bytes_read, buf_num,
708                              static_cast<int64_t>(buf.bytes_read()));
709     storage->SetIndexedStats(stats::traced_buf_padding_bytes_written, buf_num,
710                              static_cast<int64_t>(buf.padding_bytes_written()));
711     storage->SetIndexedStats(stats::traced_buf_padding_bytes_cleared, buf_num,
712                              static_cast<int64_t>(buf.padding_bytes_cleared()));
713     storage->SetIndexedStats(stats::traced_buf_chunks_written, buf_num,
714                              static_cast<int64_t>(buf.chunks_written()));
715     storage->SetIndexedStats(stats::traced_buf_chunks_rewritten, buf_num,
716                              static_cast<int64_t>(buf.chunks_rewritten()));
717     storage->SetIndexedStats(stats::traced_buf_chunks_overwritten, buf_num,
718                              static_cast<int64_t>(buf.chunks_overwritten()));
719     storage->SetIndexedStats(stats::traced_buf_chunks_discarded, buf_num,
720                              static_cast<int64_t>(buf.chunks_discarded()));
721     storage->SetIndexedStats(stats::traced_buf_chunks_read, buf_num,
722                              static_cast<int64_t>(buf.chunks_read()));
723     storage->SetIndexedStats(
724         stats::traced_buf_chunks_committed_out_of_order, buf_num,
725         static_cast<int64_t>(buf.chunks_committed_out_of_order()));
726     storage->SetIndexedStats(stats::traced_buf_write_wrap_count, buf_num,
727                              static_cast<int64_t>(buf.write_wrap_count()));
728     storage->SetIndexedStats(stats::traced_buf_patches_succeeded, buf_num,
729                              static_cast<int64_t>(buf.patches_succeeded()));
730     storage->SetIndexedStats(stats::traced_buf_patches_failed, buf_num,
731                              static_cast<int64_t>(buf.patches_failed()));
732     storage->SetIndexedStats(stats::traced_buf_readaheads_succeeded, buf_num,
733                              static_cast<int64_t>(buf.readaheads_succeeded()));
734     storage->SetIndexedStats(stats::traced_buf_readaheads_failed, buf_num,
735                              static_cast<int64_t>(buf.readaheads_failed()));
736     storage->SetIndexedStats(stats::traced_buf_abi_violations, buf_num,
737                              static_cast<int64_t>(buf.abi_violations()));
738     storage->SetIndexedStats(
739         stats::traced_buf_trace_writer_packet_loss, buf_num,
740         static_cast<int64_t>(buf.trace_writer_packet_loss()));
741   }
742 
743   struct BufStats {
744     uint32_t packet_loss = 0;
745     uint32_t incremental_sequences_dropped = 0;
746   };
747   base::FlatHashMap<int32_t, BufStats> stats_per_buffer;
748   for (auto it = evt.writer_stats(); it; ++it) {
749     protos::pbzero::TraceStats::WriterStats::Decoder w(*it);
750     auto seq_id = static_cast<uint32_t>(w.sequence_id());
751     if (auto* s = sequence_state_.Find(seq_id)) {
752       auto& stats = stats_per_buffer[static_cast<int32_t>(w.buffer())];
753       stats.packet_loss += s->previous_packet_dropped_count;
754       stats.incremental_sequences_dropped +=
755           s->needs_incremental_state_skipped > 0 &&
756           s->needs_incremental_state_skipped ==
757               s->needs_incremental_state_total;
758     }
759   }
760 
761   for (auto it = stats_per_buffer.GetIterator(); it; ++it) {
762     auto& v = it.value();
763     storage->SetIndexedStats(stats::traced_buf_sequence_packet_loss, it.key(),
764                              v.packet_loss);
765     storage->SetIndexedStats(stats::traced_buf_incremental_sequences_dropped,
766                              it.key(), v.incremental_sequences_dropped);
767   }
768 }
769 
NotifyEndOfFile()770 base::Status ProtoTraceReader::NotifyEndOfFile() {
771   return base::OkStatus();
772 }
773 
774 }  // namespace perfetto::trace_processor
775