1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/profiling/perf/perf_producer.h"
18
19 #include <optional>
20 #include <random>
21 #include <utility>
22 #include <vector>
23
24 #include <unistd.h>
25
26 #include <unwindstack/Error.h>
27 #include <unwindstack/Unwinder.h>
28
29 #include "perfetto/base/logging.h"
30 #include "perfetto/base/task_runner.h"
31 #include "perfetto/ext/base/file_utils.h"
32 #include "perfetto/ext/base/metatrace.h"
33 #include "perfetto/ext/base/string_utils.h"
34 #include "perfetto/ext/base/utils.h"
35 #include "perfetto/ext/base/weak_ptr.h"
36 #include "perfetto/ext/tracing/core/basic_types.h"
37 #include "perfetto/ext/tracing/core/producer.h"
38 #include "perfetto/ext/tracing/core/tracing_service.h"
39 #include "perfetto/ext/tracing/ipc/producer_ipc_client.h"
40 #include "perfetto/tracing/core/data_source_config.h"
41 #include "perfetto/tracing/core/data_source_descriptor.h"
42 #include "src/profiling/common/callstack_trie.h"
43 #include "src/profiling/common/proc_cmdline.h"
44 #include "src/profiling/common/producer_support.h"
45 #include "src/profiling/common/profiler_guardrails.h"
46 #include "src/profiling/common/unwind_support.h"
47 #include "src/profiling/perf/common_types.h"
48 #include "src/profiling/perf/event_reader.h"
49
50 #include "protos/perfetto/common/builtin_clock.pbzero.h"
51 #include "protos/perfetto/common/perf_events.gen.h"
52 #include "protos/perfetto/common/perf_events.pbzero.h"
53 #include "protos/perfetto/config/profiling/perf_event_config.gen.h"
54 #include "protos/perfetto/trace/profiling/profile_packet.pbzero.h"
55 #include "protos/perfetto/trace/trace_packet.pbzero.h"
56 #include "protos/perfetto/trace/trace_packet_defaults.pbzero.h"
57
58 namespace perfetto {
59 namespace profiling {
60 namespace {
61
62 // TODO(b/151835887): on Android, when using signals, there exists a vulnerable
63 // window between a process image being replaced by execve, and the new
64 // libc instance reinstalling the proper signal handlers. During this window,
65 // the signal disposition is defaulted to terminating the process.
66 // This is a best-effort mitigation from the daemon's side, using a heuristic
67 // that most execve calls follow a fork. So if we get a sample for a very fresh
68 // process, the grace period will give it a chance to get to
69 // a properly initialised state prior to getting signalled. This doesn't help
70 // cases when a mature process calls execve, or when the target gets descheduled
71 // (since this is a naive walltime wait).
72 // The proper fix is in the platform, see bug for progress.
73 constexpr uint32_t kProcDescriptorsAndroidDelayMs = 50;
74
75 constexpr uint32_t kMemoryLimitCheckPeriodMs = 1000;
76
77 constexpr uint32_t kInitialConnectionBackoffMs = 100;
78 constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;
79
80 constexpr char kProducerName[] = "perfetto.traced_perf";
81 constexpr char kDataSourceName[] = "linux.perf";
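// "linux.perf" is the data source name that trace configs use to select this
// producer. A minimal, illustrative (assumed, not exhaustive) textproto config
// enabling callstack sampling could look like:
//
//   data_sources {
//     config {
//       name: "linux.perf"
//       perf_event_config {
//         timebase { frequency: 100 }
//         callstack_sampling {
//           scope { target_cmdline: "com.example.app" }
//         }
//       }
//     }
//   }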
82
83 size_t NumberOfCpus() {
84 return static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF));
85 }
86
87 std::vector<uint32_t> GetOnlineCpus() {
88 size_t cpu_count = NumberOfCpus();
89 if (cpu_count == 0) {
90 return {};
91 }
92
93 static constexpr char kOnlineValue[] = "1\n";
94 std::vector<uint32_t> online_cpus;
95 online_cpus.reserve(cpu_count);
96 for (uint32_t cpu = 0; cpu < cpu_count; ++cpu) {
97 std::string res;
98 base::StackString<1024> path("/sys/devices/system/cpu/cpu%u/online", cpu);
99 if (!base::ReadFile(path.c_str(), &res)) {
100 // Always consider CPU 0 to be online if the "online" file does not exist
101 // for it. There seem to be several assumptions in the kernel which make
102 // CPU 0 special so this is a pretty safe bet.
103 if (cpu != 0) {
104 return {};
105 }
106 res = kOnlineValue;
107 }
108 if (res != kOnlineValue) {
109 continue;
110 }
111 online_cpus.push_back(cpu);
112 }
113 return online_cpus;
114 }
115
116 int32_t ToBuiltinClock(int32_t clockid) {
117 switch (clockid) {
118 case CLOCK_REALTIME:
119 return protos::pbzero::BUILTIN_CLOCK_REALTIME;
120 case CLOCK_MONOTONIC:
121 return protos::pbzero::BUILTIN_CLOCK_MONOTONIC;
122 case CLOCK_MONOTONIC_RAW:
123 return protos::pbzero::BUILTIN_CLOCK_MONOTONIC_RAW;
124 case CLOCK_BOOTTIME:
125 return protos::pbzero::BUILTIN_CLOCK_BOOTTIME;
126 // Should never get invalid input here as otherwise the syscall itself
127 // would've failed earlier.
128 default:
129 return protos::pbzero::BUILTIN_CLOCK_UNKNOWN;
130 }
131 }
132
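// Starts a packet that is marked as depending on this sequence's incremental
// state (the interned data and trace packet defaults emitted earlier), so that
// the trace parser can skip it if that state was lost.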
133 TraceWriter::TracePacketHandle StartTracePacket(TraceWriter* trace_writer) {
134 auto packet = trace_writer->NewTracePacket();
135 packet->set_sequence_flags(
136 protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE);
137 return packet;
138 }
139
140 void WritePerfEventDefaultsPacket(const EventConfig& event_config,
141 TraceWriter* trace_writer) {
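// Written at data source start and again on incremental state clears; the
// defaults recorded here (timestamp clock, timebase, follower events) apply to
// all subsequent PerfSample packets on this writer's sequence.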
142 auto packet = trace_writer->NewTracePacket();
143 packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
144 packet->set_timestamp_clock_id(protos::pbzero::BUILTIN_CLOCK_BOOTTIME);
145
146 // start new incremental state generation:
147 packet->set_sequence_flags(
148 protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED);
149
150 // default packet timestamp clock for the samples:
151 perf_event_attr* perf_attr = event_config.perf_attr();
152 auto* defaults = packet->set_trace_packet_defaults();
153 int32_t builtin_clock = ToBuiltinClock(perf_attr->clockid);
154 defaults->set_timestamp_clock_id(static_cast<uint32_t>(builtin_clock));
155
156 auto* perf_defaults = defaults->set_perf_sample_defaults();
157 auto* timebase_pb = perf_defaults->set_timebase();
158
159 // frequency/period:
160 if (perf_attr->freq) {
161 timebase_pb->set_frequency(perf_attr->sample_freq);
162 } else {
163 timebase_pb->set_period(perf_attr->sample_period);
164 }
165
166 // timebase event:
167 const PerfCounter& timebase = event_config.timebase_event();
168 switch (timebase.event_type()) {
169 case PerfCounter::Type::kBuiltinCounter: {
170 timebase_pb->set_counter(
171 static_cast<protos::pbzero::PerfEvents::Counter>(timebase.counter));
172 break;
173 }
174 case PerfCounter::Type::kTracepoint: {
175 auto* tracepoint_pb = timebase_pb->set_tracepoint();
176 tracepoint_pb->set_name(timebase.tracepoint_name);
177 tracepoint_pb->set_filter(timebase.tracepoint_filter);
178 break;
179 }
180 case PerfCounter::Type::kRawEvent: {
181 auto* raw_pb = timebase_pb->set_raw_event();
182 raw_pb->set_type(timebase.attr_type);
183 raw_pb->set_config(timebase.attr_config);
184 raw_pb->set_config1(timebase.attr_config1);
185 raw_pb->set_config2(timebase.attr_config2);
186 break;
187 }
188 }
189
190 // optional name to identify the counter during parsing:
191 if (!timebase.name.empty()) {
192 timebase_pb->set_name(timebase.name);
193 }
194
195 // follower events:
196 for (const auto& e : event_config.follower_events()) {
197 auto* followers_pb = perf_defaults->add_followers();
198 followers_pb->set_name(e.name);
199
200 switch (e.event_type()) {
201 case PerfCounter::Type::kBuiltinCounter: {
202 followers_pb->set_counter(
203 static_cast<protos::pbzero::PerfEvents::Counter>(e.counter));
204 break;
205 }
206 case PerfCounter::Type::kTracepoint: {
207 auto* tracepoint_pb = followers_pb->set_tracepoint();
208 tracepoint_pb->set_name(e.tracepoint_name);
209 tracepoint_pb->set_filter(e.tracepoint_filter);
210 break;
211 }
212 case PerfCounter::Type::kRawEvent: {
213 auto* raw_pb = followers_pb->set_raw_event();
214 raw_pb->set_type(e.attr_type);
215 raw_pb->set_config(e.attr_config);
216 raw_pb->set_config1(e.attr_config1);
217 raw_pb->set_config2(e.attr_config2);
218 break;
219 }
220 }
221 }
222
223 // Not setting timebase.timestamp_clock since the field that matters during
224 // parsing is the root timestamp_clock_id set above.
225
226 // Record the random shard we've chosen so that the post-processing can infer
227 // which processes would've been unwound if sampled. In particular this lets
228 // us distinguish between "running but not chosen" and "running and chosen,
229 // but not sampled" cases.
230 const auto& process_sharding = event_config.filter().process_sharding;
231 if (process_sharding.has_value()) {
232 perf_defaults->set_process_shard_count(process_sharding->shard_count);
233 perf_defaults->set_chosen_process_shard(process_sharding->chosen_shard);
234 }
235 }
236
237 uint32_t TimeToNextReadTickMs(DataSourceInstanceID ds_id, uint32_t period_ms) {
238 // Normally, we'd schedule the next tick at the next |period_ms|
239 // boundary of the boot clock. However, to avoid aligning the read tasks of
240 // all concurrent data sources, we select a deterministic offset based on the
241 // data source id.
242 std::minstd_rand prng(static_cast<std::minstd_rand::result_type>(ds_id));
243 std::uniform_int_distribution<uint32_t> dist(0, period_ms - 1);
244 uint32_t ds_period_offset = dist(prng);
245
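// Worked example (hypothetical numbers): with period_ms = 1000 and a derived
// offset of 250, ticks are aimed at ...250, 1250, 2250... ms; if now_ms is
// 1900, this returns 1000 - ((1900 - 250) % 1000) = 350 ms until the next tick.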
246 uint64_t now_ms = static_cast<uint64_t>(base::GetWallTimeMs().count());
247 return period_ms - ((now_ms - ds_period_offset) % period_ms);
248 }
249
250 protos::pbzero::Profiling::CpuMode ToCpuModeEnum(uint16_t perf_cpu_mode) {
251 using Profiling = protos::pbzero::Profiling;
252 switch (perf_cpu_mode) {
253 case PERF_RECORD_MISC_KERNEL:
254 return Profiling::MODE_KERNEL;
255 case PERF_RECORD_MISC_USER:
256 return Profiling::MODE_USER;
257 case PERF_RECORD_MISC_HYPERVISOR:
258 return Profiling::MODE_HYPERVISOR;
259 case PERF_RECORD_MISC_GUEST_KERNEL:
260 return Profiling::MODE_GUEST_KERNEL;
261 case PERF_RECORD_MISC_GUEST_USER:
262 return Profiling::MODE_GUEST_USER;
263 default:
264 return Profiling::MODE_UNKNOWN;
265 }
266 }
267
268 protos::pbzero::Profiling::StackUnwindError ToProtoEnum(
269 unwindstack::ErrorCode error_code) {
270 using Profiling = protos::pbzero::Profiling;
271 switch (error_code) {
272 case unwindstack::ERROR_NONE:
273 return Profiling::UNWIND_ERROR_NONE;
274 case unwindstack::ERROR_MEMORY_INVALID:
275 return Profiling::UNWIND_ERROR_MEMORY_INVALID;
276 case unwindstack::ERROR_UNWIND_INFO:
277 return Profiling::UNWIND_ERROR_UNWIND_INFO;
278 case unwindstack::ERROR_UNSUPPORTED:
279 return Profiling::UNWIND_ERROR_UNSUPPORTED;
280 case unwindstack::ERROR_INVALID_MAP:
281 return Profiling::UNWIND_ERROR_INVALID_MAP;
282 case unwindstack::ERROR_MAX_FRAMES_EXCEEDED:
283 return Profiling::UNWIND_ERROR_MAX_FRAMES_EXCEEDED;
284 case unwindstack::ERROR_REPEATED_FRAME:
285 return Profiling::UNWIND_ERROR_REPEATED_FRAME;
286 case unwindstack::ERROR_INVALID_ELF:
287 return Profiling::UNWIND_ERROR_INVALID_ELF;
288 case unwindstack::ERROR_SYSTEM_CALL:
289 return Profiling::UNWIND_ERROR_SYSTEM_CALL;
290 case unwindstack::ERROR_THREAD_TIMEOUT:
291 return Profiling::UNWIND_ERROR_THREAD_TIMEOUT;
292 case unwindstack::ERROR_THREAD_DOES_NOT_EXIST:
293 return Profiling::UNWIND_ERROR_THREAD_DOES_NOT_EXIST;
294 case unwindstack::ERROR_BAD_ARCH:
295 return Profiling::UNWIND_ERROR_BAD_ARCH;
296 case unwindstack::ERROR_MAPS_PARSE:
297 return Profiling::UNWIND_ERROR_MAPS_PARSE;
298 case unwindstack::ERROR_INVALID_PARAMETER:
299 return Profiling::UNWIND_ERROR_INVALID_PARAMETER;
300 case unwindstack::ERROR_PTRACE_CALL:
301 return Profiling::UNWIND_ERROR_PTRACE_CALL;
302 }
303 return Profiling::UNWIND_ERROR_UNKNOWN;
304 }
305
306 } // namespace
307
308 // static
309 bool PerfProducer::ShouldRejectDueToFilter(
310 pid_t pid,
311 const TargetFilter& filter,
312 bool skip_cmdline,
313 base::FlatSet<std::string>* additional_cmdlines,
314 std::function<bool(std::string*)> read_proc_pid_cmdline) {
315 PERFETTO_CHECK(additional_cmdlines);
316
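// Evaluation order: explicit exclusions (cmdline/pid) first, then explicit
// inclusions, then the "empty filter accepts everything" case, then process
// sharding, and finally the additional_cmdlines allowance. The first matching
// rule decides.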
317 std::string cmdline;
318 bool have_cmdline = false;
319 if (!skip_cmdline)
320 have_cmdline = read_proc_pid_cmdline(&cmdline);
321
322 const char* binname = "";
323 if (have_cmdline) {
324 binname = glob_aware::FindBinaryName(cmdline.c_str(), cmdline.size());
325 }
326
327 auto has_matching_pattern = [](const std::vector<std::string>& patterns,
328 const char* cmd, const char* name) {
329 for (const std::string& pattern : patterns) {
330 if (glob_aware::MatchGlobPattern(pattern.c_str(), cmd, name)) {
331 return true;
332 }
333 }
334 return false;
335 };
336
337 if (have_cmdline &&
338 has_matching_pattern(filter.exclude_cmdlines, cmdline.c_str(), binname)) {
339 PERFETTO_DLOG("Explicitly rejecting samples for pid [%d] due to cmdline",
340 static_cast<int>(pid));
341 return true;
342 }
343 if (filter.exclude_pids.count(pid)) {
344 PERFETTO_DLOG("Explicitly rejecting samples for pid [%d] due to pid",
345 static_cast<int>(pid));
346 return true;
347 }
348
349 if (have_cmdline &&
350 has_matching_pattern(filter.cmdlines, cmdline.c_str(), binname)) {
351 return false;
352 }
353 if (filter.pids.count(pid)) {
354 return false;
355 }
356
357 // Empty allow filter means keep everything that isn't explicitly excluded.
358 if (filter.cmdlines.empty() && filter.pids.empty() &&
359 !filter.additional_cmdline_count &&
360 !filter.process_sharding.has_value()) {
361 return false;
362 }
363
364 // Niche option: process sharding to amortise systemwide unwinding costs.
365 // Selects a subset of all processes by using the low order bits of their pid.
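// For example (illustrative values): with shard_count = 4 and chosen_shard = 1,
// pids 1, 5, 9, ... are kept and all other pids are rejected.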
366 if (filter.process_sharding.has_value()) {
367 uint32_t upid = static_cast<uint32_t>(pid);
368 if (upid % filter.process_sharding->shard_count ==
369 filter.process_sharding->chosen_shard) {
370 PERFETTO_DLOG("Process sharding: keeping pid [%d]",
371 static_cast<int>(pid));
372 return false;
373 } else {
374 PERFETTO_DLOG("Process sharding: rejecting pid [%d]",
375 static_cast<int>(pid));
376 return true;
377 }
378 }
379
380 // Niche option: additionally remember the first seen N process cmdlines, and
381 // keep all processes with those names.
382 if (have_cmdline) {
383 if (additional_cmdlines->count(cmdline)) {
384 return false;
385 }
386 if (additional_cmdlines->size() < filter.additional_cmdline_count) {
387 additional_cmdlines->insert(cmdline);
388 return false;
389 }
390 }
391
392 PERFETTO_DLOG("Rejecting samples for pid [%d]", static_cast<int>(pid));
393 return true;
394 }
395
396 PerfProducer::PerfProducer(ProcDescriptorGetter* proc_fd_getter,
397 base::TaskRunner* task_runner)
398 : task_runner_(task_runner),
399 proc_fd_getter_(proc_fd_getter),
400 unwinding_worker_(this),
401 weak_factory_(this) {
402 proc_fd_getter->SetDelegate(this);
403 }
404
405 void PerfProducer::SetupDataSource(DataSourceInstanceID,
406 const DataSourceConfig&) {}
407
408 void PerfProducer::StartDataSource(DataSourceInstanceID ds_id,
409 const DataSourceConfig& config) {
410 uint64_t tracing_session_id = config.tracing_session_id();
411 PERFETTO_LOG("StartDataSource(ds %zu, session %" PRIu64 ", name %s)",
412 static_cast<size_t>(ds_id), tracing_session_id,
413 config.name().c_str());
414
415 if (config.name() == MetatraceWriter::kDataSourceName) {
416 StartMetatraceSource(ds_id, static_cast<BufferID>(config.target_buffer()));
417 return;
418 }
419
420 // linux.perf data source
421 if (config.name() != kDataSourceName)
422 return;
423
424 // Tracepoint name -> id lookup in case the config asks for tracepoints:
425 auto tracepoint_id_lookup = [this](const std::string& group,
426 const std::string& name) {
427 if (!tracefs_) // lazy init or retry
428 tracefs_ = FtraceProcfs::CreateGuessingMountPoint();
429 if (!tracefs_) // still didn't find an accessible tracefs
430 return 0u;
431 return tracefs_->ReadEventId(group, name);
432 };
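// E.g. a config asking for the "sched:sched_switch" tracepoint is resolved to
// the numeric id that tracefs exposes under events/sched/sched_switch/id; the
// lambda returns 0 if no accessible tracefs instance is found.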
433
434 protos::gen::PerfEventConfig event_config_pb;
435 if (!event_config_pb.ParseFromString(config.perf_event_config_raw())) {
436 PERFETTO_ELOG("PerfEventConfig could not be parsed.");
437 return;
438 }
439
440 // Unlikely: handle a callstack sampling option that shares a random decision
441 // between all data sources within a tracing session. Instead of introducing
442 // session-scoped data, we replicate the decision in each per-DS EventConfig.
443 std::optional<ProcessSharding> process_sharding;
444 uint32_t shard_count =
445 event_config_pb.callstack_sampling().scope().process_shard_count();
446 if (shard_count > 0) {
447 process_sharding =
448 GetOrChooseCallstackProcessShard(tracing_session_id, shard_count);
449 }
450
451 std::optional<EventConfig> event_config = EventConfig::Create(
452 event_config_pb, config, process_sharding, tracepoint_id_lookup);
453 if (!event_config.has_value()) {
454 PERFETTO_ELOG("PerfEventConfig rejected.");
455 return;
456 }
457
458 std::vector<uint32_t> online_cpus = GetOnlineCpus();
459 if (online_cpus.empty()) {
460 PERFETTO_ELOG("No online CPUs found.");
461 return;
462 }
463
464 std::vector<EventReader> per_cpu_readers;
465 for (uint32_t cpu : online_cpus) {
466 std::optional<EventReader> event_reader =
467 EventReader::ConfigureEvents(cpu, event_config.value());
468 if (!event_reader.has_value()) {
469 PERFETTO_ELOG("Failed to set up perf events for cpu%" PRIu32
470 ", discarding data source.",
471 cpu);
472 return;
473 }
474 per_cpu_readers.emplace_back(std::move(event_reader.value()));
475 }
476
477 auto buffer_id = static_cast<BufferID>(config.target_buffer());
478 auto writer = endpoint_->CreateTraceWriter(buffer_id);
479
480 // Construct the data source instance.
481 std::map<DataSourceInstanceID, DataSourceState>::iterator ds_it;
482 bool inserted;
483 std::tie(ds_it, inserted) = data_sources_.emplace(
484 std::piecewise_construct, std::forward_as_tuple(ds_id),
485 std::forward_as_tuple(event_config.value(), tracing_session_id,
486 std::move(writer), std::move(per_cpu_readers)));
487 PERFETTO_CHECK(inserted);
488 DataSourceState& ds = ds_it->second;
489
490 // Start the configured events.
491 for (auto& per_cpu_reader : ds.per_cpu_readers) {
492 per_cpu_reader.EnableEvents();
493 }
494
495 WritePerfEventDefaultsPacket(ds.event_config, ds.trace_writer.get());
496
497 InterningOutputTracker::WriteFixedInterningsPacket(
498 ds_it->second.trace_writer.get(),
499 protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE);
500
501 // Inform unwinder of the new data source instance, and optionally start a
502 // periodic task to clear its cached state.
503 auto unwind_mode = (ds.event_config.unwind_mode() ==
504 protos::gen::PerfEventConfig::UNWIND_FRAME_POINTER)
505 ? Unwinder::UnwindMode::kFramePointer
506 : Unwinder::UnwindMode::kUnwindStack;
507 unwinding_worker_->PostStartDataSource(ds_id, ds.event_config.kernel_frames(),
508 unwind_mode);
509 if (ds.event_config.unwind_state_clear_period_ms()) {
510 unwinding_worker_->PostClearCachedStatePeriodic(
511 ds_id, ds.event_config.unwind_state_clear_period_ms());
512 }
513
514 // Kick off periodic read task.
515 auto tick_period_ms = ds.event_config.read_tick_period_ms();
516 auto weak_this = weak_factory_.GetWeakPtr();
517 task_runner_->PostDelayedTask(
518 [weak_this, ds_id] {
519 if (weak_this)
520 weak_this->TickDataSourceRead(ds_id);
521 },
522 TimeToNextReadTickMs(ds_id, tick_period_ms));
523
524 // Optionally kick off periodic memory footprint limit check.
525 uint32_t max_daemon_memory_kb = event_config_pb.max_daemon_memory_kb();
526 if (max_daemon_memory_kb > 0) {
527 task_runner_->PostDelayedTask(
528 [weak_this, ds_id, max_daemon_memory_kb] {
529 if (weak_this)
530 weak_this->CheckMemoryFootprintPeriodic(ds_id,
531 max_daemon_memory_kb);
532 },
533 kMemoryLimitCheckPeriodMs);
534 }
535 }
536
537 void PerfProducer::CheckMemoryFootprintPeriodic(DataSourceInstanceID ds_id,
538 uint32_t max_daemon_memory_kb) {
539 auto ds_it = data_sources_.find(ds_id);
540 if (ds_it == data_sources_.end())
541 return; // stop recurring
542
543 GuardrailConfig gconfig = {};
544 gconfig.memory_guardrail_kb = max_daemon_memory_kb;
545
546 ProfilerMemoryGuardrails footprint_snapshot;
547 if (footprint_snapshot.IsOverMemoryThreshold(gconfig)) {
548 PurgeDataSource(ds_id);
549 return; // stop recurring
550 }
551
552 // repost
553 auto weak_this = weak_factory_.GetWeakPtr();
554 task_runner_->PostDelayedTask(
555 [weak_this, ds_id, max_daemon_memory_kb] {
556 if (weak_this)
557 weak_this->CheckMemoryFootprintPeriodic(ds_id, max_daemon_memory_kb);
558 },
559 kMemoryLimitCheckPeriodMs);
560 }
561
562 void PerfProducer::StopDataSource(DataSourceInstanceID ds_id) {
563 PERFETTO_LOG("StopDataSource(%zu)", static_cast<size_t>(ds_id));
564
565 // Metatrace: stop immediately (will miss the events from the
566 // asynchronous shutdown of the primary data source).
567 auto meta_it = metatrace_writers_.find(ds_id);
568 if (meta_it != metatrace_writers_.end()) {
569 meta_it->second.WriteAllAndFlushTraceWriter([] {});
570 metatrace_writers_.erase(meta_it);
571 return;
572 }
573
574 auto ds_it = data_sources_.find(ds_id);
575 if (ds_it == data_sources_.end()) {
576 // Most likely, the source is missing due to an abrupt stop (via
577 // |PurgeDataSource|). Tell the service that we've stopped the source now,
578 // so that it doesn't wait for the ack until the timeout.
579 endpoint_->NotifyDataSourceStopped(ds_id);
580 return;
581 }
582
583 // Start shutting down the reading frontend, which will propagate the stop
584 // further as the intermediate buffers are cleared.
585 DataSourceState& ds = ds_it->second;
586 InitiateReaderStop(&ds);
587 }
588
589 // The perf data sources ignore flush requests, as flushing would be
590 // unnecessarily complicated given out-of-order unwinding and proc-fd timeouts.
591 // Instead of responding to explicit flushes, we can ensure that we're otherwise
592 // well-behaved (do not reorder packets too much), and let the service scrape
593 // the SMB.
594 void PerfProducer::Flush(FlushRequestID flush_id,
595 const DataSourceInstanceID* data_source_ids,
596 size_t num_data_sources,
597 FlushFlags) {
598 // Flush metatracing if requested.
599 for (size_t i = 0; i < num_data_sources; i++) {
600 auto ds_id = data_source_ids[i];
601 PERFETTO_DLOG("Flush(%zu)", static_cast<size_t>(ds_id));
602
603 auto meta_it = metatrace_writers_.find(ds_id);
604 if (meta_it != metatrace_writers_.end()) {
605 meta_it->second.WriteAllAndFlushTraceWriter([] {});
606 }
607 }
608
609 endpoint_->NotifyFlushComplete(flush_id);
610 }
611
612 void PerfProducer::ClearIncrementalState(
613 const DataSourceInstanceID* data_source_ids,
614 size_t num_data_sources) {
615 for (size_t i = 0; i < num_data_sources; i++) {
616 auto ds_id = data_source_ids[i];
617 PERFETTO_DLOG("ClearIncrementalState(%zu)", static_cast<size_t>(ds_id));
618
619 if (metatrace_writers_.find(ds_id) != metatrace_writers_.end())
620 continue;
621
622 auto ds_it = data_sources_.find(ds_id);
623 if (ds_it == data_sources_.end()) {
624 PERFETTO_DLOG("ClearIncrementalState(%zu): did not find matching entry",
625 static_cast<size_t>(ds_id));
626 continue;
627 }
628 DataSourceState& ds = ds_it->second;
629
630 WritePerfEventDefaultsPacket(ds.event_config, ds.trace_writer.get());
631
632 // Forget which incremental state we've emitted before.
633 ds.interning_output.ClearHistory();
634 InterningOutputTracker::WriteFixedInterningsPacket(
635 ds.trace_writer.get(),
636 protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE);
637
638 // Drop the cross-datasource callstack interning trie. This is not
639 // necessary for correctness (the preceding step is sufficient). However,
640 // incremental clearing is likely to be used in ring buffer traces, where
641 // it makes sense to reset the trie's size periodically, and this is a
642 // reasonable point to do so. The trie keeps the monotonic interning IDs,
643 // so there is no confusion for other concurrent data sources. We do not
644 // bother with clearing concurrent sources' interning output trackers as
645 // their footprint should be trivial.
646 callstack_trie_.ClearTrie();
647 }
648 }
649
650 void PerfProducer::TickDataSourceRead(DataSourceInstanceID ds_id) {
651 auto it = data_sources_.find(ds_id);
652 if (it == data_sources_.end()) {
653 PERFETTO_DLOG("TickDataSourceRead(%zu): source gone",
654 static_cast<size_t>(ds_id));
655 return;
656 }
657 DataSourceState& ds = it->second;
658
659 PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_READ_TICK);
660
661 // Make a pass over all per-cpu readers.
662 uint64_t max_samples = ds.event_config.samples_per_tick_limit();
663 bool more_records_available = false;
664 for (EventReader& reader : ds.per_cpu_readers) {
665 if (ReadAndParsePerCpuBuffer(&reader, max_samples, ds_id, &ds)) {
666 more_records_available = true;
667 }
668 }
669
670 // Wake up the unwinder as we've (likely) pushed samples into its queue.
671 unwinding_worker_->PostProcessQueue();
672
673 if (PERFETTO_UNLIKELY(ds.status == DataSourceState::Status::kShuttingDown) &&
674 !more_records_available) {
675 unwinding_worker_->PostInitiateDataSourceStop(ds_id);
676 } else {
677 // otherwise, keep reading
678 auto tick_period_ms = it->second.event_config.read_tick_period_ms();
679 auto weak_this = weak_factory_.GetWeakPtr();
680 task_runner_->PostDelayedTask(
681 [weak_this, ds_id] {
682 if (weak_this)
683 weak_this->TickDataSourceRead(ds_id);
684 },
685 TimeToNextReadTickMs(ds_id, tick_period_ms));
686 }
687 }
688
689 bool PerfProducer::ReadAndParsePerCpuBuffer(EventReader* reader,
690 uint64_t max_samples,
691 DataSourceInstanceID ds_id,
692 DataSourceState* ds) {
693 PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_READ_CPU);
694
695 // If the kernel ring buffer dropped data, record it in the trace.
696 size_t cpu = reader->cpu();
697 auto records_lost_callback = [this, ds_id, cpu](uint64_t records_lost) {
698 auto weak_this = weak_factory_.GetWeakPtr();
699 task_runner_->PostTask([weak_this, ds_id, cpu, records_lost] {
700 if (weak_this)
701 weak_this->EmitRingBufferLoss(ds_id, cpu, records_lost);
702 });
703 };
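// Note: the callback posts a task rather than writing the packet inline, so
// the kernel_records_lost packet is emitted outside of the read loop below.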
704
705 for (uint64_t i = 0; i < max_samples; i++) {
706 std::optional<ParsedSample> sample =
707 reader->ReadUntilSample(records_lost_callback);
708 if (!sample) {
709 return false; // caught up to the writer
710 }
711
712 // Counter-only mode: skip the unwinding stage, serialise the sample
713 // immediately.
714 const EventConfig& event_config = ds->event_config;
715 if (!event_config.sample_callstacks()) {
716 CompletedSample output;
717 output.common = sample->common;
718 EmitSample(ds_id, std::move(output));
719 continue;
720 }
721
722 // Sampling either or both of userspace and kernel callstacks.
723 pid_t pid = sample->common.pid;
724 auto& process_state = ds->process_states[pid]; // insert if new
725
726 // Asynchronous proc-fd lookup timed out.
727 if (process_state == ProcessTrackingStatus::kFdsTimedOut) {
728 PERFETTO_DLOG("Skipping sample for pid [%d]: kFdsTimedOut",
729 static_cast<int>(pid));
730 EmitSkippedSample(ds_id, std::move(sample.value()),
731 SampleSkipReason::kReadStage);
732 continue;
733 }
734
735 // Previously excluded, e.g. due to failing the target filter check.
736 if (process_state == ProcessTrackingStatus::kRejected) {
737 PERFETTO_DLOG("Skipping sample for pid [%d]: kRejected",
738 static_cast<int>(pid));
739 continue;
740 }
741
742 // Seeing pid for the first time. We need to consider whether the process
743 // is a kernel thread, and which callstacks we're recording.
744 //
745 // {user} stacks -> user processes: signal for proc-fd lookup
746 // -> kthreads: reject
747 //
748 // {kernel} stacks -> user processes: accept without proc-fds
749 // -> kthreads: accept without proc-fds
750 //
751 // {kernel+user} stacks -> user processes: signal for proc-fd lookup
752 // -> kthreads: accept without proc-fds
753 //
754 if (process_state == ProcessTrackingStatus::kInitial) {
755 PERFETTO_DLOG("New pid: [%d]", static_cast<int>(pid));
756
757 // Kernel threads (which have no userspace state) are never relevant if
758 // we're not recording kernel callchains.
759 bool is_kthread = !sample->regs; // no userspace regs
760 if (is_kthread && !event_config.kernel_frames()) {
761 process_state = ProcessTrackingStatus::kRejected;
762 continue;
763 }
764
765 // Check whether samples for this new process should be dropped due to
766 // the target filtering. Kernel threads don't have a cmdline, but we
767 // still check against pid inclusion/exclusion.
768 if (ShouldRejectDueToFilter(
769 pid, event_config.filter(), is_kthread, &ds->additional_cmdlines,
770 [pid](std::string* cmdline) {
771 return glob_aware::ReadProcCmdlineForPID(pid, cmdline);
772 })) {
773 process_state = ProcessTrackingStatus::kRejected;
774 continue;
775 }
776
777 // At this point, sampled process is known to be of interest.
778 if (!is_kthread && event_config.user_frames()) {
779 // Start resolving the proc-fds. Response is async.
780 process_state = ProcessTrackingStatus::kFdsResolving;
781 InitiateDescriptorLookup(ds_id, pid,
782 event_config.remote_descriptor_timeout_ms());
783 // note: fallthrough
784 } else {
785 // Either a kernel thread (no need to obtain proc-fds), or a userspace
786 // process but we're not recording userspace callstacks.
787 process_state = ProcessTrackingStatus::kAccepted;
788 unwinding_worker_->PostRecordNoUserspaceProcess(ds_id, pid);
789 // note: fallthrough
790 }
791 }
792
793 PERFETTO_CHECK(process_state == ProcessTrackingStatus::kAccepted ||
794 process_state == ProcessTrackingStatus::kFdsResolving);
795
796 // If we're only interested in the kernel callchains, then userspace
797 // process samples are relevant only if they were sampled during kernel
798 // context.
799 if (!event_config.user_frames() &&
800 sample->common.cpu_mode == PERF_RECORD_MISC_USER) {
801 PERFETTO_DLOG("Skipping usermode sample for kernel-only config");
802 continue;
803 }
804
805 // Optionally: drop sample if above a given threshold of sampled stacks
806 // that are waiting in the unwinding queue.
807 uint64_t max_footprint_bytes = event_config.max_enqueued_footprint_bytes();
808 uint64_t sample_stack_size = sample->stack.size();
809 if (max_footprint_bytes) {
810 uint64_t footprint_bytes = unwinding_worker_->GetEnqueuedFootprint();
811 if (footprint_bytes + sample_stack_size >= max_footprint_bytes) {
812 PERFETTO_DLOG("Skipping sample enqueueing due to footprint limit.");
813 EmitSkippedSample(ds_id, std::move(sample.value()),
814 SampleSkipReason::kUnwindEnqueue);
815 continue;
816 }
817 }
818
819 // Push the sample into the unwinding queue if there is room.
820 auto& queue = unwinding_worker_->unwind_queue();
821 WriteView write_view = queue.BeginWrite();
822 if (write_view.valid) {
823 queue.at(write_view.write_pos) =
824 UnwindEntry{ds_id, std::move(sample.value())};
825 queue.CommitWrite();
826 unwinding_worker_->IncrementEnqueuedFootprint(sample_stack_size);
827 } else {
828 PERFETTO_DLOG("Unwinder queue full, skipping sample");
829 EmitSkippedSample(ds_id, std::move(sample.value()),
830 SampleSkipReason::kUnwindEnqueue);
831 }
832 } // for (i < max_samples)
833
834 // Most likely more events in the kernel buffer. Though we might be exactly on
835 // the boundary due to |max_samples|.
836 return true;
837 }
838
839 // Note: first-fit makes descriptor request fulfillment not true FIFO. But the
840 // edge-cases where it matters are very unlikely.
841 void PerfProducer::OnProcDescriptors(pid_t pid,
842 uid_t uid,
843 base::ScopedFile maps_fd,
844 base::ScopedFile mem_fd) {
845 // Find first-fit data source that requested descriptors for the process.
846 for (auto& it : data_sources_) {
847 DataSourceState& ds = it.second;
848 auto proc_status_it = ds.process_states.find(pid);
849 if (proc_status_it == ds.process_states.end())
850 continue;
851
852 // TODO(rsavitski): consider checking ProcessTrackingStatus before
853 // CanProfile.
854 if (!CanProfile(ds.event_config.raw_ds_config(), uid,
855 ds.event_config.target_installed_by())) {
856 PERFETTO_DLOG("Not profileable: pid [%d], uid [%d] for DS [%zu]",
857 static_cast<int>(pid), static_cast<int>(uid),
858 static_cast<size_t>(it.first));
859 continue;
860 }
861
862 // Match against either resolving, or expired state. In the latter
863 // case, it means that the async response was slow enough that we've marked
864 // the lookup as expired (but can now recover for future samples).
865 auto proc_status = proc_status_it->second;
866 if (proc_status == ProcessTrackingStatus::kFdsResolving ||
867 proc_status == ProcessTrackingStatus::kFdsTimedOut) {
868 PERFETTO_DLOG("Handing off proc-fds for pid [%d] to DS [%zu]",
869 static_cast<int>(pid), static_cast<size_t>(it.first));
870
871 proc_status_it->second = ProcessTrackingStatus::kAccepted;
872 unwinding_worker_->PostAdoptProcDescriptors(
873 it.first, pid, std::move(maps_fd), std::move(mem_fd));
874 return; // done
875 }
876 }
877 PERFETTO_DLOG(
878 "Discarding proc-fds for pid [%d] as found no outstanding requests.",
879 static_cast<int>(pid));
880 }
881
882 void PerfProducer::InitiateDescriptorLookup(DataSourceInstanceID ds_id,
883 pid_t pid,
884 uint32_t timeout_ms) {
885 if (!proc_fd_getter_->RequiresDelayedRequest()) {
886 StartDescriptorLookup(ds_id, pid, timeout_ms);
887 return;
888 }
889
890 // Delay lookups on Android. See comment on |kProcDescriptorsAndroidDelayMs|.
891 auto weak_this = weak_factory_.GetWeakPtr();
892 task_runner_->PostDelayedTask(
893 [weak_this, ds_id, pid, timeout_ms] {
894 if (weak_this)
895 weak_this->StartDescriptorLookup(ds_id, pid, timeout_ms);
896 },
897 kProcDescriptorsAndroidDelayMs);
898 }
899
900 void PerfProducer::StartDescriptorLookup(DataSourceInstanceID ds_id,
901 pid_t pid,
902 uint32_t timeout_ms) {
903 proc_fd_getter_->GetDescriptorsForPid(pid);
904
905 auto weak_this = weak_factory_.GetWeakPtr();
906 task_runner_->PostDelayedTask(
907 [weak_this, ds_id, pid] {
908 if (weak_this)
909 weak_this->EvaluateDescriptorLookupTimeout(ds_id, pid);
910 },
911 timeout_ms);
912 }
913
914 void PerfProducer::EvaluateDescriptorLookupTimeout(DataSourceInstanceID ds_id,
915 pid_t pid) {
916 auto ds_it = data_sources_.find(ds_id);
917 if (ds_it == data_sources_.end())
918 return;
919
920 DataSourceState& ds = ds_it->second;
921 auto proc_status_it = ds.process_states.find(pid);
922 if (proc_status_it == ds.process_states.end())
923 return;
924
925 // If the request is still outstanding, mark the process as expired (causing
926 // outstanding and future samples to be discarded).
927 auto proc_status = proc_status_it->second;
928 if (proc_status == ProcessTrackingStatus::kFdsResolving) {
929 PERFETTO_DLOG("Descriptor lookup timeout of pid [%d] for DS [%zu]",
930 static_cast<int>(pid), static_cast<size_t>(ds_it->first));
931
932 proc_status_it->second = ProcessTrackingStatus::kFdsTimedOut;
933 // Also inform the unwinder of the state change (so that it can discard any
934 // of the already-enqueued samples).
935 unwinding_worker_->PostRecordTimedOutProcDescriptors(ds_id, pid);
936 }
937 }
938
939 void PerfProducer::PostEmitSample(DataSourceInstanceID ds_id,
940 CompletedSample sample) {
941 // hack: C++11 lambdas can't capture by move, so stash the sample on the heap.
942 CompletedSample* raw_sample = new CompletedSample(std::move(sample));
943 auto weak_this = weak_factory_.GetWeakPtr();
944 task_runner_->PostTask([weak_this, ds_id, raw_sample] {
945 if (weak_this)
946 weak_this->EmitSample(ds_id, std::move(*raw_sample));
947 delete raw_sample;
948 });
949 }
950
951 void PerfProducer::EmitSample(DataSourceInstanceID ds_id,
952 CompletedSample sample) {
953 auto ds_it = data_sources_.find(ds_id);
954 if (ds_it == data_sources_.end()) {
955 PERFETTO_DLOG("EmitSample(ds: %zu): source gone",
956 static_cast<size_t>(ds_id));
957 return;
958 }
959 DataSourceState& ds = ds_it->second;
960
961 // intern callsite
962 GlobalCallstackTrie::Node* callstack_root =
963 callstack_trie_.CreateCallsite(sample.frames, sample.build_ids);
964 uint64_t callstack_iid = callstack_root->id();
965
966 // start packet, timestamp domain defaults to monotonic_raw
967 auto packet = StartTracePacket(ds.trace_writer.get());
968 packet->set_timestamp(sample.common.timestamp);
969
970 // write new interning data (if any)
971 protos::pbzero::InternedData* interned_out = packet->set_interned_data();
972 ds.interning_output.WriteCallstack(callstack_root, &callstack_trie_,
973 interned_out);
974
975 // write the sample itself
976 auto* perf_sample = packet->set_perf_sample();
977 perf_sample->set_cpu(sample.common.cpu);
978 perf_sample->set_pid(static_cast<uint32_t>(sample.common.pid));
979 perf_sample->set_tid(static_cast<uint32_t>(sample.common.tid));
980 perf_sample->set_cpu_mode(ToCpuModeEnum(sample.common.cpu_mode));
981 perf_sample->set_timebase_count(sample.common.timebase_count);
982
983 for (size_t i = 0; i < sample.common.follower_counts.size(); ++i) {
984 perf_sample->add_follower_counts(sample.common.follower_counts[i]);
985 }
986
987 perf_sample->set_callstack_iid(callstack_iid);
988 if (sample.unwind_error != unwindstack::ERROR_NONE) {
989 perf_sample->set_unwind_error(ToProtoEnum(sample.unwind_error));
990 }
991 }
992
993 void PerfProducer::EmitRingBufferLoss(DataSourceInstanceID ds_id,
994 size_t cpu,
995 uint64_t records_lost) {
996 auto ds_it = data_sources_.find(ds_id);
997 if (ds_it == data_sources_.end())
998 return;
999 DataSourceState& ds = ds_it->second;
1000 PERFETTO_DLOG("DataSource(%zu): cpu%zu lost [%" PRIu64 "] records",
1001 static_cast<size_t>(ds_id), cpu, records_lost);
1002
1003 // The data loss record relates to a single ring buffer, and indicates loss
1004 // since the last successfully-written record in that buffer. Therefore the
1005 // data loss record itself has no timestamp.
1006 // We timestamp the packet with the boot clock for packet ordering purposes,
1007 // but it no longer has a (precise) interpretation relative to the sample
1008 // stream from that per-cpu buffer. See the proto comments for more details.
1009 auto packet = StartTracePacket(ds.trace_writer.get());
1010 packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
1011 packet->set_timestamp_clock_id(
1012 protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME);
1013
1014 auto* perf_sample = packet->set_perf_sample();
1015 perf_sample->set_cpu(static_cast<uint32_t>(cpu));
1016 perf_sample->set_kernel_records_lost(records_lost);
1017 }
1018
1019 void PerfProducer::PostEmitUnwinderSkippedSample(DataSourceInstanceID ds_id,
1020 ParsedSample sample) {
1021 PostEmitSkippedSample(ds_id, std::move(sample),
1022 SampleSkipReason::kUnwindStage);
1023 }
1024
1025 void PerfProducer::PostEmitSkippedSample(DataSourceInstanceID ds_id,
1026 ParsedSample sample,
1027 SampleSkipReason reason) {
1028 // hack: C++11 lambdas can't capture by move, so stash the sample on the heap.
1029 ParsedSample* raw_sample = new ParsedSample(std::move(sample));
1030 auto weak_this = weak_factory_.GetWeakPtr();
1031 task_runner_->PostTask([weak_this, ds_id, raw_sample, reason] {
1032 if (weak_this)
1033 weak_this->EmitSkippedSample(ds_id, std::move(*raw_sample), reason);
1034 delete raw_sample;
1035 });
1036 }
1037
1038 void PerfProducer::EmitSkippedSample(DataSourceInstanceID ds_id,
1039 ParsedSample sample,
1040 SampleSkipReason reason) {
1041 auto ds_it = data_sources_.find(ds_id);
1042 if (ds_it == data_sources_.end())
1043 return;
1044 DataSourceState& ds = ds_it->second;
1045
1046 // Note: timestamp defaults to the monotonic_raw domain.
1047 auto packet = StartTracePacket(ds.trace_writer.get());
1048 packet->set_timestamp(sample.common.timestamp);
1049 auto* perf_sample = packet->set_perf_sample();
1050 perf_sample->set_cpu(sample.common.cpu);
1051 perf_sample->set_pid(static_cast<uint32_t>(sample.common.pid));
1052 perf_sample->set_tid(static_cast<uint32_t>(sample.common.tid));
1053 perf_sample->set_cpu_mode(ToCpuModeEnum(sample.common.cpu_mode));
1054 perf_sample->set_timebase_count(sample.common.timebase_count);
1055
1056 for (size_t i = 0; i < sample.common.follower_counts.size(); ++i) {
1057 perf_sample->add_follower_counts(sample.common.follower_counts[i]);
1058 }
1059
1060 using PerfSample = protos::pbzero::PerfSample;
1061 switch (reason) {
1062 case SampleSkipReason::kReadStage:
1063 perf_sample->set_sample_skipped_reason(
1064 PerfSample::PROFILER_SKIP_READ_STAGE);
1065 break;
1066 case SampleSkipReason::kUnwindEnqueue:
1067 perf_sample->set_sample_skipped_reason(
1068 PerfSample::PROFILER_SKIP_UNWIND_ENQUEUE);
1069 break;
1070 case SampleSkipReason::kUnwindStage:
1071 perf_sample->set_sample_skipped_reason(
1072 PerfSample::PROFILER_SKIP_UNWIND_STAGE);
1073 break;
1074 }
1075 }
1076
1077 void PerfProducer::InitiateReaderStop(DataSourceState* ds) {
1078 PERFETTO_DLOG("InitiateReaderStop");
1079 PERFETTO_CHECK(ds->status != DataSourceState::Status::kShuttingDown);
1080
1081 ds->status = DataSourceState::Status::kShuttingDown;
1082 for (auto& event_reader : ds->per_cpu_readers) {
1083 event_reader.DisableEvents();
1084 }
1085 }
1086
1087 void PerfProducer::PostFinishDataSourceStop(DataSourceInstanceID ds_id) {
1088 auto weak_producer = weak_factory_.GetWeakPtr();
1089 task_runner_->PostTask([weak_producer, ds_id] {
1090 if (weak_producer)
1091 weak_producer->FinishDataSourceStop(ds_id);
1092 });
1093 }
1094
1095 void PerfProducer::FinishDataSourceStop(DataSourceInstanceID ds_id) {
1096 PERFETTO_LOG("FinishDataSourceStop(%zu)", static_cast<size_t>(ds_id));
1097 auto ds_it = data_sources_.find(ds_id);
1098 if (ds_it == data_sources_.end()) {
1099 PERFETTO_DLOG("FinishDataSourceStop(%zu): source gone",
1100 static_cast<size_t>(ds_id));
1101 return;
1102 }
1103 DataSourceState& ds = ds_it->second;
1104 PERFETTO_CHECK(ds.status == DataSourceState::Status::kShuttingDown);
1105
1106 ds.trace_writer->Flush();
1107 data_sources_.erase(ds_it);
1108
1109 endpoint_->NotifyDataSourceStopped(ds_id);
1110
1111 // Clean up resources if there are no more active sources.
1112 if (data_sources_.empty()) {
1113 callstack_trie_.ClearTrie(); // purge internings
1114 base::MaybeReleaseAllocatorMemToOS();
1115 }
1116 }
1117
1118 // TODO(rsavitski): maybe make the tracing service respect premature
1119 // producer-driven stops, and then issue a NotifyDataSourceStopped here.
1120 // Alternatively (and at the expense of higher complexity) introduce a new data
1121 // source status of "tombstoned", and propagate it until the source is stopped
1122 // by the service (this would technically allow for stricter lifetime checking
1123 // of data sources, and help with discarding periodic flushes).
1124 // TODO(rsavitski): Purging while stopping will currently leave the stop
1125 // unacknowledged. Consider checking whether the DS is stopping here, and if so,
1126 // notifying immediately after erasing.
1127 void PerfProducer::PurgeDataSource(DataSourceInstanceID ds_id) {
1128 auto ds_it = data_sources_.find(ds_id);
1129 if (ds_it == data_sources_.end())
1130 return;
1131 DataSourceState& ds = ds_it->second;
1132
1133 PERFETTO_LOG("Stopping DataSource(%zu) prematurely",
1134 static_cast<size_t>(ds_id));
1135
1136 unwinding_worker_->PostPurgeDataSource(ds_id);
1137
1138 // Write a packet indicating the abrupt stop.
1139 {
1140 auto packet = StartTracePacket(ds.trace_writer.get());
1141 packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
1142 packet->set_timestamp_clock_id(
1143 protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME);
1144 auto* perf_sample = packet->set_perf_sample();
1145 auto* producer_event = perf_sample->set_producer_event();
1146 producer_event->set_source_stop_reason(
1147 protos::pbzero::PerfSample::ProducerEvent::PROFILER_STOP_GUARDRAIL);
1148 }
1149
1150 ds.trace_writer->Flush();
1151 data_sources_.erase(ds_it);
1152
1153 // Clean up resources if there are no more active sources.
1154 if (data_sources_.empty()) {
1155 callstack_trie_.ClearTrie(); // purge internings
1156 base::MaybeReleaseAllocatorMemToOS();
1157 }
1158 }
1159
1160 // Either:
1161 // * choose a random number up to |shard_count|.
1162 // * reuse a choice made previously by a data source within this tracing
1163 // session. The config option requires that all data sources within one config
1164 // have the same shard count.
1165 std::optional<ProcessSharding> PerfProducer::GetOrChooseCallstackProcessShard(
1166 uint64_t tracing_session_id,
1167 uint32_t shard_count) {
1168 for (auto& it : data_sources_) {
1169 const DataSourceState& ds = it.second;
1170 const auto& sharding = ds.event_config.filter().process_sharding;
1171 if ((ds.tracing_session_id != tracing_session_id) || !sharding.has_value())
1172 continue;
1173
1174 // Found existing data source, reuse its decision while doing best-effort
1175 // error reporting (logging) if the shard count is not the same.
1176 if (sharding->shard_count != shard_count) {
1177 PERFETTO_ELOG(
1178 "Mismatch of process_shard_count between data sources in tracing "
1179 "session %" PRIu64 ". Overriding shard count to match.",
1180 tracing_session_id);
1181 }
1182 return sharding;
1183 }
1184
1185 // First data source in this session, choose random shard.
1186 std::random_device r;
1187 std::minstd_rand minstd(r());
1188 std::uniform_int_distribution<uint32_t> dist(0, shard_count - 1);
1189 uint32_t chosen_shard = dist(minstd);
1190
1191 ProcessSharding ret;
1192 ret.shard_count = shard_count;
1193 ret.chosen_shard = chosen_shard;
1194
1195 PERFETTO_DCHECK(ret.shard_count && ret.chosen_shard < ret.shard_count);
1196 return ret;
1197 }
1198
1199 void PerfProducer::StartMetatraceSource(DataSourceInstanceID ds_id,
1200 BufferID target_buffer) {
1201 auto writer = endpoint_->CreateTraceWriter(target_buffer);
1202
1203 auto it_and_inserted = metatrace_writers_.emplace(
1204 std::piecewise_construct, std::make_tuple(ds_id), std::make_tuple());
1205 PERFETTO_DCHECK(it_and_inserted.second);
1206 // Note: only the first concurrent writer will actually be active.
1207 metatrace_writers_[ds_id].Enable(task_runner_, std::move(writer),
1208 metatrace::TAG_ANY);
1209 }
1210
1211 void PerfProducer::ConnectWithRetries(const char* socket_name) {
1212 PERFETTO_DCHECK(state_ == kNotStarted);
1213 state_ = kNotConnected;
1214
1215 ResetConnectionBackoff();
1216 producer_socket_name_ = socket_name;
1217 ConnectService();
1218 }
1219
1220 void PerfProducer::ConnectService() {
1221 PERFETTO_DCHECK(state_ == kNotConnected);
1222 state_ = kConnecting;
1223 endpoint_ = ProducerIPCClient::Connect(
1224 producer_socket_name_, this, kProducerName, task_runner_,
1225 TracingService::ProducerSMBScrapingMode::kEnabled);
1226 }
1227
1228 void PerfProducer::IncreaseConnectionBackoff() {
1229 connection_backoff_ms_ *= 2;
1230 if (connection_backoff_ms_ > kMaxConnectionBackoffMs)
1231 connection_backoff_ms_ = kMaxConnectionBackoffMs;
1232 }
1233
1234 void PerfProducer::ResetConnectionBackoff() {
1235 connection_backoff_ms_ = kInitialConnectionBackoffMs;
1236 }
1237
1238 void PerfProducer::OnConnect() {
1239 PERFETTO_DCHECK(state_ == kConnecting);
1240 state_ = kConnected;
1241 ResetConnectionBackoff();
1242 PERFETTO_LOG("Connected to the service");
1243
1244 {
1245 // linux.perf
1246 DataSourceDescriptor desc;
1247 desc.set_name(kDataSourceName);
1248 desc.set_handles_incremental_state_clear(true);
1249 desc.set_will_notify_on_stop(true);
1250 endpoint_->RegisterDataSource(desc);
1251 }
1252 {
1253 // metatrace
1254 DataSourceDescriptor desc;
1255 desc.set_name(MetatraceWriter::kDataSourceName);
1256 endpoint_->RegisterDataSource(desc);
1257 }
1258 // Used by tracebox to synchronize with this producer's data sources being registered.
1259 if (all_data_sources_registered_cb_) {
1260 endpoint_->Sync(all_data_sources_registered_cb_);
1261 }
1262 }
1263
1264 void PerfProducer::OnDisconnect() {
1265 PERFETTO_DCHECK(state_ == kConnected || state_ == kConnecting);
1266 PERFETTO_LOG("Disconnected from tracing service");
1267
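// If we had a fully established connection, rebuild the producer from scratch
// (dropping all data source state); if the connection attempt itself failed,
// retry with exponential backoff.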
1268 auto weak_producer = weak_factory_.GetWeakPtr();
1269 if (state_ == kConnected)
1270 return task_runner_->PostTask([weak_producer] {
1271 if (weak_producer)
1272 weak_producer->Restart();
1273 });
1274
1275 state_ = kNotConnected;
1276 IncreaseConnectionBackoff();
1277 task_runner_->PostDelayedTask(
1278 [weak_producer] {
1279 if (weak_producer)
1280 weak_producer->ConnectService();
1281 },
1282 connection_backoff_ms_);
1283 }
1284
1285 void PerfProducer::Restart() {
1286 // We lost the connection with the tracing service. At this point we need
1287 // to reset all the data sources. Trying to handle that manually is going to
1288 // be error prone. What we do here is simply destroy the instance and
1289 // recreate it again.
1290 base::TaskRunner* task_runner = task_runner_;
1291 const char* socket_name = producer_socket_name_;
1292 ProcDescriptorGetter* proc_fd_getter = proc_fd_getter_;
1293
1294 // Invoke destructor and then the constructor again.
1295 this->~PerfProducer();
1296 new (this) PerfProducer(proc_fd_getter, task_runner);
1297
1298 ConnectWithRetries(socket_name);
1299 }
1300
1301 } // namespace profiling
1302 } // namespace perfetto
1303