1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/trace_processor/importers/proto/proto_trace_reader.h"
18
19 #include <algorithm>
20 #include <cinttypes>
21 #include <cstddef>
22 #include <cstdint>
23 #include <map>
24 #include <numeric>
25 #include <optional>
26 #include <string>
27 #include <tuple>
28 #include <utility>
29 #include <vector>
30
31 #include "perfetto/base/logging.h"
32 #include "perfetto/base/status.h"
33 #include "perfetto/ext/base/flat_hash_map.h"
34 #include "perfetto/ext/base/status_or.h"
35 #include "perfetto/ext/base/string_view.h"
36 #include "perfetto/protozero/field.h"
37 #include "perfetto/protozero/proto_decoder.h"
38 #include "perfetto/public/compiler.h"
39 #include "src/trace_processor/importers/common/clock_tracker.h"
40 #include "src/trace_processor/importers/common/event_tracker.h"
41 #include "src/trace_processor/importers/common/metadata_tracker.h"
42 #include "src/trace_processor/importers/proto/packet_analyzer.h"
43 #include "src/trace_processor/importers/proto/proto_importer_module.h"
44 #include "src/trace_processor/sorter/trace_sorter.h"
45 #include "src/trace_processor/storage/metadata.h"
46 #include "src/trace_processor/storage/stats.h"
47 #include "src/trace_processor/storage/trace_storage.h"
48 #include "src/trace_processor/tables/metadata_tables_py.h"
49 #include "src/trace_processor/types/variadic.h"
50 #include "src/trace_processor/util/descriptors.h"
51
52 #include "protos/perfetto/common/builtin_clock.pbzero.h"
53 #include "protos/perfetto/common/trace_stats.pbzero.h"
54 #include "protos/perfetto/config/trace_config.pbzero.h"
55 #include "protos/perfetto/trace/clock_snapshot.pbzero.h"
56 #include "protos/perfetto/trace/extension_descriptor.pbzero.h"
57 #include "protos/perfetto/trace/perfetto/tracing_service_event.pbzero.h"
58 #include "protos/perfetto/trace/remote_clock_sync.pbzero.h"
59 #include "protos/perfetto/trace/trace.pbzero.h"
60 #include "protos/perfetto/trace/trace_packet.pbzero.h"
61
62 namespace perfetto::trace_processor {
63
ProtoTraceReader(TraceProcessorContext * ctx)64 ProtoTraceReader::ProtoTraceReader(TraceProcessorContext* ctx)
65 : context_(ctx),
66 skipped_packet_key_id_(ctx->storage->InternString("skipped_packet")),
67 invalid_incremental_state_key_id_(
68 ctx->storage->InternString("invalid_incremental_state")) {}
69 ProtoTraceReader::~ProtoTraceReader() = default;
70
Parse(TraceBlobView blob)71 base::Status ProtoTraceReader::Parse(TraceBlobView blob) {
72 return tokenizer_.Tokenize(std::move(blob), [this](TraceBlobView packet) {
73 return ParsePacket(std::move(packet));
74 });
75 }
76
ParseExtensionDescriptor(ConstBytes descriptor)77 base::Status ProtoTraceReader::ParseExtensionDescriptor(ConstBytes descriptor) {
78 protos::pbzero::ExtensionDescriptor::Decoder decoder(descriptor.data,
79 descriptor.size);
80
81 auto extension = decoder.extension_set();
82 return context_->descriptor_pool_->AddFromFileDescriptorSet(
83 extension.data, extension.size,
84 /*skip_prefixes*/ {},
85 /*merge_existing_messages=*/true);
86 }
87
ParsePacket(TraceBlobView packet)88 base::Status ProtoTraceReader::ParsePacket(TraceBlobView packet) {
89 protos::pbzero::TracePacket::Decoder decoder(packet.data(), packet.length());
90 if (PERFETTO_UNLIKELY(decoder.bytes_left())) {
91 return base::ErrStatus(
92 "Failed to parse proto packet fully; the trace is probably corrupt.");
93 }
94
95 // Any compressed packets should have been handled by the tokenizer.
96 PERFETTO_CHECK(!decoder.has_compressed_packets());
97
98 // When the trace packet is emitted from a remote machine: parse the packet
99 // using a different ProtoTraceReader instance. The packet will be parsed
100 // in the context of the remote machine.
101 if (PERFETTO_UNLIKELY(decoder.has_machine_id())) {
102 if (!context_->machine_id()) {
103 // Default context: switch to another reader instance to parse the packet.
104 PERFETTO_DCHECK(context_->multi_machine_trace_manager);
105 auto* reader = context_->multi_machine_trace_manager->GetOrCreateReader(
106 decoder.machine_id());
107 return reader->ParsePacket(std::move(packet));
108 }
109 }
110 // Assert that the packet is parsed using the right instance of reader.
111 PERFETTO_DCHECK(decoder.has_machine_id() == !!context_->machine_id());
112
113 uint32_t seq_id = decoder.trusted_packet_sequence_id();
114 auto [scoped_state, inserted] = sequence_state_.Insert(seq_id, {});
115 if (decoder.has_trusted_packet_sequence_id()) {
116 if (!inserted && decoder.previous_packet_dropped()) {
117 ++scoped_state->previous_packet_dropped_count;
118 }
119 }
120
121 if (decoder.first_packet_on_sequence()) {
122 HandleFirstPacketOnSequence(seq_id);
123 }
124
125 uint32_t sequence_flags = decoder.sequence_flags();
126 if (decoder.incremental_state_cleared() ||
127 sequence_flags &
128 protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED) {
129 HandleIncrementalStateCleared(decoder);
130 } else if (decoder.previous_packet_dropped()) {
131 HandlePreviousPacketDropped(decoder);
132 }
133
134 // It is important that we parse defaults before parsing other fields such as
135 // the timestamp, since the defaults could affect them.
136 if (decoder.has_trace_packet_defaults()) {
137 auto field = decoder.trace_packet_defaults();
138 ParseTracePacketDefaults(decoder, packet.slice(field.data, field.size));
139 }
140
141 if (decoder.has_interned_data()) {
142 auto field = decoder.interned_data();
143 ParseInternedData(decoder, packet.slice(field.data, field.size));
144 }
145
146 if (decoder.has_clock_snapshot()) {
147 return ParseClockSnapshot(decoder.clock_snapshot(), seq_id);
148 }
149
150 if (decoder.has_trace_stats()) {
151 ParseTraceStats(decoder.trace_stats());
152 }
153
154 if (decoder.has_remote_clock_sync()) {
155 PERFETTO_DCHECK(context_->machine_id());
156 return ParseRemoteClockSync(decoder.remote_clock_sync());
157 }
158
159 if (decoder.has_service_event()) {
160 PERFETTO_DCHECK(decoder.has_timestamp());
161 int64_t ts = static_cast<int64_t>(decoder.timestamp());
162 return ParseServiceEvent(ts, decoder.service_event());
163 }
164
165 if (decoder.has_extension_descriptor()) {
166 return ParseExtensionDescriptor(decoder.extension_descriptor());
167 }
168
169 auto* state = GetIncrementalStateForPacketSequence(seq_id);
170 if (decoder.sequence_flags() &
171 protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE) {
172 if (!seq_id) {
173 return base::ErrStatus(
174 "TracePacket specified SEQ_NEEDS_INCREMENTAL_STATE but the "
175 "TraceWriter's sequence_id is zero (the service is "
176 "probably too old)");
177 }
178 scoped_state->needs_incremental_state_total++;
179
180 if (!state->IsIncrementalStateValid()) {
181 if (context_->content_analyzer) {
182 // Account for the skipped packet for trace proto content analysis,
183 // with a special annotation.
184 PacketAnalyzer::SampleAnnotation annotation;
185 annotation.emplace_back(skipped_packet_key_id_,
186 invalid_incremental_state_key_id_);
187 PacketAnalyzer::Get(context_)->ProcessPacket(packet, annotation);
188 }
189 scoped_state->needs_incremental_state_skipped++;
190 context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
191 return base::OkStatus();
192 }
193 }
194
195 protos::pbzero::TracePacketDefaults::Decoder* defaults =
196 state->current_generation()->GetTracePacketDefaults();
197
198 int64_t timestamp;
199 if (decoder.has_timestamp()) {
200 timestamp = static_cast<int64_t>(decoder.timestamp());
201
202 uint32_t timestamp_clock_id =
203 decoder.has_timestamp_clock_id()
204 ? decoder.timestamp_clock_id()
205 : (defaults ? defaults->timestamp_clock_id() : 0);
206
207 if ((decoder.has_chrome_events() || decoder.has_chrome_metadata()) &&
208 (!timestamp_clock_id ||
209 timestamp_clock_id == protos::pbzero::BUILTIN_CLOCK_MONOTONIC)) {
210 // Chrome event timestamps are in MONOTONIC domain, but may occur in
211 // traces where (a) no clock snapshots exist or (b) no clock_id is
212 // specified for their timestamps. Adjust to trace time if we have a clock
213 // snapshot.
214 // TODO(eseckler): Set timestamp_clock_id and emit ClockSnapshots in
215 // chrome and then remove this.
216 auto trace_ts = context_->clock_tracker->ToTraceTime(
217 protos::pbzero::BUILTIN_CLOCK_MONOTONIC, timestamp);
218 if (trace_ts.ok())
219 timestamp = trace_ts.value();
220 } else if (timestamp_clock_id) {
221 // If the TracePacket specifies a non-zero clock-id, translate the
222 // timestamp into the trace-time clock domain.
223 ClockTracker::ClockId converted_clock_id = timestamp_clock_id;
224 if (ClockTracker::IsSequenceClock(converted_clock_id)) {
225 if (!seq_id) {
226 return base::ErrStatus(
227 "TracePacket specified a sequence-local clock id (%" PRIu32
228 ") but the TraceWriter's sequence_id is zero (the service is "
229 "probably too old)",
230 timestamp_clock_id);
231 }
232 converted_clock_id =
233 ClockTracker::SequenceToGlobalClock(seq_id, timestamp_clock_id);
234 }
235 auto trace_ts =
236 context_->clock_tracker->ToTraceTime(converted_clock_id, timestamp);
237 if (!trace_ts.ok()) {
238 // ToTraceTime() will increase the |clock_sync_failure| stat on failure.
239 // We don't return an error here as it will cause the trace to stop
240 // parsing. Instead, we rely on the stat increment in ToTraceTime() to
241 // inform the user about the error.
242 return base::OkStatus();
243 }
244 timestamp = trace_ts.value();
245 }
246 } else {
247 timestamp = std::max(latest_timestamp_, context_->sorter->max_timestamp());
248 }
249 latest_timestamp_ = std::max(timestamp, latest_timestamp_);
250
251 if (context_->content_analyzer && !decoder.has_track_event()) {
252 PacketAnalyzer::Get(context_)->ProcessPacket(packet, {});
253 }
254
255 auto& modules = context_->modules_by_field;
256 for (uint32_t field_id = 1; field_id < modules.size(); ++field_id) {
257 if (!modules[field_id].empty() && decoder.Get(field_id).valid()) {
258 for (ProtoImporterModule* global_module :
259 context_->modules_for_all_fields) {
260 ModuleResult res = global_module->TokenizePacket(
261 decoder, &packet, timestamp, state->current_generation(), field_id);
262 if (!res.ignored())
263 return res.ToStatus();
264 }
265 for (ProtoImporterModule* module : modules[field_id]) {
266 ModuleResult res = module->TokenizePacket(
267 decoder, &packet, timestamp, state->current_generation(), field_id);
268 if (!res.ignored())
269 return res.ToStatus();
270 }
271 }
272 }
273
274 if (decoder.has_trace_config()) {
275 ParseTraceConfig(decoder.trace_config());
276 }
277
278 // Use parent data and length because we want to parse this again
279 // later to get the exact type of the packet.
280 context_->sorter->PushTracePacket(timestamp, state->current_generation(),
281 std::move(packet), context_->machine_id());
282
283 return base::OkStatus();
284 }
285
ParseTraceConfig(protozero::ConstBytes blob)286 void ProtoTraceReader::ParseTraceConfig(protozero::ConstBytes blob) {
287 using Config = protos::pbzero::TraceConfig;
288 Config::Decoder trace_config(blob);
289 if (trace_config.write_into_file()) {
290 if (!trace_config.flush_period_ms()) {
291 context_->storage->IncrementStats(stats::config_write_into_file_no_flush);
292 }
293 int i = 0;
294 for (auto it = trace_config.buffers(); it; ++it, ++i) {
295 Config::BufferConfig::Decoder buf(*it);
296 if (buf.fill_policy() == Config::BufferConfig::FillPolicy::DISCARD) {
297 context_->storage->IncrementIndexedStats(
298 stats::config_write_into_file_discard, i);
299 }
300 }
301 }
302 }
303
HandleIncrementalStateCleared(const protos::pbzero::TracePacket::Decoder & packet_decoder)304 void ProtoTraceReader::HandleIncrementalStateCleared(
305 const protos::pbzero::TracePacket::Decoder& packet_decoder) {
306 if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
307 PERFETTO_ELOG(
308 "incremental_state_cleared without trusted_packet_sequence_id");
309 context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
310 return;
311 }
312 GetIncrementalStateForPacketSequence(
313 packet_decoder.trusted_packet_sequence_id())
314 ->OnIncrementalStateCleared();
315 for (auto& module : context_->modules) {
316 module->OnIncrementalStateCleared(
317 packet_decoder.trusted_packet_sequence_id());
318 }
319 }
320
HandleFirstPacketOnSequence(uint32_t packet_sequence_id)321 void ProtoTraceReader::HandleFirstPacketOnSequence(
322 uint32_t packet_sequence_id) {
323 for (auto& module : context_->modules) {
324 module->OnFirstPacketOnSequence(packet_sequence_id);
325 }
326 }
327
HandlePreviousPacketDropped(const protos::pbzero::TracePacket::Decoder & packet_decoder)328 void ProtoTraceReader::HandlePreviousPacketDropped(
329 const protos::pbzero::TracePacket::Decoder& packet_decoder) {
330 if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
331 PERFETTO_ELOG("previous_packet_dropped without trusted_packet_sequence_id");
332 context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
333 return;
334 }
335 GetIncrementalStateForPacketSequence(
336 packet_decoder.trusted_packet_sequence_id())
337 ->OnPacketLoss();
338 }
339
ParseTracePacketDefaults(const protos::pbzero::TracePacket_Decoder & packet_decoder,TraceBlobView trace_packet_defaults)340 void ProtoTraceReader::ParseTracePacketDefaults(
341 const protos::pbzero::TracePacket_Decoder& packet_decoder,
342 TraceBlobView trace_packet_defaults) {
343 if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
344 PERFETTO_ELOG(
345 "TracePacketDefaults packet without trusted_packet_sequence_id");
346 context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
347 return;
348 }
349
350 auto* state = GetIncrementalStateForPacketSequence(
351 packet_decoder.trusted_packet_sequence_id());
352 state->UpdateTracePacketDefaults(std::move(trace_packet_defaults));
353 }
354
ParseInternedData(const protos::pbzero::TracePacket::Decoder & packet_decoder,TraceBlobView interned_data)355 void ProtoTraceReader::ParseInternedData(
356 const protos::pbzero::TracePacket::Decoder& packet_decoder,
357 TraceBlobView interned_data) {
358 if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
359 PERFETTO_ELOG("InternedData packet without trusted_packet_sequence_id");
360 context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
361 return;
362 }
363
364 auto* state = GetIncrementalStateForPacketSequence(
365 packet_decoder.trusted_packet_sequence_id());
366
367 // Don't parse interned data entries until incremental state is valid, because
368 // they could otherwise be associated with the wrong generation in the state.
369 if (!state->IsIncrementalStateValid()) {
370 context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
371 return;
372 }
373
374 // Store references to interned data submessages into the sequence's state.
375 protozero::ProtoDecoder decoder(interned_data.data(), interned_data.length());
376 for (protozero::Field f = decoder.ReadField(); f.valid();
377 f = decoder.ReadField()) {
378 auto bytes = f.as_bytes();
379 state->InternMessage(f.id(), interned_data.slice(bytes.data, bytes.size));
380 }
381 }
382
ParseClockSnapshot(ConstBytes blob,uint32_t seq_id)383 base::Status ProtoTraceReader::ParseClockSnapshot(ConstBytes blob,
384 uint32_t seq_id) {
385 std::vector<ClockTracker::ClockTimestamp> clock_timestamps;
386 protos::pbzero::ClockSnapshot::Decoder evt(blob.data, blob.size);
387 if (evt.primary_trace_clock()) {
388 context_->clock_tracker->SetTraceTimeClock(
389 static_cast<ClockTracker::ClockId>(evt.primary_trace_clock()));
390 }
391 for (auto it = evt.clocks(); it; ++it) {
392 protos::pbzero::ClockSnapshot::Clock::Decoder clk(*it);
393 ClockTracker::ClockId clock_id = clk.clock_id();
394 if (ClockTracker::IsSequenceClock(clk.clock_id())) {
395 if (!seq_id) {
396 return base::ErrStatus(
397 "ClockSnapshot packet is specifying a sequence-scoped clock id "
398 "(%" PRIu64 ") but the TracePacket sequence_id is zero",
399 clock_id);
400 }
401 clock_id = ClockTracker::SequenceToGlobalClock(seq_id, clk.clock_id());
402 }
403 int64_t unit_multiplier_ns =
404 clk.unit_multiplier_ns()
405 ? static_cast<int64_t>(clk.unit_multiplier_ns())
406 : 1;
407 clock_timestamps.emplace_back(clock_id, clk.timestamp(), unit_multiplier_ns,
408 clk.is_incremental());
409 }
410
411 base::StatusOr<uint32_t> snapshot_id =
412 context_->clock_tracker->AddSnapshot(clock_timestamps);
413 if (!snapshot_id.ok()) {
414 PERFETTO_ELOG("%s", snapshot_id.status().c_message());
415 return base::OkStatus();
416 }
417
418 std::optional<int64_t> trace_time_from_snapshot =
419 context_->clock_tracker->ToTraceTimeFromSnapshot(clock_timestamps);
420
421 // Add the all the clock snapshots to the clock snapshot table.
422 std::optional<int64_t> trace_ts_for_check;
423 for (const auto& clock_timestamp : clock_timestamps) {
424 // If the clock is incremental, we need to use 0 to map correctly to
425 // |absolute_timestamp|.
426 int64_t ts_to_convert =
427 clock_timestamp.clock.is_incremental ? 0 : clock_timestamp.timestamp;
428 // Even if we have trace time from snapshot, we still run ToTraceTime to
429 // optimise future conversions.
430 base::StatusOr<int64_t> opt_trace_ts = context_->clock_tracker->ToTraceTime(
431 clock_timestamp.clock.id, ts_to_convert);
432
433 if (!opt_trace_ts.ok()) {
434 // This can happen if |AddSnapshot| failed to resolve this clock, e.g. if
435 // clock is not monotonic. Try to fetch trace time from snapshot.
436 if (!trace_time_from_snapshot) {
437 PERFETTO_DLOG("%s", opt_trace_ts.status().c_message());
438 continue;
439 }
440 opt_trace_ts = *trace_time_from_snapshot;
441 }
442
443 // Double check that all the clocks in this snapshot resolve to the same
444 // trace timestamp value.
445 PERFETTO_DCHECK(!trace_ts_for_check ||
446 opt_trace_ts.value() == trace_ts_for_check.value());
447 trace_ts_for_check = *opt_trace_ts;
448
449 tables::ClockSnapshotTable::Row row;
450 row.ts = *opt_trace_ts;
451 row.clock_id = static_cast<int64_t>(clock_timestamp.clock.id);
452 row.clock_value =
453 clock_timestamp.timestamp * clock_timestamp.clock.unit_multiplier_ns;
454 row.clock_name = GetBuiltinClockNameOrNull(clock_timestamp.clock.id);
455 row.snapshot_id = *snapshot_id;
456 row.machine_id = context_->machine_id();
457
458 context_->storage->mutable_clock_snapshot_table()->Insert(row);
459 }
460 return base::OkStatus();
461 }
462
ParseRemoteClockSync(ConstBytes blob)463 base::Status ProtoTraceReader::ParseRemoteClockSync(ConstBytes blob) {
464 protos::pbzero::RemoteClockSync::Decoder evt(blob.data, blob.size);
465
466 std::vector<SyncClockSnapshots> sync_clock_snapshots;
467 // Decode the RemoteClockSync message into a struct for calculating offsets.
468 for (auto it = evt.synced_clocks(); it; ++it) {
469 sync_clock_snapshots.emplace_back();
470 auto& sync_clocks = sync_clock_snapshots.back();
471
472 protos::pbzero::RemoteClockSync::SyncedClocks::Decoder synced_clocks(*it);
473 protos::pbzero::ClockSnapshot::ClockSnapshot::Decoder host_clocks(
474 synced_clocks.host_clocks());
475 for (auto clock_it = host_clocks.clocks(); clock_it; clock_it++) {
476 protos::pbzero::ClockSnapshot::ClockSnapshot::Clock::Decoder clock(
477 *clock_it);
478 sync_clocks[clock.clock_id()].first = clock.timestamp();
479 }
480
481 std::vector<ClockTracker::ClockTimestamp> clock_timestamps;
482 protos::pbzero::ClockSnapshot::ClockSnapshot::Decoder client_clocks(
483 synced_clocks.client_clocks());
484 for (auto clock_it = client_clocks.clocks(); clock_it; clock_it++) {
485 protos::pbzero::ClockSnapshot::ClockSnapshot::Clock::Decoder clock(
486 *clock_it);
487 sync_clocks[clock.clock_id()].second = clock.timestamp();
488 clock_timestamps.emplace_back(clock.clock_id(), clock.timestamp(), 1,
489 false);
490 }
491
492 // In addition for calculating clock offsets, client clock snapshots are
493 // also added to clock tracker to emulate tracing service taking periodical
494 // clock snapshots. This builds a clock conversion path from a local trace
495 // time (e.g. Chrome trace time) to client builtin clock (CLOCK_MONOTONIC)
496 // which can be converted to host trace time (CLOCK_BOOTTIME).
497 context_->clock_tracker->AddSnapshot(clock_timestamps);
498 }
499
500 // Calculate clock offsets and report to the ClockTracker.
501 auto clock_offsets = CalculateClockOffsets(sync_clock_snapshots);
502 for (auto it = clock_offsets.GetIterator(); it; ++it) {
503 context_->clock_tracker->SetClockOffset(it.key(), it.value());
504 }
505
506 return base::OkStatus();
507 }
508
509 base::FlatHashMap<int64_t /*Clock Id*/, int64_t /*Offset*/>
CalculateClockOffsets(std::vector<SyncClockSnapshots> & sync_clock_snapshots)510 ProtoTraceReader::CalculateClockOffsets(
511 std::vector<SyncClockSnapshots>& sync_clock_snapshots) {
512 base::FlatHashMap<int64_t /*Clock Id*/, int64_t /*Offset*/> clock_offsets;
513
514 // The RemoteClockSync message contains a sequence of |synced_clocks|
515 // messages. Each |synced_clocks| message contains pairs of ClockSnapshots
516 // taken on both the client and host sides.
517 //
518 // The "synced_clocks" messages are emitted periodically. A single round of
519 // data collection involves four snapshots:
520 // 1. Client snapshot
521 // 2. Host snapshot (triggered by client's IPC message)
522 // 3. Client snapshot (triggered by host's IPC message)
523 // 4. Host snapshot
524 //
525 // These four snapshots are used to estimate the clock offset between the
526 // client and host for each default clock domain present in the ClockSnapshot.
527 std::map<int64_t, std::vector<int64_t>> raw_clock_offsets;
528 // Remote clock syncs happen in an interval of 30 sec. 2 adjacent clock
529 // snapshots belong to the same round if they happen within 30 secs.
530 constexpr uint64_t clock_sync_interval_ns = 30lu * 1000000000;
531 for (size_t i = 1; i < sync_clock_snapshots.size(); i++) {
532 // Synced clocks are taken by client snapshot -> host snapshot.
533 auto& ping_clocks = sync_clock_snapshots[i - 1];
534 auto& update_clocks = sync_clock_snapshots[i];
535
536 auto ping_client =
537 ping_clocks[protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME]
538 .second;
539 auto update_client =
540 update_clocks[protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME]
541 .second;
542 // |ping_clocks| and |update_clocks| belong to 2 different rounds of remote
543 // clock sync rounds.
544 if (update_client - ping_client >= clock_sync_interval_ns)
545 continue;
546
547 for (auto it = ping_clocks.GetIterator(); it; ++it) {
548 const auto clock_id = it.key();
549 const auto [t1h, t1c] = it.value();
550 const auto [t2h, t2c] = update_clocks[clock_id];
551
552 if (!t1h || !t1c || !t2h || !t2c)
553 continue;
554
555 int64_t offset1 =
556 static_cast<int64_t>(t1c + t2c) / 2 - static_cast<int64_t>(t1h);
557 int64_t offset2 =
558 static_cast<int64_t>(t2c) - static_cast<int64_t>(t1h + t2h) / 2;
559
560 // Clock values are taken in the order of t1c, t1h, t2c, t2h. Offset
561 // calculation requires at least 3 timestamps as a round trip. We have 4,
562 // which can be treated as 2 round trips:
563 // 1. t1c, t1h, t2c as the round trip initiated by the client. Offset 1
564 // = (t1c + t2c) / 2 - t1h
565 // 2. t1h, t2c, t2h as the round trip initiated by the host. Offset 2 =
566 // t2c - (t1h + t2h) / 2
567 raw_clock_offsets[clock_id].push_back(offset1);
568 raw_clock_offsets[clock_id].push_back(offset2);
569 }
570
571 // Use the average of estimated clock offsets in the clock tracker.
572 for (const auto& [clock_id, offsets] : raw_clock_offsets) {
573 int64_t avg_offset =
574 std::accumulate(offsets.begin(), offsets.end(), 0LL) /
575 static_cast<int64_t>(offsets.size());
576 clock_offsets[clock_id] = avg_offset;
577 }
578 }
579
580 return clock_offsets;
581 }
582
GetBuiltinClockNameOrNull(int64_t clock_id)583 std::optional<StringId> ProtoTraceReader::GetBuiltinClockNameOrNull(
584 int64_t clock_id) {
585 switch (clock_id) {
586 case protos::pbzero::ClockSnapshot::Clock::REALTIME:
587 return context_->storage->InternString("REALTIME");
588 case protos::pbzero::ClockSnapshot::Clock::REALTIME_COARSE:
589 return context_->storage->InternString("REALTIME_COARSE");
590 case protos::pbzero::ClockSnapshot::Clock::MONOTONIC:
591 return context_->storage->InternString("MONOTONIC");
592 case protos::pbzero::ClockSnapshot::Clock::MONOTONIC_COARSE:
593 return context_->storage->InternString("MONOTONIC_COARSE");
594 case protos::pbzero::ClockSnapshot::Clock::MONOTONIC_RAW:
595 return context_->storage->InternString("MONOTONIC_RAW");
596 case protos::pbzero::ClockSnapshot::Clock::BOOTTIME:
597 return context_->storage->InternString("BOOTTIME");
598 default:
599 return std::nullopt;
600 }
601 }
602
ParseServiceEvent(int64_t ts,ConstBytes blob)603 base::Status ProtoTraceReader::ParseServiceEvent(int64_t ts, ConstBytes blob) {
604 protos::pbzero::TracingServiceEvent::Decoder tse(blob);
605 if (tse.tracing_started()) {
606 context_->metadata_tracker->SetMetadata(metadata::tracing_started_ns,
607 Variadic::Integer(ts));
608 }
609 if (tse.tracing_disabled()) {
610 context_->metadata_tracker->SetMetadata(metadata::tracing_disabled_ns,
611 Variadic::Integer(ts));
612 }
613 if (tse.all_data_sources_started()) {
614 context_->metadata_tracker->SetMetadata(
615 metadata::all_data_source_started_ns, Variadic::Integer(ts));
616 }
617 if (tse.all_data_sources_flushed()) {
618 context_->metadata_tracker->AppendMetadata(
619 metadata::all_data_source_flushed_ns, Variadic::Integer(ts));
620 context_->sorter->NotifyFlushEvent();
621 }
622 if (tse.read_tracing_buffers_completed()) {
623 context_->sorter->NotifyReadBufferEvent();
624 }
625 if (tse.has_slow_starting_data_sources()) {
626 protos::pbzero::TracingServiceEvent::DataSources::Decoder msg(
627 tse.slow_starting_data_sources());
628 for (auto it = msg.data_source(); it; it++) {
629 protos::pbzero::TracingServiceEvent::DataSources::DataSource::Decoder
630 data_source(*it);
631 std::string formatted = data_source.producer_name().ToStdString() + " " +
632 data_source.data_source_name().ToStdString();
633 context_->metadata_tracker->AppendMetadata(
634 metadata::slow_start_data_source,
635 Variadic::String(
636 context_->storage->InternString(base::StringView(formatted))));
637 }
638 }
639 return base::OkStatus();
640 }
641
ParseTraceStats(ConstBytes blob)642 void ProtoTraceReader::ParseTraceStats(ConstBytes blob) {
643 protos::pbzero::TraceStats::Decoder evt(blob.data, blob.size);
644 auto* storage = context_->storage.get();
645 storage->SetStats(stats::traced_producers_connected,
646 static_cast<int64_t>(evt.producers_connected()));
647 storage->SetStats(stats::traced_producers_seen,
648 static_cast<int64_t>(evt.producers_seen()));
649 storage->SetStats(stats::traced_data_sources_registered,
650 static_cast<int64_t>(evt.data_sources_registered()));
651 storage->SetStats(stats::traced_data_sources_seen,
652 static_cast<int64_t>(evt.data_sources_seen()));
653 storage->SetStats(stats::traced_tracing_sessions,
654 static_cast<int64_t>(evt.tracing_sessions()));
655 storage->SetStats(stats::traced_total_buffers,
656 static_cast<int64_t>(evt.total_buffers()));
657 storage->SetStats(stats::traced_chunks_discarded,
658 static_cast<int64_t>(evt.chunks_discarded()));
659 storage->SetStats(stats::traced_patches_discarded,
660 static_cast<int64_t>(evt.patches_discarded()));
661 storage->SetStats(stats::traced_flushes_requested,
662 static_cast<int64_t>(evt.flushes_requested()));
663 storage->SetStats(stats::traced_flushes_succeeded,
664 static_cast<int64_t>(evt.flushes_succeeded()));
665 storage->SetStats(stats::traced_flushes_failed,
666 static_cast<int64_t>(evt.flushes_failed()));
667
668 if (evt.has_filter_stats()) {
669 protos::pbzero::TraceStats::FilterStats::Decoder fstat(evt.filter_stats());
670 storage->SetStats(stats::filter_errors,
671 static_cast<int64_t>(fstat.errors()));
672 storage->SetStats(stats::filter_input_bytes,
673 static_cast<int64_t>(fstat.input_bytes()));
674 storage->SetStats(stats::filter_input_packets,
675 static_cast<int64_t>(fstat.input_packets()));
676 storage->SetStats(stats::filter_output_bytes,
677 static_cast<int64_t>(fstat.output_bytes()));
678 storage->SetStats(stats::filter_time_taken_ns,
679 static_cast<int64_t>(fstat.time_taken_ns()));
680 for (auto [i, it] = std::tuple{0, fstat.bytes_discarded_per_buffer()}; it;
681 ++it, ++i) {
682 storage->SetIndexedStats(stats::traced_buf_bytes_filtered_out, i,
683 static_cast<int64_t>(*it));
684 }
685 }
686
687 switch (evt.final_flush_outcome()) {
688 case protos::pbzero::TraceStats::FINAL_FLUSH_SUCCEEDED:
689 storage->IncrementStats(stats::traced_final_flush_succeeded, 1);
690 break;
691 case protos::pbzero::TraceStats::FINAL_FLUSH_FAILED:
692 storage->IncrementStats(stats::traced_final_flush_failed, 1);
693 break;
694 case protos::pbzero::TraceStats::FINAL_FLUSH_UNSPECIFIED:
695 break;
696 }
697
698 int buf_num = 0;
699 for (auto it = evt.buffer_stats(); it; ++it, ++buf_num) {
700 protos::pbzero::TraceStats::BufferStats::Decoder buf(*it);
701 storage->SetIndexedStats(stats::traced_buf_buffer_size, buf_num,
702 static_cast<int64_t>(buf.buffer_size()));
703 storage->SetIndexedStats(stats::traced_buf_bytes_written, buf_num,
704 static_cast<int64_t>(buf.bytes_written()));
705 storage->SetIndexedStats(stats::traced_buf_bytes_overwritten, buf_num,
706 static_cast<int64_t>(buf.bytes_overwritten()));
707 storage->SetIndexedStats(stats::traced_buf_bytes_read, buf_num,
708 static_cast<int64_t>(buf.bytes_read()));
709 storage->SetIndexedStats(stats::traced_buf_padding_bytes_written, buf_num,
710 static_cast<int64_t>(buf.padding_bytes_written()));
711 storage->SetIndexedStats(stats::traced_buf_padding_bytes_cleared, buf_num,
712 static_cast<int64_t>(buf.padding_bytes_cleared()));
713 storage->SetIndexedStats(stats::traced_buf_chunks_written, buf_num,
714 static_cast<int64_t>(buf.chunks_written()));
715 storage->SetIndexedStats(stats::traced_buf_chunks_rewritten, buf_num,
716 static_cast<int64_t>(buf.chunks_rewritten()));
717 storage->SetIndexedStats(stats::traced_buf_chunks_overwritten, buf_num,
718 static_cast<int64_t>(buf.chunks_overwritten()));
719 storage->SetIndexedStats(stats::traced_buf_chunks_discarded, buf_num,
720 static_cast<int64_t>(buf.chunks_discarded()));
721 storage->SetIndexedStats(stats::traced_buf_chunks_read, buf_num,
722 static_cast<int64_t>(buf.chunks_read()));
723 storage->SetIndexedStats(
724 stats::traced_buf_chunks_committed_out_of_order, buf_num,
725 static_cast<int64_t>(buf.chunks_committed_out_of_order()));
726 storage->SetIndexedStats(stats::traced_buf_write_wrap_count, buf_num,
727 static_cast<int64_t>(buf.write_wrap_count()));
728 storage->SetIndexedStats(stats::traced_buf_patches_succeeded, buf_num,
729 static_cast<int64_t>(buf.patches_succeeded()));
730 storage->SetIndexedStats(stats::traced_buf_patches_failed, buf_num,
731 static_cast<int64_t>(buf.patches_failed()));
732 storage->SetIndexedStats(stats::traced_buf_readaheads_succeeded, buf_num,
733 static_cast<int64_t>(buf.readaheads_succeeded()));
734 storage->SetIndexedStats(stats::traced_buf_readaheads_failed, buf_num,
735 static_cast<int64_t>(buf.readaheads_failed()));
736 storage->SetIndexedStats(stats::traced_buf_abi_violations, buf_num,
737 static_cast<int64_t>(buf.abi_violations()));
738 storage->SetIndexedStats(
739 stats::traced_buf_trace_writer_packet_loss, buf_num,
740 static_cast<int64_t>(buf.trace_writer_packet_loss()));
741 }
742
743 struct BufStats {
744 uint32_t packet_loss = 0;
745 uint32_t incremental_sequences_dropped = 0;
746 };
747 base::FlatHashMap<int32_t, BufStats> stats_per_buffer;
748 for (auto it = evt.writer_stats(); it; ++it) {
749 protos::pbzero::TraceStats::WriterStats::Decoder w(*it);
750 auto seq_id = static_cast<uint32_t>(w.sequence_id());
751 if (auto* s = sequence_state_.Find(seq_id)) {
752 auto& stats = stats_per_buffer[static_cast<int32_t>(w.buffer())];
753 stats.packet_loss += s->previous_packet_dropped_count;
754 stats.incremental_sequences_dropped +=
755 s->needs_incremental_state_skipped > 0 &&
756 s->needs_incremental_state_skipped ==
757 s->needs_incremental_state_total;
758 }
759 }
760
761 for (auto it = stats_per_buffer.GetIterator(); it; ++it) {
762 auto& v = it.value();
763 storage->SetIndexedStats(stats::traced_buf_sequence_packet_loss, it.key(),
764 v.packet_loss);
765 storage->SetIndexedStats(stats::traced_buf_incremental_sequences_dropped,
766 it.key(), v.incremental_sequences_dropped);
767 }
768 }
769
NotifyEndOfFile()770 base::Status ProtoTraceReader::NotifyEndOfFile() {
771 return base::OkStatus();
772 }
773
774 } // namespace perfetto::trace_processor
775