xref: /aosp_15_r20/external/perfetto/src/tracing/service/tracing_service_impl.cc (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/service/tracing_service_impl.h"
18 
19 #include <limits.h>
20 #include <string.h>
21 
22 #include <algorithm>
23 #include <cinttypes>
24 #include <cstdint>
25 #include <limits>
26 #include <optional>
27 #include <string>
28 #include <unordered_set>
29 
30 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
31     !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
32 #include <sys/uio.h>
33 #include <sys/utsname.h>
34 #include <unistd.h>
35 #endif
36 
37 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
38     PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
39 #include "src/android_internal/lazy_library_loader.h"    // nogncheck
40 #include "src/android_internal/tracing_service_proxy.h"  // nogncheck
41 #endif
42 
43 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) || \
44     PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) ||   \
45     PERFETTO_BUILDFLAG(PERFETTO_OS_APPLE)
46 #define PERFETTO_HAS_CHMOD
47 #include <sys/stat.h>
48 #endif
49 
50 #include "perfetto/base/build_config.h"
51 #include "perfetto/base/status.h"
52 #include "perfetto/base/task_runner.h"
53 #include "perfetto/ext/base/android_utils.h"
54 #include "perfetto/ext/base/clock_snapshots.h"
55 #include "perfetto/ext/base/file_utils.h"
56 #include "perfetto/ext/base/metatrace.h"
57 #include "perfetto/ext/base/string_utils.h"
58 #include "perfetto/ext/base/string_view.h"
59 #include "perfetto/ext/base/sys_types.h"
60 #include "perfetto/ext/base/utils.h"
61 #include "perfetto/ext/base/uuid.h"
62 #include "perfetto/ext/base/version.h"
63 #include "perfetto/ext/base/watchdog.h"
64 #include "perfetto/ext/tracing/core/basic_types.h"
65 #include "perfetto/ext/tracing/core/client_identity.h"
66 #include "perfetto/ext/tracing/core/consumer.h"
67 #include "perfetto/ext/tracing/core/observable_events.h"
68 #include "perfetto/ext/tracing/core/producer.h"
69 #include "perfetto/ext/tracing/core/shared_memory.h"
70 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
71 #include "perfetto/ext/tracing/core/trace_packet.h"
72 #include "perfetto/ext/tracing/core/trace_writer.h"
73 #include "perfetto/protozero/scattered_heap_buffer.h"
74 #include "perfetto/protozero/static_buffer.h"
75 #include "perfetto/tracing/core/data_source_descriptor.h"
76 #include "perfetto/tracing/core/tracing_service_capabilities.h"
77 #include "perfetto/tracing/core/tracing_service_state.h"
78 #include "src/android_stats/statsd_logging_helper.h"
79 #include "src/protozero/filtering/message_filter.h"
80 #include "src/protozero/filtering/string_filter.h"
81 #include "src/tracing/core/shared_memory_arbiter_impl.h"
82 #include "src/tracing/service/packet_stream_validator.h"
83 #include "src/tracing/service/trace_buffer.h"
84 
85 #include "protos/perfetto/common/builtin_clock.gen.h"
86 #include "protos/perfetto/common/builtin_clock.pbzero.h"
87 #include "protos/perfetto/common/trace_stats.pbzero.h"
88 #include "protos/perfetto/config/trace_config.pbzero.h"
89 #include "protos/perfetto/trace/clock_snapshot.pbzero.h"
90 #include "protos/perfetto/trace/perfetto/tracing_service_event.pbzero.h"
91 #include "protos/perfetto/trace/remote_clock_sync.pbzero.h"
92 #include "protos/perfetto/trace/system_info.pbzero.h"
93 #include "protos/perfetto/trace/trace_packet.pbzero.h"
94 #include "protos/perfetto/trace/trace_uuid.pbzero.h"
95 #include "protos/perfetto/trace/trigger.pbzero.h"
96 
97 // General note: this class must assume that Producers are malicious and will
98 // try to crash / exploit this class. We can trust pointers because they come
99 // from the IPC layer, but we should never assume that the producer calls
100 // come in the right order or their arguments are sane / within bounds.
101 
102 // This is a macro because we want the call-site line number for the ELOG.
103 #define PERFETTO_SVC_ERR(...) \
104   (PERFETTO_ELOG(__VA_ARGS__), ::perfetto::base::ErrStatus(__VA_ARGS__))
105 
106 namespace perfetto {
107 
namespace {
// Hard cap on the number of buffers a single consumer can configure.
constexpr int kMaxBuffersPerConsumer = 128;
// Default interval between periodic snapshots (e.g. clock snapshots).
constexpr uint32_t kDefaultSnapshotsIntervalMs = 10 * 1000;
// Default and minimum flush period when TraceConfig.write_into_file is set.
constexpr int kDefaultWriteIntoFilePeriodMs = 5000;
constexpr int kMinWriteIntoFilePeriodMs = 100;
// Timeout for all data sources acking the start of a session.
// NOTE(review): name lacks a unit suffix; value presumably ms — confirm.
constexpr uint32_t kAllDataSourceStartedTimeout = 20000;
// Caps on concurrently active tracing sessions (overall and per-uid).
constexpr int kMaxConcurrentTracingSessions = 15;
constexpr int kMaxConcurrentTracingSessionsPerUid = 5;
constexpr int kMaxConcurrentTracingSessionsForStatsdUid = 10;
// Guardrail: minimum spacing between back-to-back traces.
constexpr int64_t kMinSecondsBetweenTracesGuardrail = 5 * 60;

constexpr uint32_t kMillisPerHour = 3600000;
constexpr uint32_t kMillisPerDay = kMillisPerHour * 24;
// Absolute cap on trace duration: 7 days.
constexpr uint32_t kMaxTracingDurationMillis = 7 * 24 * kMillisPerHour;

// These apply only if enable_extra_guardrails is true.
constexpr uint32_t kGuardrailsMaxTracingBufferSizeKb = 128 * 1024;
constexpr uint32_t kGuardrailsMaxTracingDurationMillis = 24 * kMillisPerHour;

// Max number of data source names listed in lifecycle events.
constexpr size_t kMaxLifecycleEventsListedDataSources = 32;
128 
129 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) || PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
130 struct iovec {
131   void* iov_base;  // Address
132   size_t iov_len;  // Block size
133 };
134 
135 // Simple implementation of writev. Note that this does not give the atomicity
136 // guarantees of a real writev, but we don't depend on these (we aren't writing
137 // to the same file from another thread).
writev(int fd,const struct iovec * iov,int iovcnt)138 ssize_t writev(int fd, const struct iovec* iov, int iovcnt) {
139   ssize_t total_size = 0;
140   for (int i = 0; i < iovcnt; ++i) {
141     ssize_t current_size = base::WriteAll(fd, iov[i].iov_base, iov[i].iov_len);
142     if (current_size != static_cast<ssize_t>(iov[i].iov_len))
143       return -1;
144     total_size += current_size;
145   }
146   return total_size;
147 }
148 
149 #define IOV_MAX 1024  // Linux compatible limit.
150 
151 #endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) ||
152         // PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
153 
154 // Partially encodes a CommitDataRequest in an int32 for the purposes of
155 // metatracing. Note that it encodes only the bottom 10 bits of the producer id
156 // (which is technically 16 bits wide).
157 //
158 // Format (by bit range):
159 // [   31 ][         30 ][             29:20 ][            19:10 ][        9:0]
160 // [unused][has flush id][num chunks to patch][num chunks to move][producer id]
EncodeCommitDataRequest(ProducerID producer_id,const CommitDataRequest & req_untrusted)161 int32_t EncodeCommitDataRequest(ProducerID producer_id,
162                                 const CommitDataRequest& req_untrusted) {
163   uint32_t cmov = static_cast<uint32_t>(req_untrusted.chunks_to_move_size());
164   uint32_t cpatch = static_cast<uint32_t>(req_untrusted.chunks_to_patch_size());
165   uint32_t has_flush_id = req_untrusted.flush_request_id() != 0;
166 
167   uint32_t mask = (1 << 10) - 1;
168   uint32_t acc = 0;
169   acc |= has_flush_id << 30;
170   acc |= (cpatch & mask) << 20;
171   acc |= (cmov & mask) << 10;
172   acc |= (producer_id & mask);
173   return static_cast<int32_t>(acc);
174 }
175 
SerializeAndAppendPacket(std::vector<TracePacket> * packets,std::vector<uint8_t> packet)176 void SerializeAndAppendPacket(std::vector<TracePacket>* packets,
177                               std::vector<uint8_t> packet) {
178   Slice slice = Slice::Allocate(packet.size());
179   memcpy(slice.own_data(), packet.data(), packet.size());
180   packets->emplace_back();
181   packets->back().AddSlice(std::move(slice));
182 }
183 
EnsureValidShmSizes(size_t shm_size,size_t page_size)184 std::tuple<size_t /*shm_size*/, size_t /*page_size*/> EnsureValidShmSizes(
185     size_t shm_size,
186     size_t page_size) {
187   // Theoretically the max page size supported by the ABI is 64KB.
188   // However, the current implementation of TraceBuffer (the non-shared
189   // userspace buffer where the service copies data) supports at most
190   // 32K. Setting 64K "works" from the producer<>consumer viewpoint
191   // but then causes the data to be discarded when copying it into
192   // TraceBuffer.
193   constexpr size_t kMaxPageSize = 32 * 1024;
194   static_assert(kMaxPageSize <= SharedMemoryABI::kMaxPageSize, "");
195 
196   if (page_size == 0)
197     page_size = TracingServiceImpl::kDefaultShmPageSize;
198   if (shm_size == 0)
199     shm_size = TracingServiceImpl::kDefaultShmSize;
200 
201   page_size = std::min<size_t>(page_size, kMaxPageSize);
202   shm_size = std::min<size_t>(shm_size, TracingServiceImpl::kMaxShmSize);
203 
204   // The tracing page size has to be multiple of 4K. On some systems (e.g. Mac
205   // on Arm64) the system page size can be larger (e.g., 16K). That doesn't
206   // matter here, because the tracing page size is just a logical partitioning
207   // and does not have any dependencies on kernel mm syscalls (read: it's fine
208   // to have trace page sizes of 4K on a system where the kernel page size is
209   // 16K).
210   bool page_size_is_valid = page_size >= SharedMemoryABI::kMinPageSize;
211   page_size_is_valid &= page_size % SharedMemoryABI::kMinPageSize == 0;
212 
213   // Only allow power of two numbers of pages, i.e. 1, 2, 4, 8 pages.
214   size_t num_pages = page_size / SharedMemoryABI::kMinPageSize;
215   page_size_is_valid &= (num_pages & (num_pages - 1)) == 0;
216 
217   if (!page_size_is_valid || shm_size < page_size ||
218       shm_size % page_size != 0) {
219     return std::make_tuple(TracingServiceImpl::kDefaultShmSize,
220                            TracingServiceImpl::kDefaultShmPageSize);
221   }
222   return std::make_tuple(shm_size, page_size);
223 }
224 
// Returns true iff |name| passes the filter: with both filter lists empty
// everything matches; otherwise |name| must appear verbatim in |name_filter|
// or fully match one of the POSIX-extended regexes in |name_regex_filter|.
bool NameMatchesFilter(const std::string& name,
                       const std::vector<std::string>& name_filter,
                       const std::vector<std::string>& name_regex_filter) {
  if (name_filter.empty() && name_regex_filter.empty())
    return true;  // No filter configured: accept everything.

  const bool exact_hit =
      std::find(name_filter.begin(), name_filter.end(), name) !=
      name_filter.end();
  // Note: evaluated unconditionally (like the original), so an invalid regex
  // pattern throws regardless of whether an exact match was already found.
  const bool regex_hit = std::any_of(
      name_regex_filter.begin(), name_regex_filter.end(),
      [&name](const std::string& pattern) {
        return std::regex_match(name,
                                std::regex(pattern, std::regex::extended));
      });
  return exact_hit || regex_hit;
}
241 
242 // Used when TraceConfig.write_into_file == true and output_path is not empty.
CreateTraceFile(const std::string & path,bool overwrite)243 base::ScopedFile CreateTraceFile(const std::string& path, bool overwrite) {
244 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
245     PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
246   // This is NOT trying to preserve any security property, SELinux does that.
247   // It just improves the actionability of the error when people try to save the
248   // trace in a location that is not SELinux-allowed (a generic "permission
249   // denied" vs "don't put it here, put it there").
250   // These are the only SELinux approved dir for trace files that are created
251   // directly by traced.
252   static const char* kTraceDirBasePath = "/data/misc/perfetto-traces/";
253   if (!base::StartsWith(path, kTraceDirBasePath)) {
254     PERFETTO_ELOG("Invalid output_path %s. On Android it must be within %s.",
255                   path.c_str(), kTraceDirBasePath);
256     return base::ScopedFile();
257   }
258 #endif
259   // O_CREAT | O_EXCL will fail if the file exists already.
260   const int flags = O_RDWR | O_CREAT | (overwrite ? O_TRUNC : O_EXCL);
261   auto fd = base::OpenFile(path, flags, 0600);
262   if (fd) {
263 #if defined(PERFETTO_HAS_CHMOD)
264     // Passing 0644 directly above won't work because of umask.
265     PERFETTO_CHECK(fchmod(*fd, 0644) == 0);
266 #endif
267   } else {
268     PERFETTO_PLOG("Failed to create %s", path.c_str());
269   }
270   return fd;
271 }
272 
ShouldLogEvent(const TraceConfig & cfg)273 bool ShouldLogEvent(const TraceConfig& cfg) {
274   switch (cfg.statsd_logging()) {
275     case TraceConfig::STATSD_LOGGING_ENABLED:
276       return true;
277     case TraceConfig::STATSD_LOGGING_DISABLED:
278       return false;
279     case TraceConfig::STATSD_LOGGING_UNSPECIFIED:
280       break;
281   }
282   // For backward compatibility with older versions of perfetto_cmd.
283   return cfg.enable_extra_guardrails();
284 }
285 
286 // Appends `data` (which has `size` bytes), to `*packet`. Splits the data in
287 // slices no larger than `max_slice_size`.
AppendOwnedSlicesToPacket(std::unique_ptr<uint8_t[]> data,size_t size,size_t max_slice_size,perfetto::TracePacket * packet)288 void AppendOwnedSlicesToPacket(std::unique_ptr<uint8_t[]> data,
289                                size_t size,
290                                size_t max_slice_size,
291                                perfetto::TracePacket* packet) {
292   if (size <= max_slice_size) {
293     packet->AddSlice(Slice::TakeOwnership(std::move(data), size));
294     return;
295   }
296   uint8_t* src_ptr = data.get();
297   for (size_t size_left = size; size_left > 0;) {
298     const size_t slice_size = std::min(size_left, max_slice_size);
299 
300     Slice slice = Slice::Allocate(slice_size);
301     memcpy(slice.own_data(), src_ptr, slice_size);
302     packet->AddSlice(std::move(slice));
303 
304     src_ptr += slice_size;
305     size_left -= slice_size;
306   }
307 }
308 
309 using TraceFilter = protos::gen::TraceConfig::TraceFilter;
ConvertPolicy(TraceFilter::StringFilterPolicy policy)310 std::optional<protozero::StringFilter::Policy> ConvertPolicy(
311     TraceFilter::StringFilterPolicy policy) {
312   switch (policy) {
313     case TraceFilter::SFP_UNSPECIFIED:
314       return std::nullopt;
315     case TraceFilter::SFP_MATCH_REDACT_GROUPS:
316       return protozero::StringFilter::Policy::kMatchRedactGroups;
317     case TraceFilter::SFP_ATRACE_MATCH_REDACT_GROUPS:
318       return protozero::StringFilter::Policy::kAtraceMatchRedactGroups;
319     case TraceFilter::SFP_MATCH_BREAK:
320       return protozero::StringFilter::Policy::kMatchBreak;
321     case TraceFilter::SFP_ATRACE_MATCH_BREAK:
322       return protozero::StringFilter::Policy::kAtraceMatchBreak;
323     case TraceFilter::SFP_ATRACE_REPEATED_SEARCH_REDACT_GROUPS:
324       return protozero::StringFilter::Policy::kAtraceRepeatedSearchRedactGroups;
325   }
326   return std::nullopt;
327 }
328 
329 }  // namespace
330 
331 // static
CreateInstance(std::unique_ptr<SharedMemory::Factory> shm_factory,base::TaskRunner * task_runner,InitOpts init_opts)332 std::unique_ptr<TracingService> TracingService::CreateInstance(
333     std::unique_ptr<SharedMemory::Factory> shm_factory,
334     base::TaskRunner* task_runner,
335     InitOpts init_opts) {
336   tracing_service::Dependencies deps;
337   deps.clock = std::make_unique<tracing_service::ClockImpl>();
338   uint32_t seed = static_cast<uint32_t>(deps.clock->GetWallTimeMs().count());
339   deps.random = std::make_unique<tracing_service::RandomImpl>(seed);
340   return std::unique_ptr<TracingService>(new TracingServiceImpl(
341       std::move(shm_factory), task_runner, std::move(deps), init_opts));
342 }
343 
// Constructor: takes ownership of the SMB factory and the injected
// dependencies (clock, random). |task_runner| is borrowed and must outlive
// the service.
TracingServiceImpl::TracingServiceImpl(
    std::unique_ptr<SharedMemory::Factory> shm_factory,
    base::TaskRunner* task_runner,
    tracing_service::Dependencies deps,
    InitOpts init_opts)
    // NOTE: initializer order must match the member declaration order in the
    // header (not visible in this file).
    : clock_(std::move(deps.clock)),
      random_(std::move(deps.random)),
      init_opts_(init_opts),
      shm_factory_(std::move(shm_factory)),
      uid_(base::GetCurrentUserId()),
      buffer_ids_(kMaxTraceBufferID),  // ID allocator sized to the max ID.
      weak_runner_(task_runner) {
  PERFETTO_DCHECK(task_runner);
}
358 
// Destructor. Producer endpoints are not torn down explicitly here yet.
TracingServiceImpl::~TracingServiceImpl() {
  // TODO(fmayer): handle teardown of all Producer.
}
362 
363 std::unique_ptr<TracingService::ProducerEndpoint>
ConnectProducer(Producer * producer,const ClientIdentity & client_identity,const std::string & producer_name,size_t shared_memory_size_hint_bytes,bool in_process,ProducerSMBScrapingMode smb_scraping_mode,size_t shared_memory_page_size_hint_bytes,std::unique_ptr<SharedMemory> shm,const std::string & sdk_version)364 TracingServiceImpl::ConnectProducer(Producer* producer,
365                                     const ClientIdentity& client_identity,
366                                     const std::string& producer_name,
367                                     size_t shared_memory_size_hint_bytes,
368                                     bool in_process,
369                                     ProducerSMBScrapingMode smb_scraping_mode,
370                                     size_t shared_memory_page_size_hint_bytes,
371                                     std::unique_ptr<SharedMemory> shm,
372                                     const std::string& sdk_version) {
373   PERFETTO_DCHECK_THREAD(thread_checker_);
374 
375   auto uid = client_identity.uid();
376   if (lockdown_mode_ && uid != base::GetCurrentUserId()) {
377     PERFETTO_DLOG("Lockdown mode. Rejecting producer with UID %ld",
378                   static_cast<unsigned long>(uid));
379     return nullptr;
380   }
381 
382   if (producers_.size() >= kMaxProducerID) {
383     PERFETTO_DFATAL("Too many producers.");
384     return nullptr;
385   }
386   const ProducerID id = GetNextProducerID();
387   PERFETTO_DLOG("Producer %" PRIu16 " connected, uid=%d", id,
388                 static_cast<int>(uid));
389   bool smb_scraping_enabled = smb_scraping_enabled_;
390   switch (smb_scraping_mode) {
391     case ProducerSMBScrapingMode::kDefault:
392       break;
393     case ProducerSMBScrapingMode::kEnabled:
394       smb_scraping_enabled = true;
395       break;
396     case ProducerSMBScrapingMode::kDisabled:
397       smb_scraping_enabled = false;
398       break;
399   }
400 
401   std::unique_ptr<ProducerEndpointImpl> endpoint(new ProducerEndpointImpl(
402       id, client_identity, this, weak_runner_.task_runner(), producer,
403       producer_name, sdk_version, in_process, smb_scraping_enabled));
404   auto it_and_inserted = producers_.emplace(id, endpoint.get());
405   PERFETTO_DCHECK(it_and_inserted.second);
406   endpoint->shmem_size_hint_bytes_ = shared_memory_size_hint_bytes;
407   endpoint->shmem_page_size_hint_bytes_ = shared_memory_page_size_hint_bytes;
408 
409   // Producer::OnConnect() should run before Producer::OnTracingSetup(). The
410   // latter may be posted by SetupSharedMemory() below, so post OnConnect() now.
411   endpoint->weak_runner_.PostTask(
412       [endpoint = endpoint.get()] { endpoint->producer_->OnConnect(); });
413 
414   if (shm) {
415     // The producer supplied an SMB. This is used only by Chrome; in the most
416     // common cases the SMB is created by the service and passed via
417     // OnTracingSetup(). Verify that it is correctly sized before we attempt to
418     // use it. The transport layer has to verify the integrity of the SMB (e.g.
419     // ensure that the producer can't resize if after the fact).
420     size_t shm_size, page_size;
421     std::tie(shm_size, page_size) =
422         EnsureValidShmSizes(shm->size(), endpoint->shmem_page_size_hint_bytes_);
423     if (shm_size == shm->size() &&
424         page_size == endpoint->shmem_page_size_hint_bytes_) {
425       PERFETTO_DLOG(
426           "Adopting producer-provided SMB of %zu kB for producer \"%s\"",
427           shm_size / 1024, endpoint->name_.c_str());
428       endpoint->SetupSharedMemory(std::move(shm), page_size,
429                                   /*provided_by_producer=*/true);
430     } else {
431       PERFETTO_LOG(
432           "Discarding incorrectly sized producer-provided SMB for producer "
433           "\"%s\", falling back to service-provided SMB. Requested sizes: %zu "
434           "B total, %zu B page size; suggested corrected sizes: %zu B total, "
435           "%zu B page size",
436           endpoint->name_.c_str(), shm->size(),
437           endpoint->shmem_page_size_hint_bytes_, shm_size, page_size);
438       shm.reset();
439     }
440   }
441 
442   return std::unique_ptr<ProducerEndpoint>(std::move(endpoint));
443 }
444 
// Tears down state for a disconnecting producer: scrapes its remaining SMB
// chunks into any active sessions, unregisters its data sources and removes
// it from the registry.
void TracingServiceImpl::DisconnectProducer(ProducerID id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Producer %" PRIu16 " disconnected", id);
  PERFETTO_DCHECK(producers_.count(id));

  // Scrape remaining chunks for this producer to ensure we don't lose data.
  if (auto* producer = GetProducer(id)) {
    for (auto& session_id_and_session : tracing_sessions_)
      ScrapeSharedMemoryBuffers(&session_id_and_session.second, producer);
  }

  // The iterator is advanced *before* UnregisterDataSource() because that
  // call erases the current element from |data_sources_|; erasing the element
  // the iterator points to would otherwise invalidate it.
  for (auto it = data_sources_.begin(); it != data_sources_.end();) {
    auto next = it;
    next++;
    if (it->second.producer_id == id)
      UnregisterDataSource(id, it->second.descriptor.name());
    it = next;
  }

  producers_.erase(id);
  // The producer's SMB no longer counts towards the memory guardrail.
  UpdateMemoryGuardrail();
}
467 
GetProducer(ProducerID id) const468 TracingServiceImpl::ProducerEndpointImpl* TracingServiceImpl::GetProducer(
469     ProducerID id) const {
470   PERFETTO_DCHECK_THREAD(thread_checker_);
471   auto it = producers_.find(id);
472   if (it == producers_.end())
473     return nullptr;
474   return it->second;
475 }
476 
477 std::unique_ptr<TracingService::ConsumerEndpoint>
ConnectConsumer(Consumer * consumer,uid_t uid)478 TracingServiceImpl::ConnectConsumer(Consumer* consumer, uid_t uid) {
479   PERFETTO_DCHECK_THREAD(thread_checker_);
480   PERFETTO_DLOG("Consumer %p connected from UID %" PRIu64,
481                 reinterpret_cast<void*>(consumer), static_cast<uint64_t>(uid));
482   std::unique_ptr<ConsumerEndpointImpl> endpoint(new ConsumerEndpointImpl(
483       this, weak_runner_.task_runner(), consumer, uid));
484   // Consumer might go away before we're able to send the connect notification,
485   // if that is the case just bail out.
486   auto weak_ptr = endpoint->weak_ptr_factory_.GetWeakPtr();
487   weak_runner_.task_runner()->PostTask([weak_ptr = std::move(weak_ptr)] {
488     if (weak_ptr)
489       weak_ptr->consumer_->OnConnect();
490   });
491   return std::unique_ptr<ConsumerEndpoint>(std::move(endpoint));
492 }
493 
DisconnectConsumer(ConsumerEndpointImpl * consumer)494 void TracingServiceImpl::DisconnectConsumer(ConsumerEndpointImpl* consumer) {
495   PERFETTO_DCHECK_THREAD(thread_checker_);
496   PERFETTO_DLOG("Consumer %p disconnected", reinterpret_cast<void*>(consumer));
497 
498   if (consumer->tracing_session_id_)
499     FreeBuffers(consumer->tracing_session_id_);  // Will also DisableTracing().
500 
501   // At this point no more pointers to |consumer| should be around.
502   PERFETTO_DCHECK(!std::any_of(
503       tracing_sessions_.begin(), tracing_sessions_.end(),
504       [consumer](const std::pair<const TracingSessionID, TracingSession>& kv) {
505         return kv.second.consumer_maybe_null == consumer;
506       }));
507 }
508 
DetachConsumer(ConsumerEndpointImpl * consumer,const std::string & key)509 bool TracingServiceImpl::DetachConsumer(ConsumerEndpointImpl* consumer,
510                                         const std::string& key) {
511   PERFETTO_DCHECK_THREAD(thread_checker_);
512   PERFETTO_DLOG("Consumer %p detached", reinterpret_cast<void*>(consumer));
513 
514   TracingSessionID tsid = consumer->tracing_session_id_;
515   TracingSession* tracing_session;
516   if (!tsid || !(tracing_session = GetTracingSession(tsid)))
517     return false;
518 
519   if (GetDetachedSession(consumer->uid_, key)) {
520     PERFETTO_ELOG("Another session has been detached with the same key \"%s\"",
521                   key.c_str());
522     return false;
523   }
524 
525   PERFETTO_DCHECK(tracing_session->consumer_maybe_null == consumer);
526   tracing_session->consumer_maybe_null = nullptr;
527   tracing_session->detach_key = key;
528   consumer->tracing_session_id_ = 0;
529   return true;
530 }
531 
532 std::unique_ptr<TracingService::RelayEndpoint>
ConnectRelayClient(RelayClientID relay_client_id)533 TracingServiceImpl::ConnectRelayClient(RelayClientID relay_client_id) {
534   PERFETTO_DCHECK_THREAD(thread_checker_);
535 
536   auto endpoint = std::make_unique<RelayEndpointImpl>(relay_client_id, this);
537   relay_clients_[relay_client_id] = endpoint.get();
538 
539   return std::move(endpoint);
540 }
541 
DisconnectRelayClient(RelayClientID relay_client_id)542 void TracingServiceImpl::DisconnectRelayClient(RelayClientID relay_client_id) {
543   PERFETTO_DCHECK_THREAD(thread_checker_);
544 
545   if (relay_clients_.find(relay_client_id) == relay_clients_.end())
546     return;
547   relay_clients_.erase(relay_client_id);
548 }
549 
AttachConsumer(ConsumerEndpointImpl * consumer,const std::string & key)550 bool TracingServiceImpl::AttachConsumer(ConsumerEndpointImpl* consumer,
551                                         const std::string& key) {
552   PERFETTO_DCHECK_THREAD(thread_checker_);
553   PERFETTO_DLOG("Consumer %p attaching to session %s",
554                 reinterpret_cast<void*>(consumer), key.c_str());
555 
556   if (consumer->tracing_session_id_) {
557     PERFETTO_ELOG(
558         "Cannot reattach consumer to session %s"
559         " while it already attached tracing session ID %" PRIu64,
560         key.c_str(), consumer->tracing_session_id_);
561     return false;
562   }
563 
564   auto* tracing_session = GetDetachedSession(consumer->uid_, key);
565   if (!tracing_session) {
566     PERFETTO_ELOG(
567         "Failed to attach consumer, session '%s' not found for uid %d",
568         key.c_str(), static_cast<int>(consumer->uid_));
569     return false;
570   }
571 
572   consumer->tracing_session_id_ = tracing_session->id;
573   tracing_session->consumer_maybe_null = consumer;
574   tracing_session->detach_key.clear();
575   return true;
576 }
577 
EnableTracing(ConsumerEndpointImpl * consumer,const TraceConfig & cfg,base::ScopedFile fd)578 base::Status TracingServiceImpl::EnableTracing(ConsumerEndpointImpl* consumer,
579                                                const TraceConfig& cfg,
580                                                base::ScopedFile fd) {
581   PERFETTO_DCHECK_THREAD(thread_checker_);
582 
583   // If the producer is specifying a UUID, respect that (at least for the first
584   // snapshot). Otherwise generate a new UUID.
585   base::Uuid uuid(cfg.trace_uuid_lsb(), cfg.trace_uuid_msb());
586   if (!uuid)
587     uuid = base::Uuidv4();
588 
589   PERFETTO_DLOG("Enabling tracing for consumer %p, UUID: %s",
590                 reinterpret_cast<void*>(consumer),
591                 uuid.ToPrettyString().c_str());
592   MaybeLogUploadEvent(cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracing);
593   if (cfg.lockdown_mode() == TraceConfig::LOCKDOWN_SET)
594     lockdown_mode_ = true;
595   if (cfg.lockdown_mode() == TraceConfig::LOCKDOWN_CLEAR)
596     lockdown_mode_ = false;
597 
598   // Scope |tracing_session| to this block to prevent accidental use of a null
599   // pointer later in this function.
600   {
601     TracingSession* tracing_session =
602         GetTracingSession(consumer->tracing_session_id_);
603     if (tracing_session) {
604       MaybeLogUploadEvent(
605           cfg, uuid,
606           PerfettoStatsdAtom::kTracedEnableTracingExistingTraceSession);
607       return PERFETTO_SVC_ERR(
608           "A Consumer is trying to EnableTracing() but another tracing "
609           "session is already active (forgot a call to FreeBuffers() ?)");
610     }
611   }
612 
613   const uint32_t max_duration_ms = cfg.enable_extra_guardrails()
614                                        ? kGuardrailsMaxTracingDurationMillis
615                                        : kMaxTracingDurationMillis;
616   if (cfg.duration_ms() > max_duration_ms) {
617     MaybeLogUploadEvent(cfg, uuid,
618                         PerfettoStatsdAtom::kTracedEnableTracingTooLongTrace);
619     return PERFETTO_SVC_ERR("Requested too long trace (%" PRIu32
620                             "ms  > %" PRIu32 " ms)",
621                             cfg.duration_ms(), max_duration_ms);
622   }
623 
624   const bool has_trigger_config =
625       GetTriggerMode(cfg) != TraceConfig::TriggerConfig::UNSPECIFIED;
626   if (has_trigger_config &&
627       (cfg.trigger_config().trigger_timeout_ms() == 0 ||
628        cfg.trigger_config().trigger_timeout_ms() > max_duration_ms)) {
629     MaybeLogUploadEvent(
630         cfg, uuid,
631         PerfettoStatsdAtom::kTracedEnableTracingInvalidTriggerTimeout);
632     return PERFETTO_SVC_ERR(
633         "Traces with START_TRACING triggers must provide a positive "
634         "trigger_timeout_ms < 7 days (received %" PRIu32 "ms)",
635         cfg.trigger_config().trigger_timeout_ms());
636   }
637 
638   // This check has been introduced in May 2023 after finding b/274931668.
639   if (static_cast<int>(cfg.trigger_config().trigger_mode()) >
640       TraceConfig::TriggerConfig::TriggerMode_MAX) {
641     MaybeLogUploadEvent(
642         cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidTriggerMode);
643     return PERFETTO_SVC_ERR(
644         "The trace config specified an invalid trigger_mode");
645   }
646 
647   if (cfg.trigger_config().use_clone_snapshot_if_available() &&
648       cfg.trigger_config().trigger_mode() !=
649           TraceConfig::TriggerConfig::STOP_TRACING) {
650     MaybeLogUploadEvent(
651         cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidTriggerMode);
652     return PERFETTO_SVC_ERR(
653         "trigger_mode must be STOP_TRACING when "
654         "use_clone_snapshot_if_available=true");
655   }
656 
657   if (has_trigger_config && cfg.duration_ms() != 0) {
658     MaybeLogUploadEvent(
659         cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingDurationWithTrigger);
660     return PERFETTO_SVC_ERR(
661         "duration_ms was set, this must not be set for traces with triggers.");
662   }
663 
664   for (char c : cfg.bugreport_filename()) {
665     if (!((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
666           (c >= '0' && c <= '9') || c == '-' || c == '_' || c == '.')) {
667       MaybeLogUploadEvent(
668           cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidBrFilename);
669       return PERFETTO_SVC_ERR(
670           "bugreport_filename contains invalid chars. Use [a-zA-Z0-9-_.]+");
671     }
672   }
673 
674   if ((GetTriggerMode(cfg) == TraceConfig::TriggerConfig::STOP_TRACING ||
675        GetTriggerMode(cfg) == TraceConfig::TriggerConfig::CLONE_SNAPSHOT) &&
676       cfg.write_into_file()) {
677     // We don't support this usecase because there are subtle assumptions which
678     // break around TracingServiceEvents and windowed sorting (i.e. if we don't
679     // drain the events in ReadBuffersIntoFile because we are waiting for
680     // STOP_TRACING, we can end up queueing up a lot of TracingServiceEvents and
681     // emitting them wildy out of order breaking windowed sorting in trace
682     // processor).
683     MaybeLogUploadEvent(
684         cfg, uuid,
685         PerfettoStatsdAtom::kTracedEnableTracingStopTracingWriteIntoFile);
686     return PERFETTO_SVC_ERR(
687         "Specifying trigger mode STOP_TRACING/CLONE_SNAPSHOT and "
688         "write_into_file together is unsupported");
689   }
690 
691   std::unordered_set<std::string> triggers;
692   for (const auto& trigger : cfg.trigger_config().triggers()) {
693     if (!triggers.insert(trigger.name()).second) {
694       MaybeLogUploadEvent(
695           cfg, uuid,
696           PerfettoStatsdAtom::kTracedEnableTracingDuplicateTriggerName);
697       return PERFETTO_SVC_ERR("Duplicate trigger name: %s",
698                               trigger.name().c_str());
699     }
700   }
701 
702   if (cfg.enable_extra_guardrails()) {
703     if (cfg.deferred_start()) {
704       MaybeLogUploadEvent(
705           cfg, uuid,
706           PerfettoStatsdAtom::kTracedEnableTracingInvalidDeferredStart);
707       return PERFETTO_SVC_ERR(
708           "deferred_start=true is not supported in unsupervised traces");
709     }
710     uint64_t buf_size_sum = 0;
711     for (const auto& buf : cfg.buffers()) {
712       if (buf.size_kb() % 4 != 0) {
713         MaybeLogUploadEvent(
714             cfg, uuid,
715             PerfettoStatsdAtom::kTracedEnableTracingInvalidBufferSize);
716         return PERFETTO_SVC_ERR(
717             "buffers.size_kb must be a multiple of 4, got %" PRIu32,
718             buf.size_kb());
719       }
720       buf_size_sum += buf.size_kb();
721     }
722 
723     uint32_t max_tracing_buffer_size_kb =
724         std::max(kGuardrailsMaxTracingBufferSizeKb,
725                  cfg.guardrail_overrides().max_tracing_buffer_size_kb());
726     if (buf_size_sum > max_tracing_buffer_size_kb) {
727       MaybeLogUploadEvent(
728           cfg, uuid,
729           PerfettoStatsdAtom::kTracedEnableTracingBufferSizeTooLarge);
730       return PERFETTO_SVC_ERR("Requested too large trace buffer (%" PRIu64
731                               "kB  > %" PRIu32 " kB)",
732                               buf_size_sum, max_tracing_buffer_size_kb);
733     }
734   }
735 
736   if (cfg.buffers_size() > kMaxBuffersPerConsumer) {
737     MaybeLogUploadEvent(cfg, uuid,
738                         PerfettoStatsdAtom::kTracedEnableTracingTooManyBuffers);
739     return PERFETTO_SVC_ERR("Too many buffers configured (%d)",
740                             cfg.buffers_size());
741   }
742   // Check that the config specifies all buffers for its data sources. This
743   // is also checked in SetupDataSource, but it is simpler to return a proper
744   // error to the consumer from here (and there will be less state to undo).
745   for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
746     size_t num_buffers = static_cast<size_t>(cfg.buffers_size());
747     size_t target_buffer = cfg_data_source.config().target_buffer();
748     if (target_buffer >= num_buffers) {
749       MaybeLogUploadEvent(
750           cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingOobTargetBuffer);
751       return PERFETTO_SVC_ERR(
752           "Data source \"%s\" specified an out of bounds target_buffer (%zu >= "
753           "%zu)",
754           cfg_data_source.config().name().c_str(), target_buffer, num_buffers);
755     }
756   }
757 
758   if (!cfg.unique_session_name().empty()) {
759     const std::string& name = cfg.unique_session_name();
760     for (auto& kv : tracing_sessions_) {
761       if (kv.second.state == TracingSession::CLONED_READ_ONLY)
762         continue;  // Don't consider cloned sessions in uniqueness checks.
763       if (kv.second.config.unique_session_name() == name) {
764         MaybeLogUploadEvent(
765             cfg, uuid,
766             PerfettoStatsdAtom::kTracedEnableTracingDuplicateSessionName);
767         static const char fmt[] =
768             "A trace with this unique session name (%s) already exists";
769         // This happens frequently, don't make it an "E"LOG.
770         PERFETTO_LOG(fmt, name.c_str());
771         return base::ErrStatus(fmt, name.c_str());
772       }
773     }
774   }
775 
776   if (!cfg.session_semaphores().empty()) {
777     struct SemaphoreSessionsState {
778       uint64_t smallest_max_other_session_count =
779           std::numeric_limits<uint64_t>::max();
780       uint64_t session_count = 0;
781     };
782     // For each semaphore, compute the number of active sessions and the
783     // MIN(limit).
784     std::unordered_map<std::string, SemaphoreSessionsState>
785         sem_to_sessions_state;
786     for (const auto& id_and_session : tracing_sessions_) {
787       const auto& session = id_and_session.second;
788       if (session.state == TracingSession::CLONED_READ_ONLY ||
789           session.state == TracingSession::DISABLED) {
790         // Don't consider cloned or disabled sessions in checks.
791         continue;
792       }
793       for (const auto& sem : session.config.session_semaphores()) {
794         auto& sessions_state = sem_to_sessions_state[sem.name()];
795         sessions_state.smallest_max_other_session_count =
796             std::min(sessions_state.smallest_max_other_session_count,
797                      sem.max_other_session_count());
798         sessions_state.session_count++;
799       }
800     }
801 
802     // Check if any of the semaphores declared by the config clashes with any of
803     // the currently active semaphores.
804     for (const auto& semaphore : cfg.session_semaphores()) {
805       auto it = sem_to_sessions_state.find(semaphore.name());
806       if (it == sem_to_sessions_state.end()) {
807         continue;
808       }
809       uint64_t max_other_session_count =
810           std::min(semaphore.max_other_session_count(),
811                    it->second.smallest_max_other_session_count);
812       if (it->second.session_count > max_other_session_count) {
813         MaybeLogUploadEvent(
814             cfg, uuid,
815             PerfettoStatsdAtom::
816                 kTracedEnableTracingFailedSessionSemaphoreCheck);
817         return PERFETTO_SVC_ERR(
818             "Semaphore \"%s\" exceeds maximum allowed other session count "
819             "(%" PRIu64 " > min(%" PRIu64 ", %" PRIu64 "))",
820             semaphore.name().c_str(), it->second.session_count,
821             semaphore.max_other_session_count(),
822             it->second.smallest_max_other_session_count);
823       }
824     }
825   }
826 
827   if (cfg.enable_extra_guardrails()) {
828     // unique_session_name can be empty
829     const std::string& name = cfg.unique_session_name();
830     int64_t now_s = clock_->GetBootTimeS().count();
831 
832     // Remove any entries where the time limit has passed so this map doesn't
833     // grow indefinitely:
834     std::map<std::string, int64_t>& sessions = session_to_last_trace_s_;
835     for (auto it = sessions.cbegin(); it != sessions.cend();) {
836       if (now_s - it->second > kMinSecondsBetweenTracesGuardrail) {
837         it = sessions.erase(it);
838       } else {
839         ++it;
840       }
841     }
842 
843     int64_t& previous_s = session_to_last_trace_s_[name];
844     if (previous_s == 0) {
845       previous_s = now_s;
846     } else {
847       MaybeLogUploadEvent(
848           cfg, uuid,
849           PerfettoStatsdAtom::kTracedEnableTracingSessionNameTooRecent);
850       return PERFETTO_SVC_ERR(
851           "A trace with unique session name \"%s\" began less than %" PRId64
852           "s ago (%" PRId64 "s)",
853           name.c_str(), kMinSecondsBetweenTracesGuardrail, now_s - previous_s);
854     }
855   }
856 
857   const int sessions_for_uid = static_cast<int>(std::count_if(
858       tracing_sessions_.begin(), tracing_sessions_.end(),
859       [consumer](const decltype(tracing_sessions_)::value_type& s) {
860         return s.second.consumer_uid == consumer->uid_;
861       }));
862 
863   int per_uid_limit = kMaxConcurrentTracingSessionsPerUid;
864   if (consumer->uid_ == 1066 /* AID_STATSD*/) {
865     per_uid_limit = kMaxConcurrentTracingSessionsForStatsdUid;
866   }
867   if (sessions_for_uid >= per_uid_limit) {
868     MaybeLogUploadEvent(
869         cfg, uuid,
870         PerfettoStatsdAtom::kTracedEnableTracingTooManySessionsForUid);
871     return PERFETTO_SVC_ERR(
872         "Too many concurrent tracing sesions (%d) for uid %d limit is %d",
873         sessions_for_uid, static_cast<int>(consumer->uid_), per_uid_limit);
874   }
875 
876   // TODO(primiano): This is a workaround to prevent that a producer gets stuck
877   // in a state where it stalls by design by having more TraceWriterImpl
878   // instances than free pages in the buffer. This is really a bug in
879   // trace_probes and the way it handles stalls in the shmem buffer.
880   if (tracing_sessions_.size() >= kMaxConcurrentTracingSessions) {
881     MaybeLogUploadEvent(
882         cfg, uuid,
883         PerfettoStatsdAtom::kTracedEnableTracingTooManyConcurrentSessions);
884     return PERFETTO_SVC_ERR("Too many concurrent tracing sesions (%zu)",
885                             tracing_sessions_.size());
886   }
887 
888   // If the trace config provides a filter bytecode, setup the filter now.
889   // If the filter loading fails, abort the tracing session rather than running
890   // unfiltered.
891   std::unique_ptr<protozero::MessageFilter> trace_filter;
892   if (cfg.has_trace_filter()) {
893     const auto& filt = cfg.trace_filter();
894     trace_filter.reset(new protozero::MessageFilter());
895 
896     protozero::StringFilter& string_filter = trace_filter->string_filter();
897     for (const auto& rule : filt.string_filter_chain().rules()) {
898       auto opt_policy = ConvertPolicy(rule.policy());
899       if (!opt_policy.has_value()) {
900         MaybeLogUploadEvent(
901             cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidFilter);
902         return PERFETTO_SVC_ERR(
903             "Trace filter has invalid string filtering rules, aborting");
904       }
905       string_filter.AddRule(*opt_policy, rule.regex_pattern(),
906                             rule.atrace_payload_starts_with());
907     }
908 
909     const std::string& bytecode_v1 = filt.bytecode();
910     const std::string& bytecode_v2 = filt.bytecode_v2();
911     const std::string& bytecode =
912         bytecode_v2.empty() ? bytecode_v1 : bytecode_v2;
913     if (!trace_filter->LoadFilterBytecode(bytecode.data(), bytecode.size())) {
914       MaybeLogUploadEvent(
915           cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidFilter);
916       return PERFETTO_SVC_ERR("Trace filter bytecode invalid, aborting");
917     }
918 
919     // The filter is created using perfetto.protos.Trace as root message
920     // (because that makes it possible to play around with the `proto_filter`
921     // tool on actual traces). Here in the service, however, we deal with
922     // perfetto.protos.TracePacket(s), which are one level down (Trace.packet).
923     // The IPC client (or the write_into_filte logic in here) are responsible
924     // for pre-pending the packet preamble (See GetProtoPreamble() calls), but
925     // the preamble is not there at ReadBuffer time. Hence we change the root of
926     // the filtering to start at the Trace.packet level.
927     if (!trace_filter->SetFilterRoot({TracePacket::kPacketFieldNumber})) {
928       MaybeLogUploadEvent(
929           cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidFilter);
930       return PERFETTO_SVC_ERR("Failed to set filter root.");
931     }
932   }
933 
934   const TracingSessionID tsid = ++last_tracing_session_id_;
935   TracingSession* tracing_session =
936       &tracing_sessions_
937            .emplace(std::piecewise_construct, std::forward_as_tuple(tsid),
938                     std::forward_as_tuple(tsid, consumer, cfg,
939                                           weak_runner_.task_runner()))
940            .first->second;
941 
942   tracing_session->trace_uuid = uuid;
943 
944   if (trace_filter)
945     tracing_session->trace_filter = std::move(trace_filter);
946 
947   if (cfg.write_into_file()) {
948     if (!fd ^ !cfg.output_path().empty()) {
949       MaybeLogUploadEvent(
950           tracing_session->config, uuid,
951           PerfettoStatsdAtom::kTracedEnableTracingInvalidFdOutputFile);
952       tracing_sessions_.erase(tsid);
953       return PERFETTO_SVC_ERR(
954           "When write_into_file==true either a FD needs to be passed or "
955           "output_path must be populated (but not both)");
956     }
957     if (!cfg.output_path().empty()) {
958       fd = CreateTraceFile(cfg.output_path(), /*overwrite=*/false);
959       if (!fd) {
960         MaybeLogUploadEvent(
961             tracing_session->config, uuid,
962             PerfettoStatsdAtom::kTracedEnableTracingFailedToCreateFile);
963         tracing_sessions_.erase(tsid);
964         return PERFETTO_SVC_ERR("Failed to create the trace file %s",
965                                 cfg.output_path().c_str());
966       }
967     }
968     tracing_session->write_into_file = std::move(fd);
969     uint32_t write_period_ms = cfg.file_write_period_ms();
970     if (write_period_ms == 0)
971       write_period_ms = kDefaultWriteIntoFilePeriodMs;
972     if (write_period_ms < kMinWriteIntoFilePeriodMs)
973       write_period_ms = kMinWriteIntoFilePeriodMs;
974     tracing_session->write_period_ms = write_period_ms;
975     tracing_session->max_file_size_bytes = cfg.max_file_size_bytes();
976     tracing_session->bytes_written_into_file = 0;
977   }
978 
979   if (cfg.compression_type() == TraceConfig::COMPRESSION_TYPE_DEFLATE) {
980     if (init_opts_.compressor_fn) {
981       tracing_session->compress_deflate = true;
982     } else {
983       PERFETTO_LOG(
984           "COMPRESSION_TYPE_DEFLATE is not supported in the current build "
985           "configuration. Skipping compression");
986     }
987   }
988 
989   // Initialize the log buffers.
990   bool did_allocate_all_buffers = true;
991   bool invalid_buffer_config = false;
992 
993   // Allocate the trace buffers. Also create a map to translate a consumer
994   // relative index (TraceConfig.DataSourceConfig.target_buffer) into the
995   // corresponding BufferID, which is a global ID namespace for the service and
996   // all producers.
997   size_t total_buf_size_kb = 0;
998   const size_t num_buffers = static_cast<size_t>(cfg.buffers_size());
999   tracing_session->buffers_index.reserve(num_buffers);
1000   for (size_t i = 0; i < num_buffers; i++) {
1001     const TraceConfig::BufferConfig& buffer_cfg = cfg.buffers()[i];
1002     BufferID global_id = buffer_ids_.Allocate();
1003     if (!global_id) {
1004       did_allocate_all_buffers = false;  // We ran out of IDs.
1005       break;
1006     }
1007     tracing_session->buffers_index.push_back(global_id);
1008     // TraceBuffer size is limited to 32-bit.
1009     const uint32_t buf_size_kb = buffer_cfg.size_kb();
1010     const uint64_t buf_size_bytes = buf_size_kb * static_cast<uint64_t>(1024);
1011     const size_t buf_size = static_cast<size_t>(buf_size_bytes);
1012     if (buf_size_bytes == 0 ||
1013         buf_size_bytes > std::numeric_limits<uint32_t>::max() ||
1014         buf_size != buf_size_bytes) {
1015       invalid_buffer_config = true;
1016       did_allocate_all_buffers = false;
1017       break;
1018     }
1019     total_buf_size_kb += buf_size_kb;
1020     TraceBuffer::OverwritePolicy policy =
1021         buffer_cfg.fill_policy() == TraceConfig::BufferConfig::DISCARD
1022             ? TraceBuffer::kDiscard
1023             : TraceBuffer::kOverwrite;
1024     auto it_and_inserted =
1025         buffers_.emplace(global_id, TraceBuffer::Create(buf_size, policy));
1026     PERFETTO_DCHECK(it_and_inserted.second);  // buffers_.count(global_id) == 0.
1027     std::unique_ptr<TraceBuffer>& trace_buffer = it_and_inserted.first->second;
1028     if (!trace_buffer) {
1029       did_allocate_all_buffers = false;
1030       break;
1031     }
1032   }
1033 
1034   // This can happen if either:
1035   // - All the kMaxTraceBufferID slots are taken.
1036   // - OOM, or, more realistically, we exhausted virtual memory.
1037   // - The buffer size in the config is invalid.
1038   // In any case, free all the previously allocated buffers and abort.
1039   if (!did_allocate_all_buffers) {
1040     for (BufferID global_id : tracing_session->buffers_index) {
1041       buffer_ids_.Free(global_id);
1042       buffers_.erase(global_id);
1043     }
1044     MaybeLogUploadEvent(tracing_session->config, uuid,
1045                         PerfettoStatsdAtom::kTracedEnableTracingOom);
1046     tracing_sessions_.erase(tsid);
1047     if (invalid_buffer_config) {
1048       return PERFETTO_SVC_ERR(
1049           "Failed to allocate tracing buffers: Invalid buffer sizes");
1050     }
1051     return PERFETTO_SVC_ERR(
1052         "Failed to allocate tracing buffers: OOM or too many buffers");
1053   }
1054 
1055   UpdateMemoryGuardrail();
1056 
1057   consumer->tracing_session_id_ = tsid;
1058 
1059   // Setup the data sources on the producers without starting them.
1060   for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
1061     // Scan all the registered data sources with a matching name.
1062     auto range = data_sources_.equal_range(cfg_data_source.config().name());
1063     for (auto it = range.first; it != range.second; it++) {
1064       TraceConfig::ProducerConfig producer_config;
1065       for (const auto& config : cfg.producers()) {
1066         if (GetProducer(it->second.producer_id)->name_ ==
1067             config.producer_name()) {
1068           producer_config = config;
1069           break;
1070         }
1071       }
1072       SetupDataSource(cfg_data_source, producer_config, it->second,
1073                       tracing_session);
1074     }
1075   }
1076 
1077   bool has_start_trigger = false;
1078   switch (GetTriggerMode(cfg)) {
1079     case TraceConfig::TriggerConfig::UNSPECIFIED:
1080       // no triggers are specified so this isn't a trace that is using triggers.
1081       PERFETTO_DCHECK(!has_trigger_config);
1082       break;
1083     case TraceConfig::TriggerConfig::START_TRACING:
1084       // For traces which use START_TRACE triggers we need to ensure that the
1085       // tracing session will be cleaned up when it times out.
1086       has_start_trigger = true;
1087       weak_runner_.PostDelayedTask(
1088           [tsid, this]() { OnStartTriggersTimeout(tsid); },
1089           cfg.trigger_config().trigger_timeout_ms());
1090       break;
1091     case TraceConfig::TriggerConfig::STOP_TRACING:
1092     case TraceConfig::TriggerConfig::CLONE_SNAPSHOT:
1093       // Update the tracing_session's duration_ms to ensure that if no trigger
1094       // is received the session will end and be cleaned up equal to the
1095       // timeout.
1096       //
1097       // TODO(nuskos): Refactor this so that rather then modifying the config we
1098       // have a field we look at on the tracing_session.
1099       tracing_session->config.set_duration_ms(
1100           cfg.trigger_config().trigger_timeout_ms());
1101       break;
1102 
1103       // The case of unknown modes (coming from future versions of the service)
1104       // is handled few lines above (search for TriggerMode_MAX).
1105   }
1106 
1107   tracing_session->state = TracingSession::CONFIGURED;
1108   PERFETTO_LOG(
1109       "Configured tracing session %" PRIu64
1110       ", #sources:%zu, duration:%d ms%s, #buffers:%d, total "
1111       "buffer size:%zu KB, total sessions:%zu, uid:%d session name: \"%s\"",
1112       tsid, cfg.data_sources().size(), tracing_session->config.duration_ms(),
1113       tracing_session->config.prefer_suspend_clock_for_duration()
1114           ? " (suspend_clock)"
1115           : "",
1116       cfg.buffers_size(), total_buf_size_kb, tracing_sessions_.size(),
1117       static_cast<unsigned int>(consumer->uid_),
1118       cfg.unique_session_name().c_str());
1119 
1120   // Start the data sources, unless this is a case of early setup + fast
1121   // triggering, either through TraceConfig.deferred_start or
1122   // TraceConfig.trigger_config(). If both are specified which ever one occurs
1123   // first will initiate the trace.
1124   if (!cfg.deferred_start() && !has_start_trigger)
1125     StartTracing(tsid);
1126 
1127   return base::OkStatus();
1128 }
1129 
// Applies a restricted, in-place update to the config of the consumer's
// existing tracing session. Only additions to data sources'
// producer_name_filter / producer_name_regex_filter take effect; any other
// difference vs the current config is logged and ignored. Producers that
// newly match a filter get their data sources set up (and started, if the
// session is already STARTED).
void TracingServiceImpl::ChangeTraceConfig(ConsumerEndpointImpl* consumer,
                                           const TraceConfig& updated_cfg) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingSession* tracing_session =
      GetTracingSession(consumer->tracing_session_id_);
  PERFETTO_DCHECK(tracing_session);

  // Config changes only make sense while the session is live: either fully
  // started, or configured and waiting for a deferred start / trigger.
  if ((tracing_session->state != TracingSession::STARTED) &&
      (tracing_session->state != TracingSession::CONFIGURED)) {
    PERFETTO_ELOG(
        "ChangeTraceConfig() was called for a tracing session which isn't "
        "running.");
    return;
  }

  // We only support updating producer_name_{,regex}_filter (and pass-through
  // configs) for now; null out any changeable fields and make sure the rest are
  // identical.
  TraceConfig new_config_copy(updated_cfg);
  for (auto& ds_cfg : *new_config_copy.mutable_data_sources()) {
    ds_cfg.clear_producer_name_filter();
    ds_cfg.clear_producer_name_regex_filter();
  }

  TraceConfig current_config_copy(tracing_session->config);
  for (auto& ds_cfg : *current_config_copy.mutable_data_sources()) {
    ds_cfg.clear_producer_name_filter();
    ds_cfg.clear_producer_name_regex_filter();
  }

  // With the mutable fields cleared on both copies, any remaining difference
  // is an unsupported change. This is not fatal: the filter update below
  // still proceeds for the fields we do support.
  if (new_config_copy != current_config_copy) {
    PERFETTO_LOG(
        "ChangeTraceConfig() was called with a config containing unsupported "
        "changes; only adding to the producer_name_{,regex}_filter is "
        "currently supported and will have an effect.");
  }

  for (TraceConfig::DataSource& cfg_data_source :
       *tracing_session->config.mutable_data_sources()) {
    // Find the updated producer_filter in the new config.
    std::vector<std::string> new_producer_name_filter;
    std::vector<std::string> new_producer_name_regex_filter;
    bool found_data_source = false;
    for (const auto& it : updated_cfg.data_sources()) {
      if (cfg_data_source.config().name() == it.config().name()) {
        new_producer_name_filter = it.producer_name_filter();
        new_producer_name_regex_filter = it.producer_name_regex_filter();
        found_data_source = true;
        break;
      }
    }

    // Bail out if data source not present in the new config.
    if (!found_data_source) {
      PERFETTO_ELOG(
          "ChangeTraceConfig() called without a current data source also "
          "present in the new config: %s",
          cfg_data_source.config().name().c_str());
      continue;
    }

    // TODO(oysteine): Just replacing the filter means that if
    // there are any filter entries which were present in the original config,
    // but removed from the config passed to ChangeTraceConfig, any matching
    // producers will keep producing but newly added producers after this
    // point will never start.
    *cfg_data_source.mutable_producer_name_filter() = new_producer_name_filter;
    *cfg_data_source.mutable_producer_name_regex_filter() =
        new_producer_name_regex_filter;

    // Get the list of producers that are already set up, so they are not set
    // up a second time below. Keys are the producer IDs of the existing data
    // source instances.
    std::unordered_set<uint16_t> set_up_producers;
    auto& ds_instances = tracing_session->data_source_instances;
    for (auto instance_it = ds_instances.begin();
         instance_it != ds_instances.end(); ++instance_it) {
      set_up_producers.insert(instance_it->first);
    }

    // Scan all the registered data sources with a matching name.
    auto range = data_sources_.equal_range(cfg_data_source.config().name());
    for (auto it = range.first; it != range.second; it++) {
      ProducerEndpointImpl* producer = GetProducer(it->second.producer_id);
      PERFETTO_DCHECK(producer);

      // Check if the producer name of this data source is present
      // in the name filters. We currently only support new filters, not
      // removing old ones.
      if (!NameMatchesFilter(producer->name_, new_producer_name_filter,
                             new_producer_name_regex_filter)) {
        continue;
      }

      // If this producer is already set up, we assume that all datasources
      // in it started already.
      if (set_up_producers.count(it->second.producer_id))
        continue;

      // If it wasn't previously setup, set it up now.
      // (The per-producer config is optional).
      TraceConfig::ProducerConfig producer_config;
      for (const auto& config : tracing_session->config.producers()) {
        if (producer->name_ == config.producer_name()) {
          producer_config = config;
          break;
        }
      }

      DataSourceInstance* ds_inst = SetupDataSource(
          cfg_data_source, producer_config, it->second, tracing_session);

      // Start immediately only if the session as a whole is already running;
      // otherwise the instance will start together with the session.
      if (ds_inst && tracing_session->state == TracingSession::STARTED)
        StartDataSourceInstance(producer, tracing_session, ds_inst);
    }
  }
}
1245 
DelayToNextWritePeriodMs(const TracingSession & session)1246 uint32_t TracingServiceImpl::DelayToNextWritePeriodMs(
1247     const TracingSession& session) {
1248   PERFETTO_DCHECK(session.write_period_ms > 0);
1249   return session.write_period_ms -
1250          static_cast<uint32_t>(clock_->GetWallTimeMs().count() %
1251                                session.write_period_ms);
1252 }
1253 
StartTracing(TracingSessionID tsid)1254 void TracingServiceImpl::StartTracing(TracingSessionID tsid) {
1255   PERFETTO_DCHECK_THREAD(thread_checker_);
1256 
1257   TracingSession* tracing_session = GetTracingSession(tsid);
1258   if (!tracing_session) {
1259     PERFETTO_ELOG("StartTracing() failed, invalid session ID %" PRIu64, tsid);
1260     return;
1261   }
1262 
1263   MaybeLogUploadEvent(tracing_session->config, tracing_session->trace_uuid,
1264                       PerfettoStatsdAtom::kTracedStartTracing);
1265 
1266   if (tracing_session->state != TracingSession::CONFIGURED) {
1267     MaybeLogUploadEvent(
1268         tracing_session->config, tracing_session->trace_uuid,
1269         PerfettoStatsdAtom::kTracedStartTracingInvalidSessionState);
1270     PERFETTO_ELOG("StartTracing() failed, invalid session state: %d",
1271                   tracing_session->state);
1272     return;
1273   }
1274 
1275   tracing_session->state = TracingSession::STARTED;
1276 
1277   // We store the start of trace snapshot separately as it's important to make
1278   // sure we can interpret all the data in the trace and storing it in the ring
1279   // buffer means it could be overwritten by a later snapshot.
1280   if (!tracing_session->config.builtin_data_sources()
1281            .disable_clock_snapshotting()) {
1282     SnapshotClocks(&tracing_session->initial_clock_snapshot);
1283   }
1284 
1285   // We don't snapshot the clocks here because we just did this above.
1286   SnapshotLifecyleEvent(
1287       tracing_session,
1288       protos::pbzero::TracingServiceEvent::kTracingStartedFieldNumber,
1289       false /* snapshot_clocks */);
1290 
1291   // Periodically snapshot clocks, stats, sync markers while the trace is
1292   // active. The snapshots are emitted on the future ReadBuffers() calls, which
1293   // means that:
1294   //  (a) If we're streaming to a file (or to a consumer) while tracing, we
1295   //      write snapshots periodically into the trace.
1296   //  (b) If ReadBuffers() is only called after tracing ends, we emit the latest
1297   //      snapshot into the trace. For clock snapshots, we keep track of the
1298   //      snapshot recorded at the beginning of the session
1299   //      (initial_clock_snapshot above), as well as the most recent sampled
1300   //      snapshots that showed significant new drift between different clocks.
1301   //      The latter clock snapshots are sampled periodically and at lifecycle
1302   //      events.
1303   base::PeriodicTask::Args snapshot_task_args;
1304   snapshot_task_args.start_first_task_immediately = true;
1305   snapshot_task_args.use_suspend_aware_timer =
1306       tracing_session->config.builtin_data_sources()
1307           .prefer_suspend_clock_for_snapshot();
1308   snapshot_task_args.task = [this, tsid] { PeriodicSnapshotTask(tsid); };
1309   snapshot_task_args.period_ms =
1310       tracing_session->config.builtin_data_sources().snapshot_interval_ms();
1311   if (!snapshot_task_args.period_ms)
1312     snapshot_task_args.period_ms = kDefaultSnapshotsIntervalMs;
1313   tracing_session->snapshot_periodic_task.Start(snapshot_task_args);
1314 
1315   // Trigger delayed task if the trace is time limited.
1316   const uint32_t trace_duration_ms = tracing_session->config.duration_ms();
1317   if (trace_duration_ms > 0) {
1318     auto stop_task =
1319         std::bind(&TracingServiceImpl::StopOnDurationMsExpiry, this, tsid);
1320     if (tracing_session->config.prefer_suspend_clock_for_duration()) {
1321       base::PeriodicTask::Args stop_args;
1322       stop_args.use_suspend_aware_timer = true;
1323       stop_args.period_ms = trace_duration_ms;
1324       stop_args.one_shot = true;
1325       stop_args.task = std::move(stop_task);
1326       tracing_session->timed_stop_task.Start(stop_args);
1327     } else {
1328       weak_runner_.PostDelayedTask(std::move(stop_task), trace_duration_ms);
1329     }
1330   }  // if (trace_duration_ms > 0).
1331 
1332   // Start the periodic drain tasks if we should to save the trace into a file.
1333   if (tracing_session->config.write_into_file()) {
1334     weak_runner_.PostDelayedTask([this, tsid] { ReadBuffersIntoFile(tsid); },
1335                                  DelayToNextWritePeriodMs(*tracing_session));
1336   }
1337 
1338   // Start the periodic flush tasks if the config specified a flush period.
1339   if (tracing_session->config.flush_period_ms())
1340     PeriodicFlushTask(tsid, /*post_next_only=*/true);
1341 
1342   // Start the periodic incremental state clear tasks if the config specified a
1343   // period.
1344   if (tracing_session->config.incremental_state_config().clear_period_ms()) {
1345     PeriodicClearIncrementalStateTask(tsid, /*post_next_only=*/true);
1346   }
1347 
1348   for (auto& [prod_id, data_source] : tracing_session->data_source_instances) {
1349     ProducerEndpointImpl* producer = GetProducer(prod_id);
1350     if (!producer) {
1351       PERFETTO_DFATAL("Producer does not exist.");
1352       continue;
1353     }
1354     StartDataSourceInstance(producer, tracing_session, &data_source);
1355   }
1356 
1357   MaybeNotifyAllDataSourcesStarted(tracing_session);
1358 
1359   // `did_notify_all_data_source_started` is only set if a consumer is
1360   // connected.
1361   if (tracing_session->consumer_maybe_null) {
1362     weak_runner_.PostDelayedTask(
1363         [this, tsid] { OnAllDataSourceStartedTimeout(tsid); },
1364         kAllDataSourceStartedTimeout);
1365   }
1366 }
1367 
StopOnDurationMsExpiry(TracingSessionID tsid)1368 void TracingServiceImpl::StopOnDurationMsExpiry(TracingSessionID tsid) {
1369   auto* tracing_session_ptr = GetTracingSession(tsid);
1370   if (!tracing_session_ptr)
1371     return;
1372   // If this trace was using STOP_TRACING triggers and we've seen
1373   // one, then the trigger overrides the normal timeout. In this
1374   // case we just return and let the other task clean up this trace.
1375   if (GetTriggerMode(tracing_session_ptr->config) ==
1376           TraceConfig::TriggerConfig::STOP_TRACING &&
1377       !tracing_session_ptr->received_triggers.empty())
1378     return;
1379   // In all other cases (START_TRACING or no triggers) we flush
1380   // after |trace_duration_ms| unconditionally.
1381   FlushAndDisableTracing(tsid);
1382 }
1383 
StartDataSourceInstance(ProducerEndpointImpl * producer,TracingSession * tracing_session,TracingServiceImpl::DataSourceInstance * instance)1384 void TracingServiceImpl::StartDataSourceInstance(
1385     ProducerEndpointImpl* producer,
1386     TracingSession* tracing_session,
1387     TracingServiceImpl::DataSourceInstance* instance) {
1388   PERFETTO_DCHECK(instance->state == DataSourceInstance::CONFIGURED);
1389 
1390   bool start_immediately = !instance->will_notify_on_start;
1391 
1392   if (producer->IsAndroidProcessFrozen()) {
1393     PERFETTO_DLOG(
1394         "skipping waiting of data source \"%s\" on producer \"%s\" (pid=%u) "
1395         "because it is frozen",
1396         instance->data_source_name.c_str(), producer->name_.c_str(),
1397         producer->pid());
1398     start_immediately = true;
1399   }
1400 
1401   if (!start_immediately) {
1402     instance->state = DataSourceInstance::STARTING;
1403   } else {
1404     instance->state = DataSourceInstance::STARTED;
1405   }
1406   if (tracing_session->consumer_maybe_null) {
1407     tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
1408         *producer, *instance);
1409   }
1410   producer->StartDataSource(instance->instance_id, instance->config);
1411 
1412   // If all data sources are started, notify the consumer.
1413   if (instance->state == DataSourceInstance::STARTED)
1414     MaybeNotifyAllDataSourcesStarted(tracing_session);
1415 }
1416 
// DisableTracing just stops the data sources but doesn't free up any buffer.
// This is to allow the consumer to freeze the buffers (by stopping the trace)
// and then drain the buffers. The actual teardown of the TracingSession happens
// in FreeBuffers().
//
// |disable_immediately|: if true, transition to DISABLED without waiting for
// the data sources' stop acks.
void TracingServiceImpl::DisableTracing(TracingSessionID tsid,
                                        bool disable_immediately) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session) {
    // Can happen if the consumer calls this before EnableTracing() or after
    // FreeBuffers().
    PERFETTO_DLOG("DisableTracing() failed, invalid session ID %" PRIu64, tsid);
    return;
  }

  MaybeLogUploadEvent(tracing_session->config, tracing_session->trace_uuid,
                      PerfettoStatsdAtom::kTracedDisableTracing);

  // Dispatch on the session's lifecycle state; only CONFIGURED and STARTED
  // fall through to the actual stop sequence below.
  switch (tracing_session->state) {
    // Spurious call to DisableTracing() while already disabled, nothing to do.
    case TracingSession::DISABLED:
      PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
      return;

    // Read-only clones are never "running"; there is nothing to stop.
    case TracingSession::CLONED_READ_ONLY:
      return;

    // This is either:
    // A) The case of a graceful DisableTracing() call followed by a call to
    //    FreeBuffers(), iff |disable_immediately| == true. In this case we want
    //    to forcefully transition in the disabled state without waiting for the
    //    outstanding acks because the buffers are going to be destroyed soon.
    // B) A spurious call, iff |disable_immediately| == false, in which case
    //    there is nothing to do.
    case TracingSession::DISABLING_WAITING_STOP_ACKS:
      PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
      if (disable_immediately)
        DisableTracingNotifyConsumerAndFlushFile(tracing_session);
      return;

    // Continues below.
    case TracingSession::CONFIGURED:
      // If the session didn't even start there is no need to orchestrate a
      // graceful stop of data sources.
      disable_immediately = true;
      break;

    // This is the nominal case, continues below.
    case TracingSession::STARTED:
      break;
  }

  // Issue a stop to every data source instance of the session.
  for (auto& data_source_inst : tracing_session->data_source_instances) {
    const ProducerID producer_id = data_source_inst.first;
    DataSourceInstance& instance = data_source_inst.second;
    ProducerEndpointImpl* producer = GetProducer(producer_id);
    PERFETTO_DCHECK(producer);
    PERFETTO_DCHECK(instance.state == DataSourceInstance::CONFIGURED ||
                    instance.state == DataSourceInstance::STARTING ||
                    instance.state == DataSourceInstance::STARTED);
    StopDataSourceInstance(producer, tracing_session, &instance,
                           disable_immediately);
  }

  // If the periodic task is running, we can stop the periodic snapshot timer
  // here instead of waiting until FreeBuffers to prevent useless snapshots
  // which won't be read.
  tracing_session->snapshot_periodic_task.Reset();

  // Either this request is flagged with |disable_immediately| or there are no
  // data sources that are requesting a final handshake. In both cases just mark
  // the session as disabled immediately, notify the consumer and flush the
  // trace file (if used).
  if (tracing_session->AllDataSourceInstancesStopped())
    return DisableTracingNotifyConsumerAndFlushFile(tracing_session);

  // Otherwise wait for the stop acks, with a timeout as a safety net against
  // unresponsive data sources.
  tracing_session->state = TracingSession::DISABLING_WAITING_STOP_ACKS;
  weak_runner_.PostDelayedTask([this, tsid] { OnDisableTracingTimeout(tsid); },
                               tracing_session->data_source_stop_timeout_ms());

  // Deliberately NOT removing the session from |tracing_session_|, it's still
  // needed to call ReadBuffers(). FreeBuffers() will erase() the session.
}
1500 
NotifyDataSourceStarted(ProducerID producer_id,DataSourceInstanceID instance_id)1501 void TracingServiceImpl::NotifyDataSourceStarted(
1502     ProducerID producer_id,
1503     DataSourceInstanceID instance_id) {
1504   PERFETTO_DCHECK_THREAD(thread_checker_);
1505   for (auto& kv : tracing_sessions_) {
1506     TracingSession& tracing_session = kv.second;
1507     DataSourceInstance* instance =
1508         tracing_session.GetDataSourceInstance(producer_id, instance_id);
1509 
1510     if (!instance)
1511       continue;
1512 
1513     // If the tracing session was already stopped, ignore this notification.
1514     if (tracing_session.state != TracingSession::STARTED)
1515       continue;
1516 
1517     if (instance->state != DataSourceInstance::STARTING) {
1518       PERFETTO_ELOG("Started data source instance in incorrect state: %d",
1519                     instance->state);
1520       continue;
1521     }
1522 
1523     instance->state = DataSourceInstance::STARTED;
1524 
1525     ProducerEndpointImpl* producer = GetProducer(producer_id);
1526     PERFETTO_DCHECK(producer);
1527     if (tracing_session.consumer_maybe_null) {
1528       tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
1529           *producer, *instance);
1530     }
1531 
1532     // If all data sources are started, notify the consumer.
1533     MaybeNotifyAllDataSourcesStarted(&tracing_session);
1534   }  // for (tracing_session)
1535 }
1536 
// Fired a fixed delay (kAllDataSourceStartedTimeout) after a session with a
// connected consumer starts. If by then not every data source has acked its
// start, records a "slow_starting_data_sources" service event listing the
// laggards and stashes it on the session (as |slow_start_event|; presumably
// serialized into the trace when buffers are read — confirm at the read path).
void TracingServiceImpl::OnAllDataSourceStartedTimeout(TracingSessionID tsid) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingSession* tracing_session = GetTracingSession(tsid);
  // It would be possible to check for `AllDataSourceInstancesStarted()` here,
  // but it doesn't make much sense, because a data source can be registered
  // after the session has started. Therefore this is tied to
  // `did_notify_all_data_source_started`: if that notification happened, do not
  // record slow data sources.
  if (!tracing_session || !tracing_session->consumer_maybe_null ||
      tracing_session->did_notify_all_data_source_started) {
    return;
  }

  int64_t timestamp = clock_->GetBootTimeNs().count();

  // Build a service-generated trace packet describing the slow data sources.
  protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
  packet->set_timestamp(static_cast<uint64_t>(timestamp));
  packet->set_trusted_uid(static_cast<int32_t>(uid_));
  packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);

  size_t i = 0;
  protos::pbzero::TracingServiceEvent::DataSources* slow_data_sources =
      packet->set_service_event()->set_slow_starting_data_sources();
  for (const auto& [producer_id, ds_instance] :
       tracing_session->data_source_instances) {
    // Only instances that have not reached STARTED are reported.
    if (ds_instance.state == DataSourceInstance::STARTED) {
      continue;
    }
    ProducerEndpointImpl* producer = GetProducer(producer_id);
    if (!producer) {
      continue;
    }
    // Cap the list to keep the event packet bounded.
    if (++i > kMaxLifecycleEventsListedDataSources) {
      break;
    }
    auto* ds = slow_data_sources->add_data_source();
    ds->set_producer_name(producer->name_);
    ds->set_data_source_name(ds_instance.data_source_name);
    PERFETTO_LOG(
        "Data source failed to start within 20s data_source=\"%s\", "
        "producer=\"%s\", tsid=%" PRIu64,
        ds_instance.data_source_name.c_str(), producer->name_.c_str(), tsid);
  }

  tracing_session->slow_start_event = TracingSession::ArbitraryLifecycleEvent{
      timestamp, packet.SerializeAsArray()};
}
1584 
MaybeNotifyAllDataSourcesStarted(TracingSession * tracing_session)1585 void TracingServiceImpl::MaybeNotifyAllDataSourcesStarted(
1586     TracingSession* tracing_session) {
1587   if (!tracing_session->consumer_maybe_null)
1588     return;
1589 
1590   if (!tracing_session->AllDataSourceInstancesStarted())
1591     return;
1592 
1593   // In some rare cases, we can get in this state more than once. Consider the
1594   // following scenario: 3 data sources are registered -> trace starts ->
1595   // all 3 data sources ack -> OnAllDataSourcesStarted() is called.
1596   // Imagine now that a 4th data source registers while the trace is ongoing.
1597   // This would hit the AllDataSourceInstancesStarted() condition again.
1598   // In this case, however, we don't want to re-notify the consumer again.
1599   // That would be unexpected (even if, perhaps, technically correct) and
1600   // trigger bugs in the consumer.
1601   if (tracing_session->did_notify_all_data_source_started)
1602     return;
1603 
1604   PERFETTO_DLOG("All data sources started");
1605 
1606   SnapshotLifecyleEvent(
1607       tracing_session,
1608       protos::pbzero::TracingServiceEvent::kAllDataSourcesStartedFieldNumber,
1609       true /* snapshot_clocks */);
1610 
1611   tracing_session->did_notify_all_data_source_started = true;
1612   tracing_session->consumer_maybe_null->OnAllDataSourcesStarted();
1613 }
1614 
NotifyDataSourceStopped(ProducerID producer_id,DataSourceInstanceID instance_id)1615 void TracingServiceImpl::NotifyDataSourceStopped(
1616     ProducerID producer_id,
1617     DataSourceInstanceID instance_id) {
1618   PERFETTO_DCHECK_THREAD(thread_checker_);
1619   for (auto& kv : tracing_sessions_) {
1620     TracingSession& tracing_session = kv.second;
1621     DataSourceInstance* instance =
1622         tracing_session.GetDataSourceInstance(producer_id, instance_id);
1623 
1624     if (!instance)
1625       continue;
1626 
1627     if (instance->state != DataSourceInstance::STOPPING) {
1628       PERFETTO_ELOG("Stopped data source instance in incorrect state: %d",
1629                     instance->state);
1630       continue;
1631     }
1632 
1633     instance->state = DataSourceInstance::STOPPED;
1634 
1635     ProducerEndpointImpl* producer = GetProducer(producer_id);
1636     PERFETTO_DCHECK(producer);
1637     if (tracing_session.consumer_maybe_null) {
1638       tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
1639           *producer, *instance);
1640     }
1641 
1642     if (!tracing_session.AllDataSourceInstancesStopped())
1643       continue;
1644 
1645     if (tracing_session.state != TracingSession::DISABLING_WAITING_STOP_ACKS)
1646       continue;
1647 
1648     // All data sources acked the termination.
1649     DisableTracingNotifyConsumerAndFlushFile(&tracing_session);
1650   }  // for (tracing_session)
1651 }
1652 
// Handles a batch of trigger names fired by |producer_id|. For each name,
// scans every tracing session whose config lists a matching trigger, applies
// the per-trigger filters (producer name regex, skip probability, 24h rate
// limit) and performs the action dictated by the session's trigger mode
// (START_TRACING / STOP_TRACING / CLONE_SNAPSHOT).
void TracingServiceImpl::ActivateTriggers(
    ProducerID producer_id,
    const std::vector<std::string>& triggers) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  auto* producer = GetProducer(producer_id);
  PERFETTO_DCHECK(producer);

  int64_t now_ns = clock_->GetBootTimeNs().count();
  for (const auto& trigger_name : triggers) {
    PERFETTO_DLOG("Received ActivateTriggers request for \"%s\"",
                  trigger_name.c_str());
    android_stats::MaybeLogTriggerEvent(PerfettoTriggerAtom::kTracedTrigger,
                                        trigger_name);

    base::Hasher hash;
    hash.Update(trigger_name.c_str(), trigger_name.size());
    // Bookkeeping for the activation log emitted at the bottom of the loop.
    std::string triggered_session_name;
    base::Uuid triggered_session_uuid;
    TracingSessionID triggered_session_id = 0;
    auto trigger_mode = TraceConfig::TriggerConfig::UNSPECIFIED;

    uint64_t trigger_name_hash = hash.digest();
    // Count (and garbage-collect) recent activations of this trigger, used by
    // the max_per_24_h rate limit below.
    size_t count_in_window =
        PurgeExpiredAndCountTriggerInWindow(now_ns, trigger_name_hash);

    bool trigger_matched = false;
    bool trigger_activated = false;
    for (auto& id_and_tracing_session : tracing_sessions_) {
      auto& tracing_session = id_and_tracing_session.second;
      TracingSessionID tsid = id_and_tracing_session.first;
      auto iter = std::find_if(
          tracing_session.config.trigger_config().triggers().begin(),
          tracing_session.config.trigger_config().triggers().end(),
          [&trigger_name](const TraceConfig::TriggerConfig::Trigger& trigger) {
            return trigger.name() == trigger_name;
          });
      if (iter == tracing_session.config.trigger_config().triggers().end())
        continue;
      // Read-only clones never react to triggers.
      if (tracing_session.state == TracingSession::CLONED_READ_ONLY)
        continue;

      // If this trigger requires a certain producer to have sent it
      // (non-empty producer_name()) ensure the producer who sent this trigger
      // matches.
      if (!iter->producer_name_regex().empty() &&
          !std::regex_match(
              producer->name_,
              std::regex(iter->producer_name_regex(), std::regex::extended))) {
        continue;
      }

      // Use a random number between 0 and 1 to check if we should allow this
      // trigger through or not.
      double trigger_rnd = random_->GetValue();
      PERFETTO_DCHECK(trigger_rnd >= 0 && trigger_rnd < 1);
      if (trigger_rnd < iter->skip_probability()) {
        MaybeLogTriggerEvent(tracing_session.config,
                             PerfettoTriggerAtom::kTracedLimitProbability,
                             trigger_name);
        continue;
      }

      // If we already triggered more times than the limit, silently ignore
      // this trigger.
      if (iter->max_per_24_h() > 0 && count_in_window >= iter->max_per_24_h()) {
        MaybeLogTriggerEvent(tracing_session.config,
                             PerfettoTriggerAtom::kTracedLimitMaxPer24h,
                             trigger_name);
        continue;
      }
      trigger_matched = true;
      triggered_session_id = tracing_session.id;
      triggered_session_name = tracing_session.config.unique_session_name();
      triggered_session_uuid.set_lsb_msb(tracing_session.trace_uuid.lsb(),
                                         tracing_session.trace_uuid.msb());
      trigger_mode = GetTriggerMode(tracing_session.config);

      // Snapshot whether this is the session's first trigger *before*
      // recording the new one; STOP_TRACING relies on this to act only once.
      const bool triggers_already_received =
          !tracing_session.received_triggers.empty();
      const TriggerInfo trigger = {static_cast<uint64_t>(now_ns), iter->name(),
                                   producer->name_, producer->uid()};
      tracing_session.received_triggers.push_back(trigger);
      switch (trigger_mode) {
        case TraceConfig::TriggerConfig::START_TRACING:
          // If the session has already been triggered and moved past
          // CONFIGURED then we don't need to repeat StartTracing. This would
          // work fine (StartTracing would return false) but would add error
          // logs.
          if (tracing_session.state != TracingSession::CONFIGURED)
            break;

          trigger_activated = true;
          MaybeLogUploadEvent(
              tracing_session.config, tracing_session.trace_uuid,
              PerfettoStatsdAtom::kTracedTriggerStartTracing, iter->name());

          // We override the trace duration to be the trigger's requested
          // value, this ensures that the trace will end after this amount
          // of time has passed.
          tracing_session.config.set_duration_ms(iter->stop_delay_ms());
          StartTracing(tsid);
          break;
        case TraceConfig::TriggerConfig::STOP_TRACING:
          // Only stop the trace once to avoid confusing log messages. I.E.
          // when we've already hit the first trigger we've already Posted the
          // task to FlushAndDisable. So all future triggers will just break
          // out.
          if (triggers_already_received)
            break;

          trigger_activated = true;
          MaybeLogUploadEvent(
              tracing_session.config, tracing_session.trace_uuid,
              PerfettoStatsdAtom::kTracedTriggerStopTracing, iter->name());

          // Now that we've seen a trigger we need to stop, flush, and disable
          // this session after the configured |stop_delay_ms|.
          weak_runner_.PostDelayedTask(
              [this, tsid] {
                // Skip entirely the flush if the trace session doesn't exist
                // anymore. This is to prevent misleading error messages to be
                // logged.
                if (GetTracingSession(tsid))
                  FlushAndDisableTracing(tsid);
              },
              // If this trigger is zero this will immediately executable and
              // will happen shortly.
              iter->stop_delay_ms());
          break;

        case TraceConfig::TriggerConfig::CLONE_SNAPSHOT:
          trigger_activated = true;
          MaybeLogUploadEvent(
              tracing_session.config, tracing_session.trace_uuid,
              PerfettoStatsdAtom::kTracedTriggerCloneSnapshot, iter->name());
          // Tell the consumer to clone the session after |stop_delay_ms|.
          // |trigger| is captured by value as the session may mutate meanwhile.
          weak_runner_.PostDelayedTask(
              [this, tsid, trigger] {
                auto* tsess = GetTracingSession(tsid);
                if (!tsess || !tsess->consumer_maybe_null)
                  return;
                tsess->consumer_maybe_null->NotifyCloneSnapshotTrigger(trigger);
              },
              iter->stop_delay_ms());
          break;

        case TraceConfig::TriggerConfig::UNSPECIFIED:
          PERFETTO_ELOG("Trigger activated but trigger mode unspecified.");
          break;
      }
    }  // for (.. : tracing_sessions_)

    if (trigger_matched) {
      trigger_history_.emplace_back(TriggerHistory{now_ns, trigger_name_hash});
    }

    if (trigger_activated) {
      // Log only the trigger that actually caused a trace stop/start, don't log
      // the follow-up ones, even if they matched.
      PERFETTO_LOG(
          "Trace trigger activated: trigger_name=\"%s\" trigger_mode=%d "
          "trace_name=\"%s\" trace_uuid=\"%s\" tsid=%" PRIu64,
          trigger_name.c_str(), trigger_mode, triggered_session_name.c_str(),
          triggered_session_uuid.ToPrettyString().c_str(),
          triggered_session_id);
    }
  }  // for (trigger_name : triggers)
}
1820 
1821 // Always invoked TraceConfig.data_source_stop_timeout_ms (by default
1822 // kDataSourceStopTimeoutMs) after DisableTracing(). In nominal conditions all
1823 // data sources should have acked the stop and this will early out.
OnDisableTracingTimeout(TracingSessionID tsid)1824 void TracingServiceImpl::OnDisableTracingTimeout(TracingSessionID tsid) {
1825   PERFETTO_DCHECK_THREAD(thread_checker_);
1826   TracingSession* tracing_session = GetTracingSession(tsid);
1827   if (!tracing_session ||
1828       tracing_session->state != TracingSession::DISABLING_WAITING_STOP_ACKS) {
1829     return;  // Tracing session was successfully disabled.
1830   }
1831 
1832   PERFETTO_ILOG("Timeout while waiting for ACKs for tracing session %" PRIu64,
1833                 tsid);
1834   PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
1835   DisableTracingNotifyConsumerAndFlushFile(tracing_session);
1836 }
1837 
DisableTracingNotifyConsumerAndFlushFile(TracingSession * tracing_session)1838 void TracingServiceImpl::DisableTracingNotifyConsumerAndFlushFile(
1839     TracingSession* tracing_session) {
1840   PERFETTO_DCHECK(tracing_session->state != TracingSession::DISABLED);
1841   for (auto& inst_kv : tracing_session->data_source_instances) {
1842     if (inst_kv.second.state == DataSourceInstance::STOPPED)
1843       continue;
1844     inst_kv.second.state = DataSourceInstance::STOPPED;
1845     ProducerEndpointImpl* producer = GetProducer(inst_kv.first);
1846     PERFETTO_DCHECK(producer);
1847     if (tracing_session->consumer_maybe_null) {
1848       tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
1849           *producer, inst_kv.second);
1850     }
1851   }
1852   tracing_session->state = TracingSession::DISABLED;
1853 
1854   // Scrape any remaining chunks that weren't flushed by the producers.
1855   for (auto& producer_id_and_producer : producers_)
1856     ScrapeSharedMemoryBuffers(tracing_session, producer_id_and_producer.second);
1857 
1858   SnapshotLifecyleEvent(
1859       tracing_session,
1860       protos::pbzero::TracingServiceEvent::kTracingDisabledFieldNumber,
1861       true /* snapshot_clocks */);
1862 
1863   if (tracing_session->write_into_file) {
1864     tracing_session->write_period_ms = 0;
1865     ReadBuffersIntoFile(tracing_session->id);
1866   }
1867 
1868   MaybeLogUploadEvent(tracing_session->config, tracing_session->trace_uuid,
1869                       PerfettoStatsdAtom::kTracedNotifyTracingDisabled);
1870 
1871   if (tracing_session->consumer_maybe_null)
1872     tracing_session->consumer_maybe_null->NotifyOnTracingDisabled("");
1873 }
1874 
Flush(TracingSessionID tsid,uint32_t timeout_ms,ConsumerEndpoint::FlushCallback callback,FlushFlags flush_flags)1875 void TracingServiceImpl::Flush(TracingSessionID tsid,
1876                                uint32_t timeout_ms,
1877                                ConsumerEndpoint::FlushCallback callback,
1878                                FlushFlags flush_flags) {
1879   PERFETTO_DCHECK_THREAD(thread_checker_);
1880   TracingSession* tracing_session = GetTracingSession(tsid);
1881   if (!tracing_session) {
1882     PERFETTO_DLOG("Flush() failed, invalid session ID %" PRIu64, tsid);
1883     return;
1884   }
1885 
1886   SnapshotLifecyleEvent(
1887       tracing_session,
1888       protos::pbzero::TracingServiceEvent::kFlushStartedFieldNumber,
1889       false /* snapshot_clocks */);
1890 
1891   std::map<ProducerID, std::vector<DataSourceInstanceID>> data_source_instances;
1892   for (const auto& [producer_id, ds_inst] :
1893        tracing_session->data_source_instances) {
1894     if (ds_inst.no_flush)
1895       continue;
1896     data_source_instances[producer_id].push_back(ds_inst.instance_id);
1897   }
1898   FlushDataSourceInstances(tracing_session, timeout_ms, data_source_instances,
1899                            std::move(callback), flush_flags);
1900 }
1901 
FlushDataSourceInstances(TracingSession * tracing_session,uint32_t timeout_ms,const std::map<ProducerID,std::vector<DataSourceInstanceID>> & data_source_instances,ConsumerEndpoint::FlushCallback callback,FlushFlags flush_flags)1902 void TracingServiceImpl::FlushDataSourceInstances(
1903     TracingSession* tracing_session,
1904     uint32_t timeout_ms,
1905     const std::map<ProducerID, std::vector<DataSourceInstanceID>>&
1906         data_source_instances,
1907     ConsumerEndpoint::FlushCallback callback,
1908     FlushFlags flush_flags) {
1909   PERFETTO_DCHECK_THREAD(thread_checker_);
1910   if (!timeout_ms)
1911     timeout_ms = tracing_session->flush_timeout_ms();
1912 
1913   if (tracing_session->pending_flushes.size() > 1000) {
1914     PERFETTO_ELOG("Too many flushes (%zu) pending for the tracing session",
1915                   tracing_session->pending_flushes.size());
1916     callback(false);
1917     return;
1918   }
1919 
1920   if (tracing_session->state != TracingSession::STARTED) {
1921     PERFETTO_LOG("Flush() called, but tracing has not been started");
1922     callback(false);
1923     return;
1924   }
1925 
1926   tracing_session->last_flush_events.clear();
1927 
1928   ++tracing_session->flushes_requested;
1929   FlushRequestID flush_request_id = ++last_flush_request_id_;
1930   PendingFlush& pending_flush =
1931       tracing_session->pending_flushes
1932           .emplace_hint(tracing_session->pending_flushes.end(),
1933                         flush_request_id, PendingFlush(std::move(callback)))
1934           ->second;
1935 
1936   // Send a flush request to each producer involved in the tracing session. In
1937   // order to issue a flush request we have to build a map of all data source
1938   // instance ids enabled for each producer.
1939 
1940   for (const auto& [producer_id, data_sources] : data_source_instances) {
1941     ProducerEndpointImpl* producer = GetProducer(producer_id);
1942     producer->Flush(flush_request_id, data_sources, flush_flags);
1943     pending_flush.producers.insert(producer_id);
1944   }
1945 
1946   // If there are no producers to flush (realistically this happens only in
1947   // some tests) fire OnFlushTimeout() straight away, without waiting.
1948   if (data_source_instances.empty())
1949     timeout_ms = 0;
1950 
1951   weak_runner_.PostDelayedTask(
1952       [this, tsid = tracing_session->id, flush_request_id, flush_flags] {
1953         OnFlushTimeout(tsid, flush_request_id, flush_flags);
1954       },
1955       timeout_ms);
1956 }
1957 
// Producer-side ack of a flush request. A single ack with id F implicitly
// acks every pending flush with id <= F for that producer (flush ids are
// handed out monotonically from |last_flush_request_id_|).
void TracingServiceImpl::NotifyFlushDoneForProducer(
    ProducerID producer_id,
    FlushRequestID flush_request_id) {
  for (auto& kv : tracing_sessions_) {
    // Remove all pending flushes <= |flush_request_id| for |producer_id|.
    auto& pending_flushes = kv.second.pending_flushes;
    auto end_it = pending_flushes.upper_bound(flush_request_id);
    for (auto it = pending_flushes.begin(); it != end_it;) {
      PendingFlush& pending_flush = it->second;
      pending_flush.producers.erase(producer_id);
      if (pending_flush.producers.empty()) {
        // Last producer acked: complete the flush. Deferred to a posted task
        // so the callback runs outside this loop over the flush map.
        TracingSessionID tsid = kv.first;
        auto callback = std::move(pending_flush.callback);
        weak_runner_.PostTask([this, tsid, callback = std::move(callback)]() {
          CompleteFlush(tsid, std::move(callback),
                        /*success=*/true);
        });
        it = pending_flushes.erase(it);  // erase() yields the next iterator.
      } else {
        it++;
      }
    }  // for (pending_flushes)
  }    // for (tracing_session)
}
1982 
// Invoked |timeout_ms| after a flush request was issued. If the flush is
// still pending (not every producer has acked), resolves it: the flush is a
// success only if no producer was ever involved. For "final" flushes (trace
// stop / clone) it additionally records which data sources were slow.
void TracingServiceImpl::OnFlushTimeout(TracingSessionID tsid,
                                        FlushRequestID flush_request_id,
                                        FlushFlags flush_flags) {
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session)
    return;
  auto it = tracing_session->pending_flushes.find(flush_request_id);
  if (it == tracing_session->pending_flushes.end())
    return;  // Nominal case: flush was completed and acked on time.

  PendingFlush& pending_flush = it->second;

  // If there were no producers to flush, consider it a success.
  bool success = pending_flush.producers.empty();
  auto callback = std::move(pending_flush.callback);
  // If flush failed and this is a "final" flush, log which data sources were
  // slow.
  if ((flush_flags.reason() == FlushFlags::Reason::kTraceClone ||
       flush_flags.reason() == FlushFlags::Reason::kTraceStop) &&
      !success) {
    int64_t timestamp = clock_->GetBootTimeNs().count();

    // Build a service event packet listing the unacked producers' data
    // sources; stashed in |last_flush_events| on the session.
    protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
    packet->set_timestamp(static_cast<uint64_t>(timestamp));
    packet->set_trusted_uid(static_cast<int32_t>(uid_));
    packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);

    size_t i = 0;
    protos::pbzero::TracingServiceEvent::DataSources* event =
        packet->set_service_event()->set_last_flush_slow_data_sources();
    for (const auto& producer_id : pending_flush.producers) {
      ProducerEndpointImpl* producer = GetProducer(producer_id);
      if (!producer) {
        continue;
      }
      // NOTE(review): |i| is bumped once per producer here and once per data
      // source below, so the cap bounds the combined count of both.
      if (++i > kMaxLifecycleEventsListedDataSources) {
        break;
      }

      // List every data source instance this producer runs in the session.
      auto ds_id_range =
          tracing_session->data_source_instances.equal_range(producer_id);
      for (auto ds_it = ds_id_range.first; ds_it != ds_id_range.second;
           ds_it++) {
        auto* ds = event->add_data_source();
        ds->set_producer_name(producer->name_);
        ds->set_data_source_name(ds_it->second.data_source_name);
        if (++i > kMaxLifecycleEventsListedDataSources) {
          break;
        }
      }
    }

    tracing_session->last_flush_events.push_back(
        {timestamp, packet.SerializeAsArray()});
  }
  tracing_session->pending_flushes.erase(it);
  CompleteFlush(tsid, std::move(callback), success);
}
2041 
CompleteFlush(TracingSessionID tsid,ConsumerEndpoint::FlushCallback callback,bool success)2042 void TracingServiceImpl::CompleteFlush(TracingSessionID tsid,
2043                                        ConsumerEndpoint::FlushCallback callback,
2044                                        bool success) {
2045   TracingSession* tracing_session = GetTracingSession(tsid);
2046   if (!tracing_session) {
2047     callback(false);
2048     return;
2049   }
2050   // Producers may not have been able to flush all their data, even if they
2051   // indicated flush completion. If possible, also collect uncommitted chunks
2052   // to make sure we have everything they wrote so far.
2053   for (auto& producer_id_and_producer : producers_) {
2054     ScrapeSharedMemoryBuffers(tracing_session, producer_id_and_producer.second);
2055   }
2056   SnapshotLifecyleEvent(
2057       tracing_session,
2058       protos::pbzero::TracingServiceEvent::kAllDataSourcesFlushedFieldNumber,
2059       true /* snapshot_clocks */);
2060 
2061   tracing_session->flushes_succeeded += success ? 1 : 0;
2062   tracing_session->flushes_failed += success ? 0 : 1;
2063   callback(success);
2064 }
2065 
// Copies usable, not-yet-committed chunks out of |producer|'s shared memory
// buffer (SMB) directly into the session's central trace buffers. Called on
// flush and on session/producer teardown so that data the producer wrote but
// never committed via IPC is not lost. No-op unless the producer opted into
// SMB scraping and actually participates in |tracing_session|.
void TracingServiceImpl::ScrapeSharedMemoryBuffers(
    TracingSession* tracing_session,
    ProducerEndpointImpl* producer) {
  // Scraping is opt-in per producer.
  if (!producer->smb_scraping_enabled_)
    return;

  // Can't copy chunks if we don't know about any trace writers.
  if (producer->writers_.empty())
    return;

  // Performance optimization: On flush or session disconnect, this method is
  // called for each producer. If the producer doesn't participate in the
  // session, there's no need to scrape its chunks right now. We can tell if a
  // producer participates in the session by checking if the producer is allowed
  // to write into the session's log buffers.
  const auto& session_buffers = tracing_session->buffers_index;
  bool producer_in_session =
      std::any_of(session_buffers.begin(), session_buffers.end(),
                  [producer](BufferID buffer_id) {
                    return producer->allowed_target_buffers_.count(buffer_id);
                  });
  if (!producer_in_session)
    return;

  PERFETTO_DLOG("Scraping SMB for producer %" PRIu16, producer->id_);

  // Find and copy any uncommitted chunks from the SMB.
  //
  // In nominal conditions, the page header bitmap of the used SMB pages should
  // never change because the service is the only one who is supposed to modify
  // used pages (to make them free again).
  //
  // However, the code here needs to deal with the case of a malicious producer
  // altering the SMB in unpredictable ways. Thankfully the SMB size is
  // immutable, so a chunk will always point to some valid memory, even if the
  // producer alters the intended layout and chunk header concurrently.
  // Ultimately a malicious producer altering the SMB's chunk header bitmap
  // while we are iterating in this function is not any different from the case
  // of a malicious producer asking to commit a chunk made of random data,
  // which is something this class has to deal with regardless.
  //
  // The only legitimate mutations that can happen from sane producers,
  // concurrently to this function, are:
  //   A. free pages being partitioned,
  //   B. free chunks being migrated to kChunkBeingWritten,
  //   C. kChunkBeingWritten chunks being migrated to kChunkCompleted.

  SharedMemoryABI* abi = &producer->shmem_abi_;
  // num_pages() is immutable after the SMB is initialized and cannot be changed
  // even by a producer even if malicious.
  for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
    uint32_t header_bitmap = abi->GetPageHeaderBitmap(page_idx);

    uint32_t used_chunks =
        abi->GetUsedChunks(header_bitmap);  // Returns a bitmap.
    // Skip empty pages.
    if (used_chunks == 0)
      continue;

    // Scrape the chunks that are currently used. These should be either in
    // state kChunkBeingWritten or kChunkComplete.
    for (uint32_t chunk_idx = 0; used_chunks; chunk_idx++, used_chunks >>= 1) {
      if (!(used_chunks & 1))
        continue;

      SharedMemoryABI::ChunkState state =
          SharedMemoryABI::GetChunkStateFromHeaderBitmap(header_bitmap,
                                                         chunk_idx);
      PERFETTO_DCHECK(state == SharedMemoryABI::kChunkBeingWritten ||
                      state == SharedMemoryABI::kChunkComplete);
      bool chunk_complete = state == SharedMemoryABI::kChunkComplete;

      SharedMemoryABI::Chunk chunk =
          abi->GetChunkUnchecked(page_idx, header_bitmap, chunk_idx);

      uint16_t packet_count;
      uint8_t flags;
      // GetPacketCountAndFlags has acquire_load semantics.
      std::tie(packet_count, flags) = chunk.GetPacketCountAndFlags();

      // It only makes sense to copy an incomplete chunk if there's at least
      // one full packet available. (The producer may not have completed the
      // last packet in it yet, so we need at least 2.)
      if (!chunk_complete && packet_count < 2)
        continue;

      // At this point, it is safe to access the remaining header fields of
      // the chunk. Even if the chunk was only just transferred from
      // kChunkFree into kChunkBeingWritten state, the header should be
      // written completely once the packet count increased above 1 (it was
      // reset to 0 by the service when the chunk was freed).

      WriterID writer_id = chunk.writer_id();
      std::optional<BufferID> target_buffer_id =
          producer->buffer_id_for_writer(writer_id);

      // We can only scrape this chunk if we know which log buffer to copy it
      // into.
      if (!target_buffer_id)
        continue;

      // Skip chunks that don't belong to the requested tracing session.
      bool target_buffer_belongs_to_session =
          std::find(session_buffers.begin(), session_buffers.end(),
                    *target_buffer_id) != session_buffers.end();
      if (!target_buffer_belongs_to_session)
        continue;

      // Relaxed is fine here: the acquire-load above (GetPacketCountAndFlags)
      // already ordered the header reads for the cases we care about.
      uint32_t chunk_id =
          chunk.header()->chunk_id.load(std::memory_order_relaxed);

      CopyProducerPageIntoLogBuffer(
          producer->id_, producer->client_identity_, writer_id, chunk_id,
          *target_buffer_id, packet_count, flags, chunk_complete,
          chunk.payload_begin(), chunk.payload_size());
    }
  }
}
2184 
FlushAndDisableTracing(TracingSessionID tsid)2185 void TracingServiceImpl::FlushAndDisableTracing(TracingSessionID tsid) {
2186   PERFETTO_DCHECK_THREAD(thread_checker_);
2187   PERFETTO_DLOG("Triggering final flush for %" PRIu64, tsid);
2188   Flush(
2189       tsid, 0,
2190       [this, tsid](bool success) {
2191         // This was a DLOG up to Jun 2021 (v16, Android S).
2192         PERFETTO_LOG("FlushAndDisableTracing(%" PRIu64 ") done, success=%d",
2193                      tsid, success);
2194         TracingSession* session = GetTracingSession(tsid);
2195         if (!session) {
2196           return;
2197         }
2198         session->final_flush_outcome = success
2199                                            ? TraceStats::FINAL_FLUSH_SUCCEEDED
2200                                            : TraceStats::FINAL_FLUSH_FAILED;
2201         if (session->consumer_maybe_null) {
2202           // If the consumer is still attached, just disable the session but
2203           // give it a chance to read the contents.
2204           DisableTracing(tsid);
2205         } else {
2206           // If the consumer detached, destroy the session. If the consumer did
2207           // start the session in long-tracing mode, the service will have saved
2208           // the contents to the passed file. If not, the contents will be
2209           // destroyed.
2210           FreeBuffers(tsid);
2211         }
2212       },
2213       FlushFlags(FlushFlags::Initiator::kTraced,
2214                  FlushFlags::Reason::kTraceStop));
2215 }
2216 
PeriodicFlushTask(TracingSessionID tsid,bool post_next_only)2217 void TracingServiceImpl::PeriodicFlushTask(TracingSessionID tsid,
2218                                            bool post_next_only) {
2219   PERFETTO_DCHECK_THREAD(thread_checker_);
2220   TracingSession* tracing_session = GetTracingSession(tsid);
2221   if (!tracing_session || tracing_session->state != TracingSession::STARTED)
2222     return;
2223 
2224   uint32_t flush_period_ms = tracing_session->config.flush_period_ms();
2225   weak_runner_.PostDelayedTask(
2226       [this, tsid] { PeriodicFlushTask(tsid, /*post_next_only=*/false); },
2227       flush_period_ms - static_cast<uint32_t>(clock_->GetWallTimeMs().count() %
2228                                               flush_period_ms));
2229 
2230   if (post_next_only)
2231     return;
2232 
2233   PERFETTO_DLOG("Triggering periodic flush for trace session %" PRIu64, tsid);
2234   Flush(
2235       tsid, 0,
2236       [](bool success) {
2237         if (!success)
2238           PERFETTO_ELOG("Periodic flush timed out");
2239       },
2240       FlushFlags(FlushFlags::Initiator::kTraced,
2241                  FlushFlags::Reason::kPeriodic));
2242 }
2243 
PeriodicClearIncrementalStateTask(TracingSessionID tsid,bool post_next_only)2244 void TracingServiceImpl::PeriodicClearIncrementalStateTask(
2245     TracingSessionID tsid,
2246     bool post_next_only) {
2247   PERFETTO_DCHECK_THREAD(thread_checker_);
2248   TracingSession* tracing_session = GetTracingSession(tsid);
2249   if (!tracing_session || tracing_session->state != TracingSession::STARTED)
2250     return;
2251 
2252   uint32_t clear_period_ms =
2253       tracing_session->config.incremental_state_config().clear_period_ms();
2254   weak_runner_.PostDelayedTask(
2255       [this, tsid] {
2256         PeriodicClearIncrementalStateTask(tsid, /*post_next_only=*/false);
2257       },
2258       clear_period_ms - static_cast<uint32_t>(clock_->GetWallTimeMs().count() %
2259                                               clear_period_ms));
2260 
2261   if (post_next_only)
2262     return;
2263 
2264   PERFETTO_DLOG(
2265       "Performing periodic incremental state clear for trace session %" PRIu64,
2266       tsid);
2267 
2268   // Queue the IPCs to producers with active data sources that opted in.
2269   std::map<ProducerID, std::vector<DataSourceInstanceID>> clear_map;
2270   for (const auto& kv : tracing_session->data_source_instances) {
2271     ProducerID producer_id = kv.first;
2272     const DataSourceInstance& data_source = kv.second;
2273     if (data_source.handles_incremental_state_clear) {
2274       clear_map[producer_id].push_back(data_source.instance_id);
2275     }
2276   }
2277 
2278   for (const auto& kv : clear_map) {
2279     ProducerID producer_id = kv.first;
2280     const std::vector<DataSourceInstanceID>& data_sources = kv.second;
2281     ProducerEndpointImpl* producer = GetProducer(producer_id);
2282     if (!producer) {
2283       PERFETTO_DFATAL("Producer does not exist.");
2284       continue;
2285     }
2286     producer->ClearIncrementalState(data_sources);
2287   }
2288 }
2289 
ReadBuffersIntoConsumer(TracingSessionID tsid,ConsumerEndpointImpl * consumer)2290 bool TracingServiceImpl::ReadBuffersIntoConsumer(
2291     TracingSessionID tsid,
2292     ConsumerEndpointImpl* consumer) {
2293   PERFETTO_DCHECK(consumer);
2294   PERFETTO_DCHECK_THREAD(thread_checker_);
2295   TracingSession* tracing_session = GetTracingSession(tsid);
2296   if (!tracing_session) {
2297     PERFETTO_DLOG(
2298         "Cannot ReadBuffersIntoConsumer(): no tracing session is active");
2299     return false;
2300   }
2301 
2302   if (tracing_session->write_into_file) {
2303     // If the consumer enabled tracing and asked to save the contents into the
2304     // passed file makes little sense to also try to read the buffers over IPC,
2305     // as that would just steal data from the periodic draining task.
2306     PERFETTO_ELOG("Consumer trying to read from write_into_file session.");
2307     return false;
2308   }
2309 
2310   if (IsWaitingForTrigger(tracing_session))
2311     return false;
2312 
2313   // This is a rough threshold to determine how much to read from the buffer in
2314   // each task. This is to avoid executing a single huge sending task for too
2315   // long and risk to hit the watchdog. This is *not* an upper bound: we just
2316   // stop accumulating new packets and PostTask *after* we cross this threshold.
2317   // This constant essentially balances the PostTask and IPC overhead vs the
2318   // responsiveness of the service. An extremely small value will cause one IPC
2319   // and one PostTask for each slice but will keep the service extremely
2320   // responsive. An extremely large value will batch the send for the full
2321   // buffer in one large task, will hit the blocking send() once the socket
2322   // buffers are full and hang the service for a bit (until the consumer
2323   // catches up).
2324   static constexpr size_t kApproxBytesPerTask = 32768;
2325   bool has_more;
2326   std::vector<TracePacket> packets =
2327       ReadBuffers(tracing_session, kApproxBytesPerTask, &has_more);
2328 
2329   if (has_more) {
2330     auto weak_consumer = consumer->weak_ptr_factory_.GetWeakPtr();
2331     weak_runner_.PostTask(
2332         [this, weak_consumer = std::move(weak_consumer), tsid] {
2333           if (!weak_consumer)
2334             return;
2335           ReadBuffersIntoConsumer(tsid, weak_consumer.get());
2336         });
2337   }
2338 
2339   // Keep this as tail call, just in case the consumer re-enters.
2340   consumer->consumer_->OnTraceData(std::move(packets), has_more);
2341   return true;
2342 }
2343 
ReadBuffersIntoFile(TracingSessionID tsid)2344 bool TracingServiceImpl::ReadBuffersIntoFile(TracingSessionID tsid) {
2345   PERFETTO_DCHECK_THREAD(thread_checker_);
2346   TracingSession* tracing_session = GetTracingSession(tsid);
2347   if (!tracing_session) {
2348     // This will be hit systematically from the PostDelayedTask. Avoid logging,
2349     // it would be just spam.
2350     return false;
2351   }
2352 
2353   // This can happen if the file is closed by a previous task because it reaches
2354   // |max_file_size_bytes|.
2355   if (!tracing_session->write_into_file)
2356     return false;
2357 
2358   if (IsWaitingForTrigger(tracing_session))
2359     return false;
2360 
2361   // ReadBuffers() can allocate memory internally, for filtering. By limiting
2362   // the data that ReadBuffers() reads to kWriteIntoChunksSize per iteration,
2363   // we limit the amount of memory used on each iteration.
2364   //
2365   // It would be tempting to split this into multiple tasks like in
2366   // ReadBuffersIntoConsumer, but that's not currently possible.
2367   // ReadBuffersIntoFile has to read the whole available data before returning,
2368   // to support the disable_immediately=true code paths.
2369   bool has_more = true;
2370   bool stop_writing_into_file = false;
2371   do {
2372     std::vector<TracePacket> packets =
2373         ReadBuffers(tracing_session, kWriteIntoFileChunkSize, &has_more);
2374 
2375     stop_writing_into_file = WriteIntoFile(tracing_session, std::move(packets));
2376   } while (has_more && !stop_writing_into_file);
2377 
2378   if (stop_writing_into_file || tracing_session->write_period_ms == 0) {
2379     // Ensure all data was written to the file before we close it.
2380     base::FlushFile(tracing_session->write_into_file.get());
2381     tracing_session->write_into_file.reset();
2382     tracing_session->write_period_ms = 0;
2383     if (tracing_session->state == TracingSession::STARTED)
2384       DisableTracing(tsid);
2385     return true;
2386   }
2387 
2388   weak_runner_.PostDelayedTask([this, tsid] { ReadBuffersIntoFile(tsid); },
2389                                DelayToNextWritePeriodMs(*tracing_session));
2390   return true;
2391 }
2392 
IsWaitingForTrigger(TracingSession * tracing_session)2393 bool TracingServiceImpl::IsWaitingForTrigger(TracingSession* tracing_session) {
2394   // Ignore the logic below for cloned tracing sessions. In this case we
2395   // actually want to read the (cloned) trace buffers even if no trigger was
2396   // hit.
2397   if (tracing_session->state == TracingSession::CLONED_READ_ONLY) {
2398     return false;
2399   }
2400 
2401   // When a tracing session is waiting for a trigger, it is considered empty. If
2402   // a tracing session finishes and moves into DISABLED without ever receiving a
2403   // trigger, the trace should never return any data. This includes the
2404   // synthetic packets like TraceConfig and Clock snapshots. So we bail out
2405   // early and let the consumer know there is no data.
2406   if (!tracing_session->config.trigger_config().triggers().empty() &&
2407       tracing_session->received_triggers.empty()) {
2408     PERFETTO_DLOG(
2409         "ReadBuffers(): tracing session has not received a trigger yet.");
2410     return true;
2411   }
2412 
2413   // Traces with CLONE_SNAPSHOT triggers are a special case of the above. They
2414   // can be read only via a CloneSession() request. This is to keep the
2415   // behavior consistent with the STOP_TRACING+triggers case and avoid periodic
2416   // finalizations and uploads of the main CLONE_SNAPSHOT triggers.
2417   if (GetTriggerMode(tracing_session->config) ==
2418       TraceConfig::TriggerConfig::CLONE_SNAPSHOT) {
2419     PERFETTO_DLOG(
2420         "ReadBuffers(): skipping because the tracing session has "
2421         "CLONE_SNAPSHOT triggers defined");
2422     return true;
2423   }
2424 
2425   return false;
2426 }
2427 
// Central read path for a tracing session. Emits the synthetic
// service-generated packets (clock snapshots, sync marker, trace config,
// triggers, uuid, system info, lifecycle events, remote clock sync), then
// drains the session's trace buffers until roughly |threshold| bytes have
// been accumulated. Sets |*has_more| when the threshold was hit before the
// buffers were exhausted. The emission order below is part of the trace
// format contract (e.g. tracing_started must precede data packets) — do not
// reorder.
std::vector<TracePacket> TracingServiceImpl::ReadBuffers(
    TracingSession* tracing_session,
    size_t threshold,
    bool* has_more) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DCHECK(tracing_session);
  *has_more = false;

  std::vector<TracePacket> packets;
  packets.reserve(1024);  // Just an educated guess to avoid trivial expansions.

  // Emit (and consume) the pending clock snapshots first.
  if (!tracing_session->initial_clock_snapshot.empty()) {
    EmitClockSnapshot(tracing_session,
                      std::move(tracing_session->initial_clock_snapshot),
                      &packets);
  }

  for (auto& snapshot : tracing_session->clock_snapshot_ring_buffer) {
    PERFETTO_DCHECK(!snapshot.empty());
    EmitClockSnapshot(tracing_session, std::move(snapshot), &packets);
  }
  tracing_session->clock_snapshot_ring_buffer.clear();

  if (tracing_session->should_emit_sync_marker) {
    EmitSyncMarker(&packets);
    tracing_session->should_emit_sync_marker = false;
  }

  if (!tracing_session->config.builtin_data_sources().disable_trace_config()) {
    MaybeEmitTraceConfig(tracing_session, &packets);
    MaybeEmitCloneTrigger(tracing_session, &packets);
    MaybeEmitReceivedTriggers(tracing_session, &packets);
  }
  // One-time packets, emitted only on the first ReadBuffers() call.
  if (!tracing_session->did_emit_initial_packets) {
    EmitUuid(tracing_session, &packets);
    if (!tracing_session->config.builtin_data_sources().disable_system_info())
      EmitSystemInfo(&packets);
  }
  tracing_session->did_emit_initial_packets = true;

  // Note that in the proto comment, we guarantee that the tracing_started
  // lifecycle event will be emitted before any data packets so make sure to
  // keep this before reading the tracing buffers.
  if (!tracing_session->config.builtin_data_sources().disable_service_events())
    EmitLifecycleEvents(tracing_session, &packets);

  // In a multi-machine tracing session, emit clock synchronization messages for
  // remote machines.
  if (!relay_clients_.empty())
    MaybeEmitRemoteClockSync(tracing_session, &packets);

  size_t packets_bytes = 0;  // SUM(slice.size() for each slice in |packets|).

  // Add up size for packets added by the Maybe* calls above.
  for (const TracePacket& packet : packets) {
    packets_bytes += packet.size();
  }

  bool did_hit_threshold = false;

  // Drain the session's trace buffers, one buffer at a time, until the
  // byte threshold is crossed or everything has been read.
  for (size_t buf_idx = 0;
       buf_idx < tracing_session->num_buffers() && !did_hit_threshold;
       buf_idx++) {
    auto tbuf_iter = buffers_.find(tracing_session->buffers_index[buf_idx]);
    if (tbuf_iter == buffers_.end()) {
      PERFETTO_DFATAL("Buffer not found.");
      continue;
    }
    TraceBuffer& tbuf = *tbuf_iter->second;
    tbuf.BeginRead();
    while (!did_hit_threshold) {
      TracePacket packet;
      TraceBuffer::PacketSequenceProperties sequence_properties{};
      bool previous_packet_dropped;
      if (!tbuf.ReadNextTracePacket(&packet, &sequence_properties,
                                    &previous_packet_dropped)) {
        break;
      }
      packet.set_buffer_index_for_stats(static_cast<uint32_t>(buf_idx));
      PERFETTO_DCHECK(sequence_properties.producer_id_trusted != 0);
      PERFETTO_DCHECK(sequence_properties.writer_id != 0);
      PERFETTO_DCHECK(sequence_properties.client_identity_trusted.has_uid());
      // Not checking sequence_properties.client_identity_trusted.has_pid():
      // it is false if the platform doesn't support it.

      PERFETTO_DCHECK(packet.size() > 0);
      // Reject packets that try to spoof trusted fields or are malformed.
      if (!PacketStreamValidator::Validate(packet.slices())) {
        tracing_session->invalid_packets++;
        PERFETTO_DLOG("Dropping invalid packet");
        continue;
      }

      // Append a slice with the trusted field data. This can't be spoofed
      // because above we validated that the existing slices don't contain any
      // trusted fields. For added safety we append instead of prepending
      // because according to protobuf semantics, if the same field is
      // encountered multiple times the last instance takes priority. Note that
      // truncated packets are also rejected, so the producer can't give us a
      // partial packet (e.g., a truncated string) which only becomes valid when
      // the trusted data is appended here.
      Slice slice = Slice::Allocate(32);
      protozero::StaticBuffered<protos::pbzero::TracePacket> trusted_packet(
          slice.own_data(), slice.size);
      const auto& client_identity_trusted =
          sequence_properties.client_identity_trusted;
      trusted_packet->set_trusted_uid(
          static_cast<int32_t>(client_identity_trusted.uid()));
      trusted_packet->set_trusted_packet_sequence_id(
          tracing_session->GetPacketSequenceID(
              client_identity_trusted.machine_id(),
              sequence_properties.producer_id_trusted,
              sequence_properties.writer_id));
      if (client_identity_trusted.has_pid()) {
        // Not supported on all platforms.
        trusted_packet->set_trusted_pid(
            static_cast<int32_t>(client_identity_trusted.pid()));
      }
      if (client_identity_trusted.has_non_default_machine_id()) {
        trusted_packet->set_machine_id(client_identity_trusted.machine_id());
      }
      if (previous_packet_dropped)
        trusted_packet->set_previous_packet_dropped(previous_packet_dropped);
      slice.size = trusted_packet.Finalize();
      packet.AddSlice(std::move(slice));

      // Append the packet (inclusive of the trusted uid) to |packets|.
      packets_bytes += packet.size();
      did_hit_threshold = packets_bytes >= threshold;
      packets.emplace_back(std::move(packet));
    }  // for(packets...)
  }    // for(buffers...)

  *has_more = did_hit_threshold;

  // Only emit the "read complete" lifetime event when there is no more trace
  // data available to read. These events are used as safe points to limit
  // sorting in trace processor: the code shouldn't emit the event unless the
  // buffers are empty.
  if (!*has_more && !tracing_session->config.builtin_data_sources()
                         .disable_service_events()) {
    // We don't bother snapshotting clocks here because we wouldn't be able to
    // emit it and we shouldn't have significant drift from the last snapshot in
    // any case.
    SnapshotLifecyleEvent(tracing_session,
                          protos::pbzero::TracingServiceEvent::
                              kReadTracingBuffersCompletedFieldNumber,
                          false /* snapshot_clocks */);
    EmitLifecycleEvents(tracing_session, &packets);
  }

  // Only emit the stats when there is no more trace data is available to read.
  // That way, any problems that occur while reading from the buffers are
  // reflected in the emitted stats. This is particularly important for use
  // cases where ReadBuffers is only ever called after the tracing session is
  // stopped.
  if (!*has_more && tracing_session->should_emit_stats) {
    EmitStats(tracing_session, &packets);
    tracing_session->should_emit_stats = false;
  }

  MaybeFilterPackets(tracing_session, &packets);

  MaybeCompressPackets(tracing_session, &packets);

  if (!*has_more) {
    // We've observed some extremely high memory usage by scudo after
    // MaybeFilterPackets in the past. The original bug (b/195145848) is fixed
    // now, but this code asks scudo to release memory just in case.
    base::MaybeReleaseAllocatorMemToOS();
  }

  return packets;
}
2601 
// Runs every packet in |packets| through the session's trace filter (if one
// was configured), replacing each packet in place with its filtered version
// and updating the session's filter statistics. No-op when no filter is set.
void TracingServiceImpl::MaybeFilterPackets(TracingSession* tracing_session,
                                            std::vector<TracePacket>* packets) {
  // If the tracing session specified a filter, run all packets through the
  // filter and replace them with the filter results.
  // The process below maintains the cardinality of input packets. Even if an
  // entire packet is filtered out, we emit a zero-sized TracePacket proto. That
  // makes debugging and reasoning about the trace stats easier.
  // This place swaps the contents of each |packets| entry in place.
  if (!tracing_session->trace_filter) {
    return;
  }
  protozero::MessageFilter& trace_filter = *tracing_session->trace_filter;
  // The filter root should be reset from protos.Trace to protos.TracePacket
  // by the earlier call to SetFilterRoot() in EnableTracing().
  PERFETTO_DCHECK(trace_filter.config().root_msg_index() != 0);
  std::vector<protozero::MessageFilter::InputSlice> filter_input;
  auto start = clock_->GetWallTimeNs();
  for (TracePacket& packet : *packets) {
    const auto& packet_slices = packet.slices();
    const size_t input_packet_size = packet.size();
    // Reuse |filter_input| across packets to avoid reallocations.
    filter_input.clear();
    filter_input.resize(packet_slices.size());
    ++tracing_session->filter_input_packets;
    tracing_session->filter_input_bytes += input_packet_size;
    for (size_t i = 0; i < packet_slices.size(); ++i)
      filter_input[i] = {packet_slices[i].start, packet_slices[i].size};
    auto filtered_packet = trace_filter.FilterMessageFragments(
        &filter_input[0], filter_input.size());

    // Replace the packet in-place with the filtered one (unless failed).
    std::optional<uint32_t> maybe_buffer_idx = packet.buffer_index_for_stats();
    packet = TracePacket();
    if (filtered_packet.error) {
      // On filter failure the packet stays empty (cardinality preserved).
      ++tracing_session->filter_errors;
      PERFETTO_DLOG("Trace packet filtering failed @ packet %" PRIu64,
                    tracing_session->filter_input_packets);
      continue;
    }
    tracing_session->filter_output_bytes += filtered_packet.size;
    if (maybe_buffer_idx.has_value()) {
      // Keep the per-buffer stats updated. Also propagate the
      // buffer_index_for_stats in the output packet to allow accounting by
      // other parts of the ReadBuffer pipeline.
      uint32_t buffer_idx = maybe_buffer_idx.value();
      packet.set_buffer_index_for_stats(buffer_idx);
      auto& vec = tracing_session->filter_bytes_discarded_per_buffer;
      if (static_cast<size_t>(buffer_idx) >= vec.size())
        vec.resize(buffer_idx + 1);
      PERFETTO_DCHECK(input_packet_size >= filtered_packet.size);
      size_t bytes_filtered_out = input_packet_size - filtered_packet.size;
      vec[buffer_idx] += bytes_filtered_out;
    }
    AppendOwnedSlicesToPacket(std::move(filtered_packet.data),
                              filtered_packet.size, kMaxTracePacketSliceSize,
                              &packet);
  }
  auto end = clock_->GetWallTimeNs();
  // Account the wall time spent filtering into the session stats.
  tracing_session->filter_time_taken_ns +=
      static_cast<uint64_t>((end - start).count());
}
2662 
MaybeCompressPackets(TracingSession * tracing_session,std::vector<TracePacket> * packets)2663 void TracingServiceImpl::MaybeCompressPackets(
2664     TracingSession* tracing_session,
2665     std::vector<TracePacket>* packets) {
2666   if (!tracing_session->compress_deflate) {
2667     return;
2668   }
2669 
2670   init_opts_.compressor_fn(packets);
2671 }
2672 
// Writes |packets| to the session's output fd via writev(), prepending each
// packet with its trace.proto preamble (field id + size) so the file is a
// valid root Trace message. Packets are written whole: a packet that would
// push the file past |max_file_size_bytes| is rolled back entirely.
// Returns true when writing must stop (size limit reached or write error).
bool TracingServiceImpl::WriteIntoFile(TracingSession* tracing_session,
                                       std::vector<TracePacket> packets) {
  if (!tracing_session->write_into_file) {
    return false;
  }
  // max_file_size_bytes == 0 means "no limit".
  const uint64_t max_size = tracing_session->max_file_size_bytes
                                ? tracing_session->max_file_size_bytes
                                : std::numeric_limits<size_t>::max();

  size_t total_slices = 0;
  for (const TracePacket& packet : packets) {
    total_slices += packet.slices().size();
  }
  // When writing into a file, the file should look like a root trace.proto
  // message. Each packet should be prepended with a proto preamble stating
  // its field id (within trace.proto) and size. Hence the addition below.
  const size_t max_iovecs = total_slices + packets.size();

  size_t num_iovecs = 0;
  bool stop_writing_into_file = false;
  std::unique_ptr<struct iovec[]> iovecs(new struct iovec[max_iovecs]);
  // Watermark of the iovec count at the last fully-accepted packet; used to
  // roll back a packet that would overflow |max_size|.
  size_t num_iovecs_at_last_packet = 0;
  uint64_t bytes_about_to_be_written = 0;
  for (TracePacket& packet : packets) {
    std::tie(iovecs[num_iovecs].iov_base, iovecs[num_iovecs].iov_len) =
        packet.GetProtoPreamble();
    bytes_about_to_be_written += iovecs[num_iovecs].iov_len;
    num_iovecs++;
    for (const Slice& slice : packet.slices()) {
      // writev() doesn't change the passed pointer. However, struct iovec
      // take a non-const ptr because it's the same struct used by readv().
      // Hence the const_cast here.
      char* start = static_cast<char*>(const_cast<void*>(slice.start));
      bytes_about_to_be_written += slice.size;
      iovecs[num_iovecs++] = {start, slice.size};
    }

    // Would this packet push the file past the size limit? Drop it (and the
    // rest) by rewinding to the last accepted packet's watermark.
    if (tracing_session->bytes_written_into_file + bytes_about_to_be_written >=
        max_size) {
      stop_writing_into_file = true;
      num_iovecs = num_iovecs_at_last_packet;
      break;
    }

    num_iovecs_at_last_packet = num_iovecs;
  }
  PERFETTO_DCHECK(num_iovecs <= max_iovecs);
  int fd = *tracing_session->write_into_file;

  uint64_t total_wr_size = 0;

  // writev() can take at most IOV_MAX entries per call. Batch them.
  constexpr size_t kIOVMax = IOV_MAX;
  for (size_t i = 0; i < num_iovecs; i += kIOVMax) {
    int iov_batch_size = static_cast<int>(std::min(num_iovecs - i, kIOVMax));
    ssize_t wr_size = PERFETTO_EINTR(writev(fd, &iovecs[i], iov_batch_size));
    if (wr_size <= 0) {
      PERFETTO_PLOG("writev() failed");
      stop_writing_into_file = true;
      break;
    }
    total_wr_size += static_cast<size_t>(wr_size);
  }

  tracing_session->bytes_written_into_file += total_wr_size;

  PERFETTO_DLOG("Draining into file, written: %" PRIu64 " KB, stop: %d",
                (total_wr_size + 1023) / 1024, stop_writing_into_file);
  return stop_writing_into_file;
}
2743 
// Tears down tracing session |tsid|: stops it (immediately), releases its
// trace buffers, notifies producers, erases the session and fails any clone
// operations that were still pending on it.
void TracingServiceImpl::FreeBuffers(TracingSessionID tsid) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Freeing buffers for session %" PRIu64, tsid);
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session) {
    PERFETTO_DLOG("FreeBuffers() failed, invalid session ID %" PRIu64, tsid);
    return;  // TODO(primiano): signal failure?
  }
  // |disable_immediately| skips the graceful stop-notification round trip
  // with producers: all instances must be stopped before buffers go away.
  DisableTracing(tsid, /*disable_immediately=*/true);

  PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
  tracing_session->data_source_instances.clear();

  // Let every producer drop its references to this session's buffers before
  // the buffers themselves are destroyed below.
  for (auto& producer_entry : producers_) {
    ProducerEndpointImpl* producer = producer_entry.second;
    producer->OnFreeBuffers(tracing_session->buffers_index);
  }

  for (BufferID buffer_id : tracing_session->buffers_index) {
    buffer_ids_.Free(buffer_id);
    PERFETTO_DCHECK(buffers_.count(buffer_id) == 1);
    buffers_.erase(buffer_id);
  }
  // Capture everything still needed before erasing the session below, which
  // invalidates |tracing_session|.
  bool notify_traceur =
      tracing_session->config.notify_traceur() &&
      tracing_session->state != TracingSession::CLONED_READ_ONLY;
  bool is_long_trace =
      (tracing_session->config.write_into_file() &&
       tracing_session->config.file_write_period_ms() < kMillisPerDay);
  auto pending_clones = std::move(tracing_session->pending_clones);
  tracing_sessions_.erase(tsid);
  tracing_session = nullptr;  // Guard against use-after-erase.
  UpdateMemoryGuardrail();

  // Any CloneSession() still in flight on this session can no longer
  // complete: tell each waiting consumer (asynchronously) that it failed.
  for (const auto& id_to_clone_op : pending_clones) {
    const PendingClone& clone_op = id_to_clone_op.second;
    if (clone_op.weak_consumer) {
      weak_runner_.task_runner()->PostTask(
          [weak_consumer = clone_op.weak_consumer] {
            if (weak_consumer) {
              weak_consumer->consumer_->OnSessionCloned(
                  {false, "Original session ended", {}});
            }
          });
    }
  }

  PERFETTO_LOG("Tracing session %" PRIu64 " ended, total sessions:%zu", tsid,
               tracing_sessions_.size());
#if PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD) && \
    PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
  if (notify_traceur && is_long_trace) {
    PERFETTO_LAZY_LOAD(android_internal::NotifyTraceSessionEnded, notify_fn);
    if (!notify_fn || !notify_fn(/*session_stolen=*/false))
      PERFETTO_ELOG("Failed to notify Traceur long tracing has ended");
  }
#else
  base::ignore_result(notify_traceur);
  base::ignore_result(is_long_trace);
#endif
}
2805 
// Registers data source |desc| for |producer_id|. If any live tracing
// session's config references a data source with this name, it is set up
// right away (and started too, if that session is already STARTED).
void TracingServiceImpl::RegisterDataSource(ProducerID producer_id,
                                            const DataSourceDescriptor& desc) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (desc.name().empty()) {
    PERFETTO_DLOG("Received RegisterDataSource() with empty name");
    return;
  }

  ProducerEndpointImpl* producer = GetProducer(producer_id);
  if (!producer) {
    PERFETTO_DFATAL("Producer not found.");
    return;
  }

  // Check that the producer doesn't register two data sources with the same ID.
  // Note that we tolerate |id| == 0 because until Android T / v22 the |id|
  // field didn't exist.
  for (const auto& kv : data_sources_) {
    if (desc.id() && kv.second.producer_id == producer_id &&
        kv.second.descriptor.id() == desc.id()) {
      PERFETTO_ELOG(
          "Failed to register data source \"%s\". A data source with the same "
          "id %" PRIu64 " (name=\"%s\") is already registered for producer %d",
          desc.name().c_str(), desc.id(), kv.second.descriptor.name().c_str(),
          producer_id);
      return;
    }
  }

  PERFETTO_DLOG("Producer %" PRIu16 " registered data source \"%s\"",
                producer_id, desc.name().c_str());

  auto reg_ds = data_sources_.emplace(desc.name(),
                                      RegisteredDataSource{producer_id, desc});

  // If there are existing tracing sessions, we need to check if the new
  // data source is enabled by any of them.
  for (auto& iter : tracing_sessions_) {
    TracingSession& tracing_session = iter.second;
    if (tracing_session.state != TracingSession::STARTED &&
        tracing_session.state != TracingSession::CONFIGURED) {
      continue;
    }

    // Pick the per-producer override config (if the session's TraceConfig has
    // one matching this producer's name); defaults otherwise.
    TraceConfig::ProducerConfig producer_config;
    for (const auto& config : tracing_session.config.producers()) {
      if (producer->name_ == config.producer_name()) {
        producer_config = config;
        break;
      }
    }
    for (const TraceConfig::DataSource& cfg_data_source :
         tracing_session.config.data_sources()) {
      if (cfg_data_source.config().name() != desc.name())
        continue;
      DataSourceInstance* ds_inst = SetupDataSource(
          cfg_data_source, producer_config, reg_ds->second, &tracing_session);
      if (ds_inst && tracing_session.state == TracingSession::STARTED)
        StartDataSourceInstance(producer, &tracing_session, ds_inst);
    }
  }  // for(iter : tracing_sessions_)
}
2868 
UpdateDataSource(ProducerID producer_id,const DataSourceDescriptor & new_desc)2869 void TracingServiceImpl::UpdateDataSource(
2870     ProducerID producer_id,
2871     const DataSourceDescriptor& new_desc) {
2872   if (new_desc.id() == 0) {
2873     PERFETTO_ELOG("UpdateDataSource() must have a non-zero id");
2874     return;
2875   }
2876 
2877   // If this producer has already registered a matching descriptor name and id,
2878   // just update the descriptor.
2879   RegisteredDataSource* data_source = nullptr;
2880   auto range = data_sources_.equal_range(new_desc.name());
2881   for (auto it = range.first; it != range.second; ++it) {
2882     if (it->second.producer_id == producer_id &&
2883         it->second.descriptor.id() == new_desc.id()) {
2884       data_source = &it->second;
2885       break;
2886     }
2887   }
2888 
2889   if (!data_source) {
2890     PERFETTO_ELOG(
2891         "UpdateDataSource() failed, could not find an existing data source "
2892         "with name=\"%s\" id=%" PRIu64,
2893         new_desc.name().c_str(), new_desc.id());
2894     return;
2895   }
2896 
2897   data_source->descriptor = new_desc;
2898 }
2899 
StopDataSourceInstance(ProducerEndpointImpl * producer,TracingSession * tracing_session,DataSourceInstance * instance,bool disable_immediately)2900 void TracingServiceImpl::StopDataSourceInstance(ProducerEndpointImpl* producer,
2901                                                 TracingSession* tracing_session,
2902                                                 DataSourceInstance* instance,
2903                                                 bool disable_immediately) {
2904   const DataSourceInstanceID ds_inst_id = instance->instance_id;
2905   if (instance->will_notify_on_stop && !disable_immediately) {
2906     instance->state = DataSourceInstance::STOPPING;
2907   } else {
2908     instance->state = DataSourceInstance::STOPPED;
2909   }
2910   if (tracing_session->consumer_maybe_null) {
2911     tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
2912         *producer, *instance);
2913   }
2914   producer->StopDataSource(ds_inst_id);
2915 }
2916 
// Unregisters data source |name| for |producer_id|: stops and removes any
// running instances of it across all sessions, then erases the registration.
void TracingServiceImpl::UnregisterDataSource(ProducerID producer_id,
                                              const std::string& name) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Producer %" PRIu16 " unregistered data source \"%s\"",
                producer_id, name.c_str());
  PERFETTO_CHECK(producer_id);
  ProducerEndpointImpl* producer = GetProducer(producer_id);
  PERFETTO_DCHECK(producer);
  for (auto& kv : tracing_sessions_) {
    auto& ds_instances = kv.second.data_source_instances;
    bool removed = false;
    for (auto it = ds_instances.begin(); it != ds_instances.end();) {
      if (it->first == producer_id && it->second.data_source_name == name) {
        DataSourceInstanceID ds_inst_id = it->second.instance_id;
        if (it->second.state != DataSourceInstance::STOPPED) {
          if (it->second.state != DataSourceInstance::STOPPING) {
            StopDataSourceInstance(producer, &kv.second, &it->second,
                                   /* disable_immediately = */ false);
          }

          // Mark the instance as stopped immediately, since we are
          // unregistering it below.
          //
          //  The StopDataSourceInstance above might have set the state to
          //  STOPPING so this condition isn't an else.
          if (it->second.state == DataSourceInstance::STOPPING)
            NotifyDataSourceStopped(producer_id, ds_inst_id);
        }
        it = ds_instances.erase(it);
        removed = true;
      } else {
        ++it;
      }
    }  // for (data_source_instances)
    // Removing an instance may satisfy the "all data sources started"
    // condition the session was waiting on.
    if (removed)
      MaybeNotifyAllDataSourcesStarted(&kv.second);
  }  // for (tracing_session)

  // Finally drop the registration itself (first matching entry wins).
  for (auto it = data_sources_.begin(); it != data_sources_.end(); ++it) {
    if (it->second.producer_id == producer_id &&
        it->second.descriptor.name() == name) {
      data_sources_.erase(it);
      return;
    }
  }

  PERFETTO_DFATAL(
      "Tried to unregister a non-existent data source \"%s\" for "
      "producer %" PRIu16,
      name.c_str(), producer_id);
}
2968 
// Returns true when the consumer that initiated |tracing_session| should be
// treated as a trusted system initiator (Android-only: statsd triggered by a
// privileged app, or system_server). Always false on other platforms.
bool TracingServiceImpl::IsInitiatorPrivileged(
    const TracingSession& tracing_session) {
#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
  if (tracing_session.consumer_uid == 1066 /* AID_STATSD */ &&
      tracing_session.config.statsd_metadata().triggering_config_uid() !=
          2000 /* AID_SHELL */
      && tracing_session.config.statsd_metadata().triggering_config_uid() !=
             0 /* AID_ROOT */) {
    // StatsD can be triggered either by shell, root or an app that has DUMP and
    // USAGE_STATS permission. When triggered by shell or root, we do not want
    // to consider the trace a trusted system trace, as it was initiated by the
    // user. Otherwise, it has to come from an app with DUMP and
    // PACKAGE_USAGE_STATS, which has to be preinstalled and trusted by the
    // system.
    // Check for shell / root: https://bit.ly/3b7oZNi
    // Check for DUMP or PACKAGE_USAGE_STATS: https://bit.ly/3ep0NrR
    return true;
  }
  if (tracing_session.consumer_uid == 1000 /* AID_SYSTEM */) {
    // AID_SYSTEM is considered a privileged initiator so that system_server can
    // profile apps that are not profileable by shell. Other AID_SYSTEM
    // processes are not allowed by SELinux to connect to the consumer socket or
    // to exec perfetto.
    return true;
  }
#else
  base::ignore_result(tracing_session);
#endif
  return false;
}
2999 
// Creates a DataSourceInstance for |data_source| within |tracing_session|,
// fills in the service-generated fields of its DataSourceConfig (buffer id,
// session id, initiator, timeouts), sets up the producer's shared memory
// buffer if it doesn't have one yet, and sends SetupDataSource to the
// producer. Returns nullptr if the data source is filtered out or the config
// is invalid; the instance otherwise (in CONFIGURED state, not yet started).
TracingServiceImpl::DataSourceInstance* TracingServiceImpl::SetupDataSource(
    const TraceConfig::DataSource& cfg_data_source,
    const TraceConfig::ProducerConfig& producer_config,
    const RegisteredDataSource& data_source,
    TracingSession* tracing_session) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  ProducerEndpointImpl* producer = GetProducer(data_source.producer_id);
  PERFETTO_DCHECK(producer);
  // An existing producer that is not ftrace could have registered itself as
  // ftrace, we must not enable it in that case.
  if (lockdown_mode_ && producer->uid() != uid_) {
    PERFETTO_DLOG("Lockdown mode: not enabling producer %hu", producer->id_);
    return nullptr;
  }
  // TODO(primiano): Add tests for registration ordering (data sources vs
  // consumers).
  if (!NameMatchesFilter(producer->name_,
                         cfg_data_source.producer_name_filter(),
                         cfg_data_source.producer_name_regex_filter())) {
    PERFETTO_DLOG("Data source: %s is filtered out for producer: %s",
                  cfg_data_source.config().name().c_str(),
                  producer->name_.c_str());
    return nullptr;
  }

  auto relative_buffer_id = cfg_data_source.config().target_buffer();
  if (relative_buffer_id >= tracing_session->num_buffers()) {
    PERFETTO_LOG(
        "The TraceConfig for DataSource %s specified a target_buffer out of "
        "bound (%d). Skipping it.",
        cfg_data_source.config().name().c_str(), relative_buffer_id);
    return nullptr;
  }

  // Create a copy of the DataSourceConfig specified in the trace config. This
  // will be passed to the producer after translating the |target_buffer| id.
  // The |target_buffer| parameter passed by the consumer in the trace config is
  // relative to the buffers declared in the same trace config. This has to be
  // translated to the global BufferID before passing it to the producers, which
  // don't know anything about tracing sessions and consumers.

  DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
  auto insert_iter = tracing_session->data_source_instances.emplace(
      std::piecewise_construct,  //
      std::forward_as_tuple(producer->id_),
      std::forward_as_tuple(
          inst_id,
          cfg_data_source.config(),  //  Deliberate copy.
          data_source.descriptor.name(),
          data_source.descriptor.will_notify_on_start(),
          data_source.descriptor.will_notify_on_stop(),
          data_source.descriptor.handles_incremental_state_clear(),
          data_source.descriptor.no_flush()));
  DataSourceInstance* ds_instance = &insert_iter->second;

  // New data source instance starts out in CONFIGURED state.
  if (tracing_session->consumer_maybe_null) {
    tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
        *producer, *ds_instance);
  }

  DataSourceConfig& ds_config = ds_instance->config;
  ds_config.set_trace_duration_ms(tracing_session->config.duration_ms());

  // Rationale for `if (prefer) set_prefer(true)`, rather than `set(prefer)`:
  // ComputeStartupConfigHash() in tracing_muxer_impl.cc compares hashes of the
  // DataSourceConfig and expects to know (and clear) the fields generated by
  // the tracing service. Unconditionally adding a new field breaks backward
  // compatibility of startup tracing with older SDKs, because the serialization
  // also propagates unknown fields, breaking the hash matching check.
  if (tracing_session->config.prefer_suspend_clock_for_duration())
    ds_config.set_prefer_suspend_clock_for_duration(true);

  ds_config.set_stop_timeout_ms(tracing_session->data_source_stop_timeout_ms());
  ds_config.set_enable_extra_guardrails(
      tracing_session->config.enable_extra_guardrails());
  if (IsInitiatorPrivileged(*tracing_session)) {
    ds_config.set_session_initiator(
        DataSourceConfig::SESSION_INITIATOR_TRUSTED_SYSTEM);
  } else {
    // Unset in case the consumer set it.
    // We need to be able to trust this field.
    ds_config.set_session_initiator(
        DataSourceConfig::SESSION_INITIATOR_UNSPECIFIED);
  }
  ds_config.set_tracing_session_id(tracing_session->id);
  BufferID global_id = tracing_session->buffers_index[relative_buffer_id];
  PERFETTO_DCHECK(global_id);
  ds_config.set_target_buffer(global_id);

  PERFETTO_DLOG("Setting up data source %s with target buffer %" PRIu16,
                ds_config.name().c_str(), global_id);
  if (!producer->shared_memory()) {
    // Determine the SMB page size. Must be an integer multiple of 4k.
    // As for the SMB size below, the decision tree is as follows:
    // 1. Give priority to what is defined in the trace config.
    // 2. If unset give priority to the hint passed by the producer.
    // 3. Keep within bounds and ensure it's a multiple of 4k.
    size_t page_size = producer_config.page_size_kb() * 1024;
    if (page_size == 0)
      page_size = producer->shmem_page_size_hint_bytes_;

    // Determine the SMB size. Must be an integer multiple of the SMB page size.
    // The decision tree is as follows:
    // 1. Give priority to what defined in the trace config.
    // 2. If unset give priority to the hint passed by the producer.
    // 3. Keep within bounds and ensure it's a multiple of the page size.
    size_t shm_size = producer_config.shm_size_kb() * 1024;
    if (shm_size == 0)
      shm_size = producer->shmem_size_hint_bytes_;

    auto valid_sizes = EnsureValidShmSizes(shm_size, page_size);
    if (valid_sizes != std::tie(shm_size, page_size)) {
      PERFETTO_DLOG(
          "Invalid configured SMB sizes: shm_size %zu page_size %zu. Falling "
          "back to shm_size %zu page_size %zu.",
          shm_size, page_size, std::get<0>(valid_sizes),
          std::get<1>(valid_sizes));
    }
    std::tie(shm_size, page_size) = valid_sizes;

    // TODO(primiano): right now Create() will suicide in case of OOM if the
    // mmap fails. We should instead gracefully fail the request and tell the
    // client to go away.
    PERFETTO_DLOG("Creating SMB of %zu KB for producer \"%s\"", shm_size / 1024,
                  producer->name_.c_str());
    auto shared_memory = shm_factory_->CreateSharedMemory(shm_size);
    producer->SetupSharedMemory(std::move(shared_memory), page_size,
                                /*provided_by_producer=*/false);
  }
  producer->SetupDataSource(inst_id, ds_config);
  return ds_instance;
}
3133 
// Note: all the fields % *_trusted ones are untrusted, as in, the Producer
// might be lying / returning garbage contents. |src| and |size| can be trusted
// in terms of being a valid pointer, but not the contents.
//
// Validates that the producer may write into |buffer_id| (and that the writer
// is bound to it, if registered), then copies the chunk into the trace
// buffer. Rejected chunks are counted in |chunks_discarded_|.
void TracingServiceImpl::CopyProducerPageIntoLogBuffer(
    ProducerID producer_id_trusted,
    const ClientIdentity& client_identity_trusted,
    WriterID writer_id,
    ChunkID chunk_id,
    BufferID buffer_id,
    uint16_t num_fragments,
    uint8_t chunk_flags,
    bool chunk_complete,
    const uint8_t* src,
    size_t size) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  ProducerEndpointImpl* producer = GetProducer(producer_id_trusted);
  if (!producer) {
    PERFETTO_DFATAL("Producer not found.");
    chunks_discarded_++;
    return;
  }

  TraceBuffer* buf = GetBufferByID(buffer_id);
  if (!buf) {
    PERFETTO_DLOG("Could not find target buffer %" PRIu16
                  " for producer %" PRIu16,
                  buffer_id, producer_id_trusted);
    chunks_discarded_++;
    return;
  }

  // Verify that the producer is actually allowed to write into the target
  // buffer specified in the request. This prevents a malicious producer from
  // injecting data into a log buffer that belongs to a tracing session the
  // producer is not part of.
  if (!producer->is_allowed_target_buffer(buffer_id)) {
    PERFETTO_ELOG("Producer %" PRIu16
                  " tried to write into forbidden target buffer %" PRIu16,
                  producer_id_trusted, buffer_id);
    PERFETTO_DFATAL("Forbidden target buffer");
    chunks_discarded_++;
    return;
  }

  // If the writer was registered by the producer, it should only write into the
  // buffer it was registered with.
  std::optional<BufferID> associated_buffer =
      producer->buffer_id_for_writer(writer_id);
  if (associated_buffer && *associated_buffer != buffer_id) {
    PERFETTO_ELOG("Writer %" PRIu16 " of producer %" PRIu16
                  " was registered to write into target buffer %" PRIu16
                  ", but tried to write into buffer %" PRIu16,
                  writer_id, producer_id_trusted, *associated_buffer,
                  buffer_id);
    PERFETTO_DFATAL("Wrong target buffer");
    chunks_discarded_++;
    return;
  }

  buf->CopyChunkUntrusted(producer_id_trusted, client_identity_trusted,
                          writer_id, chunk_id, num_fragments, chunk_flags,
                          chunk_complete, src, size);
}
3198 
ApplyChunkPatches(ProducerID producer_id_trusted,const std::vector<CommitDataRequest::ChunkToPatch> & chunks_to_patch)3199 void TracingServiceImpl::ApplyChunkPatches(
3200     ProducerID producer_id_trusted,
3201     const std::vector<CommitDataRequest::ChunkToPatch>& chunks_to_patch) {
3202   PERFETTO_DCHECK_THREAD(thread_checker_);
3203 
3204   for (const auto& chunk : chunks_to_patch) {
3205     const ChunkID chunk_id = static_cast<ChunkID>(chunk.chunk_id());
3206     const WriterID writer_id = static_cast<WriterID>(chunk.writer_id());
3207     TraceBuffer* buf =
3208         GetBufferByID(static_cast<BufferID>(chunk.target_buffer()));
3209     static_assert(std::numeric_limits<ChunkID>::max() == kMaxChunkID,
3210                   "Add a '|| chunk_id > kMaxChunkID' below if this fails");
3211     if (!writer_id || writer_id > kMaxWriterID || !buf) {
3212       // This can genuinely happen when the trace is stopped. The producers
3213       // might see the stop signal with some delay and try to keep sending
3214       // patches left soon after.
3215       PERFETTO_DLOG(
3216           "Received invalid chunks_to_patch request from Producer: %" PRIu16
3217           ", BufferID: %" PRIu32 " ChunkdID: %" PRIu32 " WriterID: %" PRIu16,
3218           producer_id_trusted, chunk.target_buffer(), chunk_id, writer_id);
3219       patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
3220       continue;
3221     }
3222 
3223     // Note, there's no need to validate that the producer is allowed to write
3224     // to the specified buffer ID (or that it's the correct buffer ID for a
3225     // registered TraceWriter). That's because TraceBuffer uses the producer ID
3226     // and writer ID to look up the chunk to patch. If the producer specifies an
3227     // incorrect buffer, this lookup will fail and TraceBuffer will ignore the
3228     // patches. Because the producer ID is trusted, there's also no way for a
3229     // malicious producer to patch another producer's data.
3230 
3231     // Speculate on the fact that there are going to be a limited amount of
3232     // patches per request, so we can allocate the |patches| array on the stack.
3233     std::array<TraceBuffer::Patch, 1024> patches;  // Uninitialized.
3234     if (chunk.patches().size() > patches.size()) {
3235       PERFETTO_ELOG("Too many patches (%zu) batched in the same request",
3236                     patches.size());
3237       PERFETTO_DFATAL("Too many patches");
3238       patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
3239       continue;
3240     }
3241 
3242     size_t i = 0;
3243     for (const auto& patch : chunk.patches()) {
3244       const std::string& patch_data = patch.data();
3245       if (patch_data.size() != patches[i].data.size()) {
3246         PERFETTO_ELOG("Received patch from producer: %" PRIu16
3247                       " of unexpected size %zu",
3248                       producer_id_trusted, patch_data.size());
3249         patches_discarded_++;
3250         continue;
3251       }
3252       patches[i].offset_untrusted = patch.offset();
3253       memcpy(&patches[i].data[0], patch_data.data(), patches[i].data.size());
3254       i++;
3255     }
3256     buf->TryPatchChunkContents(producer_id_trusted, writer_id, chunk_id,
3257                                &patches[0], i, chunk.has_more_patches());
3258   }
3259 }
3260 
GetDetachedSession(uid_t uid,const std::string & key)3261 TracingServiceImpl::TracingSession* TracingServiceImpl::GetDetachedSession(
3262     uid_t uid,
3263     const std::string& key) {
3264   PERFETTO_DCHECK_THREAD(thread_checker_);
3265   for (auto& kv : tracing_sessions_) {
3266     TracingSession* session = &kv.second;
3267     if (session->consumer_uid == uid && session->detach_key == key) {
3268       PERFETTO_DCHECK(session->consumer_maybe_null == nullptr);
3269       return session;
3270     }
3271   }
3272   return nullptr;
3273 }
3274 
GetTracingSession(TracingSessionID tsid)3275 TracingServiceImpl::TracingSession* TracingServiceImpl::GetTracingSession(
3276     TracingSessionID tsid) {
3277   PERFETTO_DCHECK_THREAD(thread_checker_);
3278   auto it = tsid ? tracing_sessions_.find(tsid) : tracing_sessions_.end();
3279   if (it == tracing_sessions_.end())
3280     return nullptr;
3281   return &it->second;
3282 }
3283 
3284 TracingServiceImpl::TracingSession*
GetTracingSessionByUniqueName(const std::string & unique_session_name)3285 TracingServiceImpl::GetTracingSessionByUniqueName(
3286     const std::string& unique_session_name) {
3287   PERFETTO_DCHECK_THREAD(thread_checker_);
3288   if (unique_session_name.empty()) {
3289     return nullptr;
3290   }
3291   for (auto& session_id_and_session : tracing_sessions_) {
3292     TracingSession& session = session_id_and_session.second;
3293     if (session.state == TracingSession::CLONED_READ_ONLY) {
3294       continue;
3295     }
3296     if (session.config.unique_session_name() == unique_session_name) {
3297       return &session;
3298     }
3299   }
3300   return nullptr;
3301 }
3302 
3303 TracingServiceImpl::TracingSession*
FindTracingSessionWithMaxBugreportScore()3304 TracingServiceImpl::FindTracingSessionWithMaxBugreportScore() {
3305   TracingSession* max_session = nullptr;
3306   for (auto& session_id_and_session : tracing_sessions_) {
3307     auto& session = session_id_and_session.second;
3308     const int32_t score = session.config.bugreport_score();
3309     // Exclude sessions with 0 (or below) score. By default tracing sessions
3310     // should NOT be eligible to be attached to bugreports.
3311     if (score <= 0 || session.state != TracingSession::STARTED)
3312       continue;
3313 
3314     if (!max_session || score > max_session->config.bugreport_score())
3315       max_session = &session;
3316   }
3317   return max_session;
3318 }
3319 
GetNextProducerID()3320 ProducerID TracingServiceImpl::GetNextProducerID() {
3321   PERFETTO_DCHECK_THREAD(thread_checker_);
3322   PERFETTO_CHECK(producers_.size() < kMaxProducerID);
3323   do {
3324     ++last_producer_id_;
3325   } while (producers_.count(last_producer_id_) || last_producer_id_ == 0);
3326   PERFETTO_DCHECK(last_producer_id_ > 0 && last_producer_id_ <= kMaxProducerID);
3327   return last_producer_id_;
3328 }
3329 
GetBufferByID(BufferID buffer_id)3330 TraceBuffer* TracingServiceImpl::GetBufferByID(BufferID buffer_id) {
3331   auto buf_iter = buffers_.find(buffer_id);
3332   if (buf_iter == buffers_.end())
3333     return nullptr;
3334   return &*buf_iter->second;
3335 }
3336 
OnStartTriggersTimeout(TracingSessionID tsid)3337 void TracingServiceImpl::OnStartTriggersTimeout(TracingSessionID tsid) {
3338   // Skip entirely the flush if the trace session doesn't exist anymore.
3339   // This is to prevent misleading error messages to be logged.
3340   //
3341   // if the trace has started from the trigger we rely on
3342   // the |stop_delay_ms| from the trigger so don't flush and
3343   // disable if we've moved beyond a CONFIGURED state
3344   auto* tracing_session_ptr = GetTracingSession(tsid);
3345   if (tracing_session_ptr &&
3346       tracing_session_ptr->state == TracingSession::CONFIGURED) {
3347     PERFETTO_DLOG("Disabling TracingSession %" PRIu64
3348                   " since no triggers activated.",
3349                   tsid);
3350     // No data should be returned from ReadBuffers() regardless of if we
3351     // call FreeBuffers() or DisableTracing(). This is because in
3352     // STOP_TRACING we need this promise in either case, and using
3353     // DisableTracing() allows a graceful shutdown. Consumers can follow
3354     // their normal path and check the buffers through ReadBuffers() and
3355     // the code won't hang because the tracing session will still be
3356     // alive just disabled.
3357     DisableTracing(tsid);
3358   }
3359 }
3360 
// Recomputes the watchdog memory limit as the sum of all buffer memory the
// service is accountable for: producers' shared memory buffers, the central
// trace buffers and buffers held by pending clone operations. No-op when the
// watchdog is compiled out.
void TracingServiceImpl::UpdateMemoryGuardrail() {
#if PERFETTO_BUILDFLAG(PERFETTO_WATCHDOG)
  uint64_t total_buffer_bytes = 0;

  // Sum up all the shared memory buffers.
  for (const auto& id_to_producer : producers_) {
    if (id_to_producer.second->shared_memory())
      total_buffer_bytes += id_to_producer.second->shared_memory()->size();
  }

  // Sum up all the trace buffers.
  for (const auto& id_to_buffer : buffers_) {
    total_buffer_bytes += id_to_buffer.second->size();
  }

  // Sum up all the cloned traced buffers.
  for (const auto& id_to_ts : tracing_sessions_) {
    const TracingSession& ts = id_to_ts.second;
    for (const auto& id_to_clone_op : ts.pending_clones) {
      const PendingClone& clone_op = id_to_clone_op.second;
      for (const std::unique_ptr<TraceBuffer>& buf : clone_op.buffers) {
        if (buf) {
          total_buffer_bytes += buf->size();
        }
      }
    }
  }

  // Set the guard rail to 32MB + the sum of all the buffers over a 30 second
  // interval.
  uint64_t guardrail = base::kWatchdogDefaultMemorySlack + total_buffer_bytes;
  base::Watchdog::GetInstance()->SetMemoryLimit(guardrail, 30 * 1000);
#endif
}
3395 
PeriodicSnapshotTask(TracingSessionID tsid)3396 void TracingServiceImpl::PeriodicSnapshotTask(TracingSessionID tsid) {
3397   auto* tracing_session = GetTracingSession(tsid);
3398   if (!tracing_session)
3399     return;
3400   if (tracing_session->state != TracingSession::STARTED)
3401     return;
3402   tracing_session->should_emit_sync_marker = true;
3403   tracing_session->should_emit_stats = true;
3404   MaybeSnapshotClocksIntoRingBuffer(tracing_session);
3405 }
3406 
SnapshotLifecyleEvent(TracingSession * tracing_session,uint32_t field_id,bool snapshot_clocks)3407 void TracingServiceImpl::SnapshotLifecyleEvent(TracingSession* tracing_session,
3408                                                uint32_t field_id,
3409                                                bool snapshot_clocks) {
3410   // field_id should be an id of a field in TracingServiceEvent.
3411   auto& lifecycle_events = tracing_session->lifecycle_events;
3412   auto event_it =
3413       std::find_if(lifecycle_events.begin(), lifecycle_events.end(),
3414                    [field_id](const TracingSession::LifecycleEvent& event) {
3415                      return event.field_id == field_id;
3416                    });
3417 
3418   TracingSession::LifecycleEvent* event;
3419   if (event_it == lifecycle_events.end()) {
3420     lifecycle_events.emplace_back(field_id);
3421     event = &lifecycle_events.back();
3422   } else {
3423     event = &*event_it;
3424   }
3425 
3426   // Snapshot the clocks before capturing the timestamp for the event so we can
3427   // use this snapshot to resolve the event timestamp if necessary.
3428   if (snapshot_clocks)
3429     MaybeSnapshotClocksIntoRingBuffer(tracing_session);
3430 
3431   // Erase before emplacing to prevent a unncessary doubling of memory if
3432   // not needed.
3433   if (event->timestamps.size() >= event->max_size) {
3434     event->timestamps.erase_front(1 + event->timestamps.size() -
3435                                   event->max_size);
3436   }
3437   event->timestamps.emplace_back(clock_->GetBootTimeNs().count());
3438 }
3439 
MaybeSnapshotClocksIntoRingBuffer(TracingSession * tracing_session)3440 void TracingServiceImpl::MaybeSnapshotClocksIntoRingBuffer(
3441     TracingSession* tracing_session) {
3442   if (tracing_session->config.builtin_data_sources()
3443           .disable_clock_snapshotting()) {
3444     return;
3445   }
3446 
3447   // We are making an explicit copy of the latest snapshot (if it exists)
3448   // because SnapshotClocks reads this data and computes the drift based on its
3449   // content. If the clock drift is high enough, it will update the contents of
3450   // |snapshot| and return true. Otherwise, it will return false.
3451   TracingSession::ClockSnapshotData snapshot =
3452       tracing_session->clock_snapshot_ring_buffer.empty()
3453           ? TracingSession::ClockSnapshotData()
3454           : tracing_session->clock_snapshot_ring_buffer.back();
3455   bool did_update = SnapshotClocks(&snapshot);
3456   if (did_update) {
3457     // This means clocks drifted enough since last snapshot. See the comment
3458     // in SnapshotClocks.
3459     auto* snapshot_buffer = &tracing_session->clock_snapshot_ring_buffer;
3460 
3461     // Erase before emplacing to prevent a unncessary doubling of memory if
3462     // not needed.
3463     static constexpr uint32_t kClockSnapshotRingBufferSize = 16;
3464     if (snapshot_buffer->size() >= kClockSnapshotRingBufferSize) {
3465       snapshot_buffer->erase_front(1 + snapshot_buffer->size() -
3466                                    kClockSnapshotRingBufferSize);
3467     }
3468     snapshot_buffer->emplace_back(std::move(snapshot));
3469   }
3470 }
3471 
// Returns true when the data in |snapshot_data| is updated with the new state
// of the clocks and false otherwise (i.e. the drift since the previous
// snapshot was below the significance threshold and the old values are kept).
bool TracingServiceImpl::SnapshotClocks(
    TracingSession::ClockSnapshotData* snapshot_data) {
  // Minimum drift that justifies replacing a prior clock snapshot that hasn't
  // been emitted into the trace yet (see comment below).
  static constexpr int64_t kSignificantDriftNs = 10 * 1000 * 1000;  // 10 ms

  TracingSession::ClockSnapshotData new_snapshot_data =
      base::CaptureClockSnapshots();
  // If we're about to update a session's latest clock snapshot that hasn't been
  // emitted into the trace yet, check whether the clocks have drifted enough to
  // warrant overriding the current snapshot values. The older snapshot would be
  // valid for a larger part of the currently buffered trace data because the
  // clock sync protocol in trace processor uses the latest clock <= timestamp
  // to translate times (see https://perfetto.dev/docs/concepts/clock-sync), so
  // we try to keep it if we can.
  if (!snapshot_data->empty()) {
    PERFETTO_DCHECK(snapshot_data->size() == new_snapshot_data.size());
    // Entry 0 is always the BOOTTIME clock; it is the baseline that every
    // other clock's delta is compared against below.
    PERFETTO_DCHECK((*snapshot_data)[0].clock_id ==
                    protos::gen::BUILTIN_CLOCK_BOOTTIME);

    bool update_snapshot = false;
    uint64_t old_boot_ns = (*snapshot_data)[0].timestamp;
    uint64_t new_boot_ns = new_snapshot_data[0].timestamp;
    int64_t boot_diff =
        static_cast<int64_t>(new_boot_ns) - static_cast<int64_t>(old_boot_ns);

    for (size_t i = 1; i < snapshot_data->size(); i++) {
      uint64_t old_ns = (*snapshot_data)[i].timestamp;
      uint64_t new_ns = new_snapshot_data[i].timestamp;

      int64_t diff =
          static_cast<int64_t>(new_ns) - static_cast<int64_t>(old_ns);

      // Compare the boottime delta against the delta of this clock. A clock
      // whose delta diverges from boottime's by >= 10ms has drifted.
      if (std::abs(boot_diff - diff) >= kSignificantDriftNs) {
        update_snapshot = true;
        break;
      }
    }
    if (!update_snapshot)
      return false;
    snapshot_data->clear();
  }

  *snapshot_data = std::move(new_snapshot_data);
  return true;
}
3521 
EmitClockSnapshot(TracingSession * tracing_session,TracingSession::ClockSnapshotData snapshot_data,std::vector<TracePacket> * packets)3522 void TracingServiceImpl::EmitClockSnapshot(
3523     TracingSession* tracing_session,
3524     TracingSession::ClockSnapshotData snapshot_data,
3525     std::vector<TracePacket>* packets) {
3526   PERFETTO_DCHECK(!tracing_session->config.builtin_data_sources()
3527                        .disable_clock_snapshotting());
3528 
3529   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3530   auto* snapshot = packet->set_clock_snapshot();
3531 
3532   protos::gen::BuiltinClock trace_clock =
3533       tracing_session->config.builtin_data_sources().primary_trace_clock();
3534   if (!trace_clock)
3535     trace_clock = protos::gen::BUILTIN_CLOCK_BOOTTIME;
3536   snapshot->set_primary_trace_clock(
3537       static_cast<protos::pbzero::BuiltinClock>(trace_clock));
3538 
3539   for (auto& clock_id_and_ts : snapshot_data) {
3540     auto* c = snapshot->add_clocks();
3541     c->set_clock_id(clock_id_and_ts.clock_id);
3542     c->set_timestamp(clock_id_and_ts.timestamp);
3543   }
3544 
3545   packet->set_trusted_uid(static_cast<int32_t>(uid_));
3546   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3547   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3548 }
3549 
EmitSyncMarker(std::vector<TracePacket> * packets)3550 void TracingServiceImpl::EmitSyncMarker(std::vector<TracePacket>* packets) {
3551   // The sync marks are used to tokenize large traces efficiently.
3552   // See description in trace_packet.proto.
3553   if (sync_marker_packet_size_ == 0) {
3554     // The marker ABI expects that the marker is written after the uid.
3555     // Protozero guarantees that fields are written in the same order of the
3556     // calls. The ResynchronizeTraceStreamUsingSyncMarker test verifies the ABI.
3557     protozero::StaticBuffered<protos::pbzero::TracePacket> packet(
3558         &sync_marker_packet_[0], sizeof(sync_marker_packet_));
3559     packet->set_trusted_uid(static_cast<int32_t>(uid_));
3560     packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3561 
3562     // Keep this last.
3563     packet->set_synchronization_marker(kSyncMarker, sizeof(kSyncMarker));
3564     sync_marker_packet_size_ = packet.Finalize();
3565   }
3566   packets->emplace_back();
3567   packets->back().AddSlice(&sync_marker_packet_[0], sync_marker_packet_size_);
3568 }
3569 
EmitStats(TracingSession * tracing_session,std::vector<TracePacket> * packets)3570 void TracingServiceImpl::EmitStats(TracingSession* tracing_session,
3571                                    std::vector<TracePacket>* packets) {
3572   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3573   packet->set_trusted_uid(static_cast<int32_t>(uid_));
3574   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3575   GetTraceStats(tracing_session).Serialize(packet->set_trace_stats());
3576   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3577 }
3578 
// Builds a TraceStats snapshot for |tracing_session|, combining
// service-global counters (producers, data sources, central buffers) with
// per-session counters (invalid packets, flushes, filter stats) and
// per-buffer / per-writer statistics.
TraceStats TracingServiceImpl::GetTraceStats(TracingSession* tracing_session) {
  TraceStats trace_stats;
  trace_stats.set_producers_connected(static_cast<uint32_t>(producers_.size()));
  trace_stats.set_producers_seen(last_producer_id_);
  trace_stats.set_data_sources_registered(
      static_cast<uint32_t>(data_sources_.size()));
  trace_stats.set_data_sources_seen(last_data_source_instance_id_);
  trace_stats.set_tracing_sessions(
      static_cast<uint32_t>(tracing_sessions_.size()));
  trace_stats.set_total_buffers(static_cast<uint32_t>(buffers_.size()));
  trace_stats.set_chunks_discarded(chunks_discarded_);
  trace_stats.set_patches_discarded(patches_discarded_);
  trace_stats.set_invalid_packets(tracing_session->invalid_packets);
  trace_stats.set_flushes_requested(tracing_session->flushes_requested);
  trace_stats.set_flushes_succeeded(tracing_session->flushes_succeeded);
  trace_stats.set_flushes_failed(tracing_session->flushes_failed);
  trace_stats.set_final_flush_outcome(tracing_session->final_flush_outcome);

  // Trace-filter statistics, only present when the session uses a filter.
  if (tracing_session->trace_filter) {
    auto* filt_stats = trace_stats.mutable_filter_stats();
    filt_stats->set_input_packets(tracing_session->filter_input_packets);
    filt_stats->set_input_bytes(tracing_session->filter_input_bytes);
    filt_stats->set_output_bytes(tracing_session->filter_output_bytes);
    filt_stats->set_errors(tracing_session->filter_errors);
    filt_stats->set_time_taken_ns(tracing_session->filter_time_taken_ns);
    for (uint64_t value : tracing_session->filter_bytes_discarded_per_buffer)
      filt_stats->add_bytes_discarded_per_buffer(value);
  }

  // Per-buffer stats, emitted in the same order as the session's
  // buffers_index.
  for (BufferID buf_id : tracing_session->buffers_index) {
    TraceBuffer* buf = GetBufferByID(buf_id);
    if (!buf) {
      PERFETTO_DFATAL("Buffer not found.");
      continue;
    }
    *trace_stats.add_buffer_stats() = buf->stats();
  }  // for (buf in session).

  if (!tracing_session->config.builtin_data_sources()
           .disable_chunk_usage_histograms()) {
    // Emit chunk usage stats broken down by sequence ID (i.e. by trace-writer).
    // Writer stats are updated by each TraceBuffer object at ReadBuffers time,
    // and there can be >1 buffer per session. A trace writer never writes to
    // more than one buffer (it's technically allowed but doesn't happen in the
    // current impl of the tracing SDK).

    bool has_written_bucket_definition = false;
    // Starts at -1 so the pre-increment below yields 0 for the first buffer.
    uint32_t buf_idx = static_cast<uint32_t>(-1);
    for (const BufferID buf_id : tracing_session->buffers_index) {
      ++buf_idx;
      const TraceBuffer* buf = GetBufferByID(buf_id);
      if (!buf)
        continue;
      for (auto it = buf->writer_stats().GetIterator(); it; ++it) {
        const auto& hist = it.value().used_chunk_hist;
        ProducerID p;
        WriterID w;
        GetProducerAndWriterID(it.key(), &p, &w);
        if (!has_written_bucket_definition) {
          // Serialize one-off the histogram bucket definition, which is the
          // same for all entries in the map.
          has_written_bucket_definition = true;
          // The -1 in the loop below is to skip the implicit overflow bucket.
          for (size_t i = 0; i < hist.num_buckets() - 1; ++i) {
            trace_stats.add_chunk_payload_histogram_def(hist.GetBucketThres(i));
          }
        }  // if(!has_written_bucket_definition)
        auto* wri_stats = trace_stats.add_writer_stats();
        wri_stats->set_sequence_id(
            tracing_session->GetPacketSequenceID(kDefaultMachineID, p, w));
        wri_stats->set_buffer(buf_idx);
        for (size_t i = 0; i < hist.num_buckets(); ++i) {
          wri_stats->add_chunk_payload_histogram_counts(hist.GetBucketCount(i));
          wri_stats->add_chunk_payload_histogram_sum(hist.GetBucketSum(i));
        }
      }  // for each sequence (writer).
    }    // for each buffer.
  }      // if (!disable_chunk_usage_histograms)

  return trace_stats;
}
3660 
EmitUuid(TracingSession * tracing_session,std::vector<TracePacket> * packets)3661 void TracingServiceImpl::EmitUuid(TracingSession* tracing_session,
3662                                   std::vector<TracePacket>* packets) {
3663   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3664   packet->set_trusted_uid(static_cast<int32_t>(uid_));
3665   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3666   auto* uuid = packet->set_trace_uuid();
3667   uuid->set_lsb(tracing_session->trace_uuid.lsb());
3668   uuid->set_msb(tracing_session->trace_uuid.msb());
3669   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3670 }
3671 
MaybeEmitTraceConfig(TracingSession * tracing_session,std::vector<TracePacket> * packets)3672 void TracingServiceImpl::MaybeEmitTraceConfig(
3673     TracingSession* tracing_session,
3674     std::vector<TracePacket>* packets) {
3675   if (tracing_session->did_emit_initial_packets)
3676     return;
3677   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3678   packet->set_trusted_uid(static_cast<int32_t>(uid_));
3679   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3680   tracing_session->config.Serialize(packet->set_trace_config());
3681   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3682 }
3683 
// Emits a SystemInfo packet carrying the tracing service version, timezone
// offset, uname/page-size/cpu-count data (POSIX) and, on Android, a set of
// build/hardware identifiers read from system properties.
void TracingServiceImpl::EmitSystemInfo(std::vector<TracePacket>* packets) {
  protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
  auto* info = packet->set_system_info();
  info->set_tracing_service_version(base::GetVersionString());

  // Timezone offset may be unavailable on some platforms; emit only if known.
  std::optional<int32_t> tzoff = base::GetTimezoneOffsetMins();
  if (tzoff.has_value())
    info->set_timezone_off_mins(*tzoff);

#if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
    !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
  // Kernel identification via uname(2); skipped silently on failure.
  struct utsname uname_info;
  if (uname(&uname_info) == 0) {
    auto* utsname_info = info->set_utsname();
    utsname_info->set_sysname(uname_info.sysname);
    utsname_info->set_version(uname_info.version);
    utsname_info->set_machine(uname_info.machine);
    utsname_info->set_release(uname_info.release);
  }
  info->set_page_size(static_cast<uint32_t>(sysconf(_SC_PAGESIZE)));
  info->set_num_cpus(static_cast<uint32_t>(sysconf(_SC_NPROCESSORS_CONF)));
#endif  // !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
  // Each property below is best-effort: a missing value is logged (except
  // where noted) and the corresponding field is simply left unset.
  std::string fingerprint_value = base::GetAndroidProp("ro.build.fingerprint");
  if (!fingerprint_value.empty()) {
    info->set_android_build_fingerprint(fingerprint_value);
  } else {
    PERFETTO_ELOG("Unable to read ro.build.fingerprint");
  }

  std::string device_manufacturer_value =
      base::GetAndroidProp("ro.product.manufacturer");
  if (!device_manufacturer_value.empty()) {
    info->set_android_device_manufacturer(device_manufacturer_value);
  } else {
    PERFETTO_ELOG("Unable to read ro.product.manufacturer");
  }

  std::string sdk_str_value = base::GetAndroidProp("ro.build.version.sdk");
  std::optional<uint64_t> sdk_value = base::StringToUInt64(sdk_str_value);
  if (sdk_value.has_value()) {
    info->set_android_sdk_version(*sdk_value);
  } else {
    PERFETTO_ELOG("Unable to read ro.build.version.sdk");
  }

  std::string soc_model_value = base::GetAndroidProp("ro.soc.model");
  if (!soc_model_value.empty()) {
    info->set_android_soc_model(soc_model_value);
  } else {
    PERFETTO_ELOG("Unable to read ro.soc.model");
  }

  // guest_soc model is not always present, so absence is not logged.
  std::string guest_soc_model_value =
      base::GetAndroidProp("ro.boot.guest_soc.model");
  if (!guest_soc_model_value.empty()) {
    info->set_android_guest_soc_model(guest_soc_model_value);
  }

  std::string hw_rev_value = base::GetAndroidProp("ro.boot.hardware.revision");
  if (!hw_rev_value.empty()) {
    info->set_android_hardware_revision(hw_rev_value);
  } else {
    PERFETTO_ELOG("Unable to read ro.boot.hardware.revision");
  }

  std::string hw_ufs_value = base::GetAndroidProp("ro.boot.hardware.ufs");
  if (!hw_ufs_value.empty()) {
    info->set_android_storage_model(hw_ufs_value);
  } else {
    PERFETTO_ELOG("Unable to read ro.boot.hardware.ufs");
  }

  std::string hw_ddr_value = base::GetAndroidProp("ro.boot.hardware.ddr");
  if (!hw_ddr_value.empty()) {
    info->set_android_ram_model(hw_ddr_value);
  } else {
    PERFETTO_ELOG("Unable to read ro.boot.hardware.ddr");
  }

#endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
  packet->set_trusted_uid(static_cast<int32_t>(uid_));
  packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
  SerializeAndAppendPacket(packets, packet.SerializeAsArray());
}
3770 
EmitLifecycleEvents(TracingSession * tracing_session,std::vector<TracePacket> * packets)3771 void TracingServiceImpl::EmitLifecycleEvents(
3772     TracingSession* tracing_session,
3773     std::vector<TracePacket>* packets) {
3774   using TimestampedPacket =
3775       std::pair<int64_t /* ts */, std::vector<uint8_t> /* serialized packet */>;
3776 
3777   std::vector<TimestampedPacket> timestamped_packets;
3778   for (auto& event : tracing_session->lifecycle_events) {
3779     for (int64_t ts : event.timestamps) {
3780       protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3781       packet->set_timestamp(static_cast<uint64_t>(ts));
3782       packet->set_trusted_uid(static_cast<int32_t>(uid_));
3783       packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3784 
3785       auto* service_event = packet->set_service_event();
3786       service_event->AppendVarInt(event.field_id, 1);
3787       timestamped_packets.emplace_back(ts, packet.SerializeAsArray());
3788     }
3789     event.timestamps.clear();
3790   }
3791 
3792   if (tracing_session->slow_start_event.has_value()) {
3793     const TracingSession::ArbitraryLifecycleEvent& event =
3794         *tracing_session->slow_start_event;
3795     timestamped_packets.emplace_back(event.timestamp, std::move(event.data));
3796   }
3797   tracing_session->slow_start_event.reset();
3798 
3799   for (auto& event : tracing_session->last_flush_events) {
3800     timestamped_packets.emplace_back(event.timestamp, std::move(event.data));
3801   }
3802   tracing_session->last_flush_events.clear();
3803 
3804   // We sort by timestamp here to ensure that the "sequence" of lifecycle
3805   // packets has monotonic timestamps like other sequences in the trace.
3806   // Note that these events could still be out of order with respect to other
3807   // events on the service packet sequence (e.g. trigger received packets).
3808   std::sort(timestamped_packets.begin(), timestamped_packets.end(),
3809             [](const TimestampedPacket& a, const TimestampedPacket& b) {
3810               return a.first < b.first;
3811             });
3812 
3813   for (auto& pair : timestamped_packets)
3814     SerializeAndAppendPacket(packets, std::move(pair.second));
3815 }
3816 
MaybeEmitRemoteClockSync(TracingSession * tracing_session,std::vector<TracePacket> * packets)3817 void TracingServiceImpl::MaybeEmitRemoteClockSync(
3818     TracingSession* tracing_session,
3819     std::vector<TracePacket>* packets) {
3820   if (tracing_session->did_emit_remote_clock_sync_)
3821     return;
3822 
3823   std::unordered_set<MachineID> did_emit_machines;
3824   for (const auto& id_and_relay_client : relay_clients_) {
3825     const auto& relay_client = id_and_relay_client.second;
3826     auto machine_id = relay_client->machine_id();
3827     if (did_emit_machines.find(machine_id) != did_emit_machines.end())
3828       continue;  // Already emitted for the machine (e.g. multiple clients).
3829 
3830     auto& sync_clock_snapshots = relay_client->synced_clocks();
3831     if (sync_clock_snapshots.empty()) {
3832       PERFETTO_DLOG("Clock not synchronized for machine ID = %" PRIu32,
3833                     machine_id);
3834       continue;
3835     }
3836 
3837     // Don't emit twice for the same machine.
3838     did_emit_machines.insert(machine_id);
3839 
3840     protozero::HeapBuffered<protos::pbzero::TracePacket> sync_packet;
3841     sync_packet->set_machine_id(machine_id);
3842     sync_packet->set_trusted_uid(static_cast<int32_t>(uid_));
3843     auto* remote_clock_sync = sync_packet->set_remote_clock_sync();
3844     for (const auto& sync_exchange : relay_client->synced_clocks()) {
3845       auto* sync_exchange_msg = remote_clock_sync->add_synced_clocks();
3846 
3847       auto* client_snapshots = sync_exchange_msg->set_client_clocks();
3848       for (const auto& client_clock : sync_exchange.client_clocks) {
3849         auto* clock = client_snapshots->add_clocks();
3850         clock->set_clock_id(client_clock.clock_id);
3851         clock->set_timestamp(client_clock.timestamp);
3852       }
3853 
3854       auto* host_snapshots = sync_exchange_msg->set_host_clocks();
3855       for (const auto& host_clock : sync_exchange.host_clocks) {
3856         auto* clock = host_snapshots->add_clocks();
3857         clock->set_clock_id(host_clock.clock_id);
3858         clock->set_timestamp(host_clock.timestamp);
3859       }
3860     }
3861 
3862     SerializeAndAppendPacket(packets, sync_packet.SerializeAsArray());
3863   }
3864 
3865   tracing_session->did_emit_remote_clock_sync_ = true;
3866 }
3867 
MaybeEmitCloneTrigger(TracingSession * tracing_session,std::vector<TracePacket> * packets)3868 void TracingServiceImpl::MaybeEmitCloneTrigger(
3869     TracingSession* tracing_session,
3870     std::vector<TracePacket>* packets) {
3871   if (tracing_session->clone_trigger.has_value()) {
3872     protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3873     auto* trigger = packet->set_clone_snapshot_trigger();
3874     const auto& info = tracing_session->clone_trigger.value();
3875     trigger->set_trigger_name(info.trigger_name);
3876     trigger->set_producer_name(info.producer_name);
3877     trigger->set_trusted_producer_uid(static_cast<int32_t>(info.producer_uid));
3878 
3879     packet->set_timestamp(info.boot_time_ns);
3880     packet->set_trusted_uid(static_cast<int32_t>(uid_));
3881     packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3882     SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3883   }
3884 }
3885 
MaybeEmitReceivedTriggers(TracingSession * tracing_session,std::vector<TracePacket> * packets)3886 void TracingServiceImpl::MaybeEmitReceivedTriggers(
3887     TracingSession* tracing_session,
3888     std::vector<TracePacket>* packets) {
3889   PERFETTO_DCHECK(tracing_session->num_triggers_emitted_into_trace <=
3890                   tracing_session->received_triggers.size());
3891   for (size_t i = tracing_session->num_triggers_emitted_into_trace;
3892        i < tracing_session->received_triggers.size(); ++i) {
3893     const auto& info = tracing_session->received_triggers[i];
3894     protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3895     auto* trigger = packet->set_trigger();
3896     trigger->set_trigger_name(info.trigger_name);
3897     trigger->set_producer_name(info.producer_name);
3898     trigger->set_trusted_producer_uid(static_cast<int32_t>(info.producer_uid));
3899 
3900     packet->set_timestamp(info.boot_time_ns);
3901     packet->set_trusted_uid(static_cast<int32_t>(uid_));
3902     packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3903     SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3904     ++tracing_session->num_triggers_emitted_into_trace;
3905   }
3906 }
3907 
MaybeLogUploadEvent(const TraceConfig & cfg,const base::Uuid & uuid,PerfettoStatsdAtom atom,const std::string & trigger_name)3908 void TracingServiceImpl::MaybeLogUploadEvent(const TraceConfig& cfg,
3909                                              const base::Uuid& uuid,
3910                                              PerfettoStatsdAtom atom,
3911                                              const std::string& trigger_name) {
3912   if (!ShouldLogEvent(cfg))
3913     return;
3914 
3915   PERFETTO_DCHECK(uuid);  // The UUID must be set at this point.
3916   android_stats::MaybeLogUploadEvent(atom, uuid.lsb(), uuid.msb(),
3917                                      trigger_name);
3918 }
3919 
MaybeLogTriggerEvent(const TraceConfig & cfg,PerfettoTriggerAtom atom,const std::string & trigger_name)3920 void TracingServiceImpl::MaybeLogTriggerEvent(const TraceConfig& cfg,
3921                                               PerfettoTriggerAtom atom,
3922                                               const std::string& trigger_name) {
3923   if (!ShouldLogEvent(cfg))
3924     return;
3925   android_stats::MaybeLogTriggerEvent(atom, trigger_name);
3926 }
3927 
PurgeExpiredAndCountTriggerInWindow(int64_t now_ns,uint64_t trigger_name_hash)3928 size_t TracingServiceImpl::PurgeExpiredAndCountTriggerInWindow(
3929     int64_t now_ns,
3930     uint64_t trigger_name_hash) {
3931   constexpr int64_t kOneDayInNs = 24ll * 60 * 60 * 1000 * 1000 * 1000;
3932   PERFETTO_DCHECK(
3933       std::is_sorted(trigger_history_.begin(), trigger_history_.end()));
3934   size_t remove_count = 0;
3935   size_t trigger_count = 0;
3936   for (const TriggerHistory& h : trigger_history_) {
3937     if (h.timestamp_ns < now_ns - kOneDayInNs) {
3938       remove_count++;
3939     } else if (h.name_hash == trigger_name_hash) {
3940       trigger_count++;
3941     }
3942   }
3943   trigger_history_.erase_front(remove_count);
3944   return trigger_count;
3945 }
3946 
FlushAndCloneSession(ConsumerEndpointImpl * consumer,ConsumerEndpoint::CloneSessionArgs args)3947 base::Status TracingServiceImpl::FlushAndCloneSession(
3948     ConsumerEndpointImpl* consumer,
3949     ConsumerEndpoint::CloneSessionArgs args) {
3950   PERFETTO_DCHECK_THREAD(thread_checker_);
3951   auto clone_target = FlushFlags::CloneTarget::kUnknown;
3952 
3953   TracingSession* session = nullptr;
3954   if (args.for_bugreport) {
3955     clone_target = FlushFlags::CloneTarget::kBugreport;
3956   }
3957   if (args.tsid != 0) {
3958     if (args.tsid == kBugreportSessionId) {
3959       // This branch is only here to support the legacy protocol where we could
3960       // clone only a single session using the magic ID kBugreportSessionId.
3961       // The newer perfetto --clone-all-for-bugreport first queries the existing
3962       // sessions and then issues individual clone requests specifying real
3963       // session IDs, setting args.{for_bugreport,skip_trace_filter}=true.
3964       PERFETTO_LOG("Looking for sessions for bugreport");
3965       session = FindTracingSessionWithMaxBugreportScore();
3966       if (!session) {
3967         return base::ErrStatus(
3968             "No tracing sessions eligible for bugreport found");
3969       }
3970       args.tsid = session->id;
3971       clone_target = FlushFlags::CloneTarget::kBugreport;
3972       args.skip_trace_filter = true;
3973     } else {
3974       session = GetTracingSession(args.tsid);
3975     }
3976   } else if (!args.unique_session_name.empty()) {
3977     session = GetTracingSessionByUniqueName(args.unique_session_name);
3978   }
3979 
3980   if (!session) {
3981     return base::ErrStatus("Tracing session not found");
3982   }
3983 
3984   // Skip the UID check for sessions marked with a bugreport_score > 0.
3985   // Those sessions, by design, can be stolen by any other consumer for the
3986   // sake of creating snapshots for bugreports.
3987   if (!session->IsCloneAllowed(consumer->uid_)) {
3988     return PERFETTO_SVC_ERR("Not allowed to clone a session from another UID");
3989   }
3990 
3991   // If any of the buffers are marked as clear_before_clone, reset them before
3992   // issuing the Flush(kCloneReason).
3993   size_t buf_idx = 0;
3994   for (BufferID src_buf_id : session->buffers_index) {
3995     if (!session->config.buffers()[buf_idx++].clear_before_clone())
3996       continue;
3997     auto buf_iter = buffers_.find(src_buf_id);
3998     PERFETTO_CHECK(buf_iter != buffers_.end());
3999     std::unique_ptr<TraceBuffer>& buf = buf_iter->second;
4000 
4001     // No need to reset the buffer if nothing has been written into it yet.
4002     // This is the canonical case if producers behive nicely and don't timeout
4003     // the handling of writes during the flush.
4004     // This check avoids a useless re-mmap upon every Clone() if the buffer is
4005     // already empty (when used in combination with `transfer_on_clone`).
4006     if (!buf->has_data())
4007       continue;
4008 
4009     // Some leftover data was left in the buffer. Recreate it to empty it.
4010     const auto buf_policy = buf->overwrite_policy();
4011     const auto buf_size = buf->size();
4012     std::unique_ptr<TraceBuffer> old_buf = std::move(buf);
4013     buf = TraceBuffer::Create(buf_size, buf_policy);
4014     if (!buf) {
4015       // This is extremely rare but could happen on 32-bit. If the new buffer
4016       // allocation failed, put back the buffer where it was and fail the clone.
4017       // We cannot leave the original tracing session buffer-less as it would
4018       // cause crashes when data sources commit new data.
4019       buf = std::move(old_buf);
4020       return base::ErrStatus(
4021           "Buffer allocation failed while attempting to clone");
4022     }
4023   }
4024 
4025   auto weak_consumer = consumer->GetWeakPtr();
4026 
4027   const PendingCloneID clone_id = session->last_pending_clone_id_++;
4028 
4029   auto& clone_op = session->pending_clones[clone_id];
4030   clone_op.pending_flush_cnt = 0;
4031   clone_op.buffers =
4032       std::vector<std::unique_ptr<TraceBuffer>>(session->buffers_index.size());
4033   clone_op.weak_consumer = weak_consumer;
4034   clone_op.skip_trace_filter = args.skip_trace_filter;
4035   if (!args.clone_trigger_name.empty()) {
4036     clone_op.clone_trigger = {args.clone_trigger_boot_time_ns,
4037                               args.clone_trigger_name,
4038                               args.clone_trigger_producer_name,
4039                               args.clone_trigger_trusted_producer_uid};
4040   }
4041 
4042   // Issue separate flush requests for separate buffer groups. The buffer marked
4043   // as transfer_on_clone will be flushed and cloned separately: even if they're
4044   // slower (like in the case of Winscope tracing), they will not delay the
4045   // snapshot of the other buffers.
4046   //
4047   // In the future we might want to split the buffer into more groups and maybe
4048   // allow this to be configurable.
4049   std::array<std::set<BufferID>, 2> bufs_groups;
4050   for (size_t i = 0; i < session->buffers_index.size(); i++) {
4051     if (session->config.buffers()[i].transfer_on_clone()) {
4052       bufs_groups[0].insert(session->buffers_index[i]);
4053     } else {
4054       bufs_groups[1].insert(session->buffers_index[i]);
4055     }
4056   }
4057 
4058   SnapshotLifecyleEvent(
4059       session, protos::pbzero::TracingServiceEvent::kFlushStartedFieldNumber,
4060       false /* snapshot_clocks */);
4061   clone_op.pending_flush_cnt = bufs_groups.size();
4062   for (const std::set<BufferID>& buf_group : bufs_groups) {
4063     FlushDataSourceInstances(
4064         session, 0,
4065         GetFlushableDataSourceInstancesForBuffers(session, buf_group),
4066         [tsid = session->id, clone_id, buf_group, this](bool final_flush) {
4067           OnFlushDoneForClone(tsid, clone_id, buf_group, final_flush);
4068         },
4069         FlushFlags(FlushFlags::Initiator::kTraced,
4070                    FlushFlags::Reason::kTraceClone, clone_target));
4071   }
4072 
4073   return base::OkStatus();
4074 }
4075 
4076 std::map<ProducerID, std::vector<DataSourceInstanceID>>
GetFlushableDataSourceInstancesForBuffers(TracingSession * session,const std::set<BufferID> & bufs)4077 TracingServiceImpl::GetFlushableDataSourceInstancesForBuffers(
4078     TracingSession* session,
4079     const std::set<BufferID>& bufs) {
4080   std::map<ProducerID, std::vector<DataSourceInstanceID>> data_source_instances;
4081 
4082   for (const auto& [producer_id, ds_inst] : session->data_source_instances) {
4083     // TODO(ddiproietto): Consider if we should skip instances if ds_inst.state
4084     // != DataSourceInstance::STARTED
4085     if (ds_inst.no_flush) {
4086       continue;
4087     }
4088     if (!bufs.count(static_cast<BufferID>(ds_inst.config.target_buffer()))) {
4089       continue;
4090     }
4091     data_source_instances[producer_id].push_back(ds_inst.instance_id);
4092   }
4093 
4094   return data_source_instances;
4095 }
4096 
// Invoked once per buffer-group flush issued by FlushAndCloneSession().
// Clones the just-flushed subset of buffers into the pending clone op and,
// when the last pending flush completes (or buffer cloning fails), finalizes
// the cloned session and reports the outcome to the requesting consumer.
void TracingServiceImpl::OnFlushDoneForClone(TracingSessionID tsid,
                                             PendingCloneID clone_id,
                                             const std::set<BufferID>& buf_ids,
                                             bool final_flush_outcome) {
  TracingSession* src = GetTracingSession(tsid);
  // The session might be gone by the time we try to clone it.
  if (!src) {
    return;
  }

  // The clone op might have been removed already, e.g. by a previous
  // failed flush callback for another buffer group of the same clone.
  auto it = src->pending_clones.find(clone_id);
  if (it == src->pending_clones.end()) {
    return;
  }
  auto& clone_op = it->second;

  // One failed flush taints the whole clone: it still proceeds, but the
  // outcome is recorded (see final_flush_outcome below / in trace stats).
  if (final_flush_outcome == false) {
    clone_op.flush_failed = true;
  }

  base::Status result;
  base::Uuid uuid;

  // First clone the flushed TraceBuffer(s). This can fail because of ENOMEM. If
  // it happens bail out early before creating any session.
  if (!DoCloneBuffers(src, buf_ids, &clone_op.buffers)) {
    result = PERFETTO_SVC_ERR("Buffer allocation failed");
  }

  if (result.ok()) {
    // The freshly cloned buffers count against the memory guardrail.
    UpdateMemoryGuardrail();

    if (--clone_op.pending_flush_cnt != 0) {
      // Wait for more pending flushes.
      return;
    }

    PERFETTO_LOG("FlushAndCloneSession(%" PRIu64 ") started, success=%d", tsid,
                 final_flush_outcome);

    // All buffer groups are flushed and cloned: materialize the new read-only
    // session (if the requesting consumer is still alive).
    if (clone_op.weak_consumer) {
      result = FinishCloneSession(
          &*clone_op.weak_consumer, tsid, std::move(clone_op.buffers),
          clone_op.skip_trace_filter, !clone_op.flush_failed,
          clone_op.clone_trigger, &uuid);
    }
  }  // if (result.ok())

  // Notify the consumer of the final outcome (success or failure); on the
  // failure path this happens immediately, without waiting for other flushes.
  if (clone_op.weak_consumer) {
    clone_op.weak_consumer->consumer_->OnSessionCloned(
        {result.ok(), result.message(), uuid});
  }

  src->pending_clones.erase(it);
  UpdateMemoryGuardrail();
}
4153 
// Snapshots the subset |buf_ids| of |src|'s buffers into |buf_snaps|.
// |buf_snaps| is indexed like src->buffers_index; entries for buffers not in
// |buf_ids| are left untouched (they may be filled by other invocations for
// other buffer groups). Returns false on allocation failure, leaving the
// source session's buffers intact.
bool TracingServiceImpl::DoCloneBuffers(
    TracingSession* src,
    const std::set<BufferID>& buf_ids,
    std::vector<std::unique_ptr<TraceBuffer>>* buf_snaps) {
  PERFETTO_DCHECK(src->num_buffers() == src->config.buffers().size());
  buf_snaps->resize(src->buffers_index.size());

  for (size_t buf_idx = 0; buf_idx < src->buffers_index.size(); buf_idx++) {
    BufferID src_buf_id = src->buffers_index[buf_idx];
    // Only process the requested buffer group.
    if (buf_ids.count(src_buf_id) == 0)
      continue;
    auto buf_iter = buffers_.find(src_buf_id);
    PERFETTO_CHECK(buf_iter != buffers_.end());
    std::unique_ptr<TraceBuffer>& src_buf = buf_iter->second;
    std::unique_ptr<TraceBuffer> new_buf;
    if (src->config.buffers()[buf_idx].transfer_on_clone()) {
      // transfer_on_clone: steal the whole buffer for the clone and leave a
      // brand-new empty buffer (same size and policy) in the source session.
      const auto buf_policy = src_buf->overwrite_policy();
      const auto buf_size = src_buf->size();
      new_buf = std::move(src_buf);
      src_buf = TraceBuffer::Create(buf_size, buf_policy);
      if (!src_buf) {
        // If the allocation fails put the buffer back and let the code below
        // handle the failure gracefully.
        src_buf = std::move(new_buf);
      }
    } else {
      // Default path: take a read-only copy; the source keeps its own buffer.
      new_buf = src_buf->CloneReadOnly();
    }
    if (!new_buf.get()) {
      return false;  // ENOMEM while cloning or re-creating a buffer.
    }
    (*buf_snaps)[buf_idx] = std::move(new_buf);
  }
  return true;
}
4189 
// Materializes the read-only cloned session from the buffer snapshots
// collected by OnFlushDoneForClone() and attaches |consumer| to it.
// On success writes the new session's UUID into |new_uuid|. |buf_snaps| must
// contain one non-null buffer per source buffer (checked below).
base::Status TracingServiceImpl::FinishCloneSession(
    ConsumerEndpointImpl* consumer,
    TracingSessionID src_tsid,
    std::vector<std::unique_ptr<TraceBuffer>> buf_snaps,
    bool skip_trace_filter,
    bool final_flush_outcome,
    std::optional<TriggerInfo> clone_trigger,
    base::Uuid* new_uuid) {
  PERFETTO_DLOG("CloneSession(%" PRIu64
                ", skip_trace_filter=%d) started, consumer uid: %d",
                src_tsid, skip_trace_filter, static_cast<int>(consumer->uid_));

  TracingSession* src = GetTracingSession(src_tsid);

  // The session might be gone by the time we try to clone it.
  if (!src)
    return PERFETTO_SVC_ERR("session not found");

  if (consumer->tracing_session_id_) {
    return PERFETTO_SVC_ERR(
        "The consumer is already attached to another tracing session");
  }

  // Reserve one global buffer id per snapshot before creating the session.
  std::vector<BufferID> buf_ids =
      buffer_ids_.AllocateMultiple(buf_snaps.size());
  if (buf_ids.size() != buf_snaps.size()) {
    return PERFETTO_SVC_ERR("Buffer id allocation failed");
  }

  // All flush callbacks must have populated their buffer group by now.
  PERFETTO_CHECK(std::none_of(
      buf_snaps.begin(), buf_snaps.end(),
      [](const std::unique_ptr<TraceBuffer>& buf) { return buf == nullptr; }));

  const TracingSessionID tsid = ++last_tracing_session_id_;
  TracingSession* cloned_session =
      &tracing_sessions_
           .emplace(std::piecewise_construct, std::forward_as_tuple(tsid),
                    std::forward_as_tuple(tsid, consumer, src->config,
                                          weak_runner_.task_runner()))
           .first->second;

  // Generate a new UUID for the cloned session, but preserve the LSB. In some
  // contexts the LSB is used to tie the trace back to the statsd subscription
  // that triggered it. See the corresponding code in perfetto_cmd.cc which
  // reads at triggering_subscription_id().
  const int64_t orig_uuid_lsb = src->trace_uuid.lsb();
  cloned_session->state = TracingSession::CLONED_READ_ONLY;
  cloned_session->trace_uuid = base::Uuidv4();
  cloned_session->trace_uuid.set_lsb(orig_uuid_lsb);
  *new_uuid = cloned_session->trace_uuid;

  // Register the snapshot buffers under the newly allocated global ids.
  for (size_t i = 0; i < buf_snaps.size(); i++) {
    BufferID buf_global_id = buf_ids[i];
    std::unique_ptr<TraceBuffer>& buf = buf_snaps[i];
    // This is only needed for transfer_on_clone. Other buffers are already
    // marked as read-only by CloneReadOnly(). We cannot do this early because
    // in case of an allocation failure we will put std::move() the original
    // buffer back in its place and in that case should not be made read-only.
    buf->set_read_only();
    buffers_.emplace(buf_global_id, std::move(buf));
    cloned_session->buffers_index.emplace_back(buf_global_id);
  }
  UpdateMemoryGuardrail();

  // Copy over relevant state that we want to persist in the cloned session.
  // Mostly stats and metadata that is emitted in the trace file by the service.
  // Also clear the received trigger list in the main tracing session. A
  // CLONE_SNAPSHOT session can go in ring buffer mode for several hours and get
  // snapshotted several times. This causes two issues with `received_triggers`:
  // 1. Adding noise in the cloned trace emitting triggers that happened too
  //    far back (see b/290799105).
  // 2. Bloating memory (see b/290798988).
  cloned_session->should_emit_stats = true;
  cloned_session->clone_trigger = clone_trigger;
  cloned_session->received_triggers = std::move(src->received_triggers);
  src->received_triggers.clear();
  src->num_triggers_emitted_into_trace = 0;
  cloned_session->lifecycle_events =
      std::vector<TracingSession::LifecycleEvent>(src->lifecycle_events);
  cloned_session->slow_start_event = src->slow_start_event;
  cloned_session->last_flush_events = src->last_flush_events;
  cloned_session->initial_clock_snapshot = src->initial_clock_snapshot;
  cloned_session->clock_snapshot_ring_buffer = src->clock_snapshot_ring_buffer;
  cloned_session->invalid_packets = src->invalid_packets;
  cloned_session->flushes_requested = src->flushes_requested;
  cloned_session->flushes_succeeded = src->flushes_succeeded;
  cloned_session->flushes_failed = src->flushes_failed;
  cloned_session->compress_deflate = src->compress_deflate;
  if (src->trace_filter && !skip_trace_filter) {
    // Copy the trace filter, unless it's a clone-for-bugreport (b/317065412).
    cloned_session->trace_filter.reset(
        new protozero::MessageFilter(src->trace_filter->config()));
  }

  // The cloned session is born already stopped: record its "tracing disabled"
  // lifecycle event with a fresh clock snapshot.
  SnapshotLifecyleEvent(
      cloned_session,
      protos::pbzero::TracingServiceEvent::kTracingDisabledFieldNumber,
      true /* snapshot_clocks */);

  PERFETTO_DLOG("Consumer (uid:%d) cloned tracing session %" PRIu64
                " -> %" PRIu64,
                static_cast<int>(consumer->uid_), src_tsid, tsid);

  consumer->tracing_session_id_ = tsid;
  cloned_session->final_flush_outcome = final_flush_outcome
                                            ? TraceStats::FINAL_FLUSH_SUCCEEDED
                                            : TraceStats::FINAL_FLUSH_FAILED;
  return base::OkStatus();
}
4299 
IsCloneAllowed(uid_t clone_uid) const4300 bool TracingServiceImpl::TracingSession::IsCloneAllowed(uid_t clone_uid) const {
4301   if (clone_uid == 0)
4302     return true;  // Root is always allowed to clone everything.
4303   if (clone_uid == this->consumer_uid)
4304     return true;  // Allow cloning if the uids match.
4305 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
4306   // On Android allow shell to clone sessions marked as exported for bugreport.
4307   // Dumpstate (invoked by adb bugreport) invokes commands as shell.
4308   if (clone_uid == AID_SHELL && this->config.bugreport_score() > 0)
4309     return true;
4310 #endif
4311   return false;
4312 }
4313 
4314 ////////////////////////////////////////////////////////////////////////////////
4315 // TracingServiceImpl::ConsumerEndpointImpl implementation
4316 ////////////////////////////////////////////////////////////////////////////////
4317 
// Binds a Consumer implementation to the service. |service| and |consumer|
// are borrowed, not owned; tasks posted to |task_runner| use weak pointers so
// that a destroyed endpoint silently drops pending callbacks.
TracingServiceImpl::ConsumerEndpointImpl::ConsumerEndpointImpl(
    TracingServiceImpl* service,
    base::TaskRunner* task_runner,
    Consumer* consumer,
    uid_t uid)
    : task_runner_(task_runner),
      service_(service),
      consumer_(consumer),
      uid_(uid),
      weak_ptr_factory_(this) {}
4328 
// Detach from the service first (so it stops referencing this endpoint),
// then notify the consumer that the connection is gone.
TracingServiceImpl::ConsumerEndpointImpl::~ConsumerEndpointImpl() {
  service_->DisconnectConsumer(this);
  consumer_->OnDisconnect();
}
4333 
NotifyOnTracingDisabled(const std::string & error)4334 void TracingServiceImpl::ConsumerEndpointImpl::NotifyOnTracingDisabled(
4335     const std::string& error) {
4336   PERFETTO_DCHECK_THREAD(thread_checker_);
4337   task_runner_->PostTask([weak_this = weak_ptr_factory_.GetWeakPtr(),
4338                           error /* deliberate copy */] {
4339     if (weak_this)
4340       weak_this->consumer_->OnTracingDisabled(error);
4341   });
4342 }
4343 
EnableTracing(const TraceConfig & cfg,base::ScopedFile fd)4344 void TracingServiceImpl::ConsumerEndpointImpl::EnableTracing(
4345     const TraceConfig& cfg,
4346     base::ScopedFile fd) {
4347   PERFETTO_DCHECK_THREAD(thread_checker_);
4348   auto status = service_->EnableTracing(this, cfg, std::move(fd));
4349   if (!status.ok())
4350     NotifyOnTracingDisabled(status.message());
4351 }
4352 
ChangeTraceConfig(const TraceConfig & cfg)4353 void TracingServiceImpl::ConsumerEndpointImpl::ChangeTraceConfig(
4354     const TraceConfig& cfg) {
4355   if (!tracing_session_id_) {
4356     PERFETTO_LOG(
4357         "Consumer called ChangeTraceConfig() but tracing was "
4358         "not active");
4359     return;
4360   }
4361   service_->ChangeTraceConfig(this, cfg);
4362 }
4363 
StartTracing()4364 void TracingServiceImpl::ConsumerEndpointImpl::StartTracing() {
4365   PERFETTO_DCHECK_THREAD(thread_checker_);
4366   if (!tracing_session_id_) {
4367     PERFETTO_LOG("Consumer called StartTracing() but tracing was not active");
4368     return;
4369   }
4370   service_->StartTracing(tracing_session_id_);
4371 }
4372 
DisableTracing()4373 void TracingServiceImpl::ConsumerEndpointImpl::DisableTracing() {
4374   PERFETTO_DCHECK_THREAD(thread_checker_);
4375   if (!tracing_session_id_) {
4376     PERFETTO_LOG("Consumer called DisableTracing() but tracing was not active");
4377     return;
4378   }
4379   service_->DisableTracing(tracing_session_id_);
4380 }
4381 
ReadBuffers()4382 void TracingServiceImpl::ConsumerEndpointImpl::ReadBuffers() {
4383   PERFETTO_DCHECK_THREAD(thread_checker_);
4384   if (!tracing_session_id_) {
4385     PERFETTO_LOG("Consumer called ReadBuffers() but tracing was not active");
4386     consumer_->OnTraceData({}, /* has_more = */ false);
4387     return;
4388   }
4389   if (!service_->ReadBuffersIntoConsumer(tracing_session_id_, this)) {
4390     consumer_->OnTraceData({}, /* has_more = */ false);
4391   }
4392 }
4393 
FreeBuffers()4394 void TracingServiceImpl::ConsumerEndpointImpl::FreeBuffers() {
4395   PERFETTO_DCHECK_THREAD(thread_checker_);
4396   if (!tracing_session_id_) {
4397     PERFETTO_LOG("Consumer called FreeBuffers() but tracing was not active");
4398     return;
4399   }
4400   service_->FreeBuffers(tracing_session_id_);
4401   tracing_session_id_ = 0;
4402 }
4403 
Flush(uint32_t timeout_ms,FlushCallback callback,FlushFlags flush_flags)4404 void TracingServiceImpl::ConsumerEndpointImpl::Flush(uint32_t timeout_ms,
4405                                                      FlushCallback callback,
4406                                                      FlushFlags flush_flags) {
4407   PERFETTO_DCHECK_THREAD(thread_checker_);
4408   if (!tracing_session_id_) {
4409     PERFETTO_LOG("Consumer called Flush() but tracing was not active");
4410     return;
4411   }
4412   service_->Flush(tracing_session_id_, timeout_ms, callback, flush_flags);
4413 }
4414 
Detach(const std::string & key)4415 void TracingServiceImpl::ConsumerEndpointImpl::Detach(const std::string& key) {
4416   PERFETTO_DCHECK_THREAD(thread_checker_);
4417   bool success = service_->DetachConsumer(this, key);
4418   auto weak_this = weak_ptr_factory_.GetWeakPtr();
4419   task_runner_->PostTask([weak_this = std::move(weak_this), success] {
4420     if (weak_this)
4421       weak_this->consumer_->OnDetach(success);
4422   });
4423 }
4424 
Attach(const std::string & key)4425 void TracingServiceImpl::ConsumerEndpointImpl::Attach(const std::string& key) {
4426   PERFETTO_DCHECK_THREAD(thread_checker_);
4427   bool success = service_->AttachConsumer(this, key);
4428   task_runner_->PostTask([weak_this = weak_ptr_factory_.GetWeakPtr(), success] {
4429     if (!weak_this)
4430       return;
4431     Consumer* consumer = weak_this->consumer_;
4432     TracingSession* session =
4433         weak_this->service_->GetTracingSession(weak_this->tracing_session_id_);
4434     if (!session) {
4435       consumer->OnAttach(false, TraceConfig());
4436       return;
4437     }
4438     consumer->OnAttach(success, session->config);
4439   });
4440 }
4441 
GetTraceStats()4442 void TracingServiceImpl::ConsumerEndpointImpl::GetTraceStats() {
4443   PERFETTO_DCHECK_THREAD(thread_checker_);
4444   bool success = false;
4445   TraceStats stats;
4446   TracingSession* session = service_->GetTracingSession(tracing_session_id_);
4447   if (session) {
4448     success = true;
4449     stats = service_->GetTraceStats(session);
4450   }
4451   auto weak_this = weak_ptr_factory_.GetWeakPtr();
4452   task_runner_->PostTask(
4453       [weak_this = std::move(weak_this), success, stats = std::move(stats)] {
4454         if (weak_this)
4455           weak_this->consumer_->OnTraceStats(success, stats);
4456       });
4457 }
4458 
// Sets the mask of event types this consumer wants notifications for and
// immediately replays state that already exists, so the consumer does not
// miss events that happened before the subscription.
void TracingServiceImpl::ConsumerEndpointImpl::ObserveEvents(
    uint32_t events_mask) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  observable_events_mask_ = events_mask;
  TracingSession* session = service_->GetTracingSession(tracing_session_id_);
  if (!session)
    return;

  if (observable_events_mask_ & ObservableEvents::TYPE_DATA_SOURCES_INSTANCES) {
    // Issue initial states.
    for (const auto& kv : session->data_source_instances) {
      ProducerEndpointImpl* producer = service_->GetProducer(kv.first);
      PERFETTO_DCHECK(producer);
      OnDataSourceInstanceStateChange(*producer, kv.second);
    }
  }

  // If the ObserveEvents() call happens after data sources have acked already
  // notify immediately.
  if (observable_events_mask_ &
      ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED) {
    service_->MaybeNotifyAllDataSourcesStarted(session);
  }
}
4483 
OnDataSourceInstanceStateChange(const ProducerEndpointImpl & producer,const DataSourceInstance & instance)4484 void TracingServiceImpl::ConsumerEndpointImpl::OnDataSourceInstanceStateChange(
4485     const ProducerEndpointImpl& producer,
4486     const DataSourceInstance& instance) {
4487   if (!(observable_events_mask_ &
4488         ObservableEvents::TYPE_DATA_SOURCES_INSTANCES)) {
4489     return;
4490   }
4491 
4492   if (instance.state != DataSourceInstance::CONFIGURED &&
4493       instance.state != DataSourceInstance::STARTED &&
4494       instance.state != DataSourceInstance::STOPPED) {
4495     return;
4496   }
4497 
4498   auto* observable_events = AddObservableEvents();
4499   auto* change = observable_events->add_instance_state_changes();
4500   change->set_producer_name(producer.name_);
4501   change->set_data_source_name(instance.data_source_name);
4502   if (instance.state == DataSourceInstance::STARTED) {
4503     change->set_state(ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED);
4504   } else {
4505     change->set_state(ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STOPPED);
4506   }
4507 }
4508 
OnAllDataSourcesStarted()4509 void TracingServiceImpl::ConsumerEndpointImpl::OnAllDataSourcesStarted() {
4510   if (!(observable_events_mask_ &
4511         ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED)) {
4512     return;
4513   }
4514   auto* observable_events = AddObservableEvents();
4515   observable_events->set_all_data_sources_started(true);
4516 }
4517 
NotifyCloneSnapshotTrigger(const TriggerInfo & trigger)4518 void TracingServiceImpl::ConsumerEndpointImpl::NotifyCloneSnapshotTrigger(
4519     const TriggerInfo& trigger) {
4520   if (!(observable_events_mask_ & ObservableEvents::TYPE_CLONE_TRIGGER_HIT)) {
4521     return;
4522   }
4523   auto* observable_events = AddObservableEvents();
4524   auto* clone_trig = observable_events->mutable_clone_trigger_hit();
4525   clone_trig->set_tracing_session_id(static_cast<int64_t>(tracing_session_id_));
4526   clone_trig->set_trigger_name(trigger.trigger_name);
4527   clone_trig->set_producer_name(trigger.producer_name);
4528   clone_trig->set_producer_uid(trigger.producer_uid);
4529   clone_trig->set_boot_time_ns(trigger.boot_time_ns);
4530 }
4531 
// Returns the pending ObservableEvents batch, lazily creating it and
// scheduling its delivery. Events added by multiple callers before the posted
// task runs are coalesced into a single OnObservableEvents() call.
ObservableEvents*
TracingServiceImpl::ConsumerEndpointImpl::AddObservableEvents() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (!observable_events_) {
    observable_events_.reset(new ObservableEvents());
    // Deliver asynchronously so callers can keep appending to this batch
    // until the task runs.
    task_runner_->PostTask([weak_this = weak_ptr_factory_.GetWeakPtr()] {
      if (!weak_this)
        return;

      // Move into a temporary to allow reentrancy in OnObservableEvents.
      auto observable_events = std::move(weak_this->observable_events_);
      weak_this->consumer_->OnObservableEvents(*observable_events);
    });
  }
  return observable_events_.get();
}
4548 
// Builds a TracingServiceState snapshot (service version, producers, data
// sources and the tracing sessions visible to this consumer's uid) and
// invokes |callback| synchronously with it. With args.sessions_only the
// producer and data-source sections are skipped.
void TracingServiceImpl::ConsumerEndpointImpl::QueryServiceState(
    QueryServiceStateArgs args,
    QueryServiceStateCallback callback) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingServiceState svc_state;

  const auto& sessions = service_->tracing_sessions_;
  svc_state.set_tracing_service_version(base::GetVersionString());
  svc_state.set_num_sessions(static_cast<int>(sessions.size()));

  int num_started = 0;
  for (const auto& kv : sessions)
    num_started += kv.second.state == TracingSession::State::STARTED ? 1 : 0;
  svc_state.set_num_sessions_started(num_started);

  // Connected producers (skipped in sessions_only mode).
  for (const auto& kv : service_->producers_) {
    if (args.sessions_only)
      break;
    auto* producer = svc_state.add_producers();
    producer->set_id(static_cast<int>(kv.first));
    producer->set_name(kv.second->name_);
    producer->set_sdk_version(kv.second->sdk_version_);
    producer->set_uid(static_cast<int32_t>(kv.second->uid()));
    producer->set_pid(static_cast<int32_t>(kv.second->pid()));
    producer->set_frozen(kv.second->IsAndroidProcessFrozen());
  }

  // Registered data sources (skipped in sessions_only mode).
  for (const auto& kv : service_->data_sources_) {
    if (args.sessions_only)
      break;
    const auto& registered_data_source = kv.second;
    auto* data_source = svc_state.add_data_sources();
    *data_source->mutable_ds_descriptor() = registered_data_source.descriptor;
    data_source->set_producer_id(
        static_cast<int>(registered_data_source.producer_id));
  }

  svc_state.set_supports_tracing_sessions(true);
  for (const auto& kv : service_->tracing_sessions_) {
    const TracingSession& s = kv.second;
    // Only expose sessions this consumer's uid would be allowed to clone.
    if (!s.IsCloneAllowed(uid_))
      continue;
    auto* session = svc_state.add_tracing_sessions();
    session->set_id(s.id);
    session->set_consumer_uid(static_cast<int>(s.consumer_uid));
    session->set_duration_ms(s.config.duration_ms());
    session->set_num_data_sources(
        static_cast<uint32_t>(s.data_source_instances.size()));
    session->set_unique_session_name(s.config.unique_session_name());
    if (s.config.has_bugreport_score())
      session->set_bugreport_score(s.config.bugreport_score());
    if (s.config.has_bugreport_filename())
      session->set_bugreport_filename(s.config.bugreport_filename());
    // Report the session start (realtime clock) from the initial snapshot.
    for (const auto& snap_kv : s.initial_clock_snapshot) {
      if (snap_kv.clock_id == protos::pbzero::BUILTIN_CLOCK_REALTIME)
        session->set_start_realtime_ns(static_cast<int64_t>(snap_kv.timestamp));
    }
    for (const auto& buf : s.config.buffers())
      session->add_buffer_size_kb(buf.size_kb());

    switch (s.state) {
      case TracingSession::State::DISABLED:
        session->set_state("DISABLED");
        break;
      case TracingSession::State::CONFIGURED:
        session->set_state("CONFIGURED");
        break;
      case TracingSession::State::STARTED:
        session->set_is_started(true);
        session->set_state("STARTED");
        break;
      case TracingSession::State::DISABLING_WAITING_STOP_ACKS:
        session->set_state("STOP_WAIT");
        break;
      case TracingSession::State::CLONED_READ_ONLY:
        session->set_state("CLONED_READ_ONLY");
        break;
    }
  }
  callback(/*success=*/true, svc_state);
}
4630 
QueryCapabilities(QueryCapabilitiesCallback callback)4631 void TracingServiceImpl::ConsumerEndpointImpl::QueryCapabilities(
4632     QueryCapabilitiesCallback callback) {
4633   PERFETTO_DCHECK_THREAD(thread_checker_);
4634   TracingServiceCapabilities caps;
4635   caps.set_has_query_capabilities(true);
4636   caps.set_has_trace_config_output_path(true);
4637   caps.set_has_clone_session(true);
4638   caps.add_observable_events(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES);
4639   caps.add_observable_events(ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED);
4640   caps.add_observable_events(ObservableEvents::TYPE_CLONE_TRIGGER_HIT);
4641   static_assert(
4642       ObservableEvents::Type_MAX == ObservableEvents::TYPE_CLONE_TRIGGER_HIT,
4643       "");
4644   callback(caps);
4645 }
4646 
SaveTraceForBugreport(SaveTraceForBugreportCallback consumer_callback)4647 void TracingServiceImpl::ConsumerEndpointImpl::SaveTraceForBugreport(
4648     SaveTraceForBugreportCallback consumer_callback) {
4649   consumer_callback(false,
4650                     "SaveTraceForBugreport is deprecated. Use "
4651                     "CloneSession(kBugreportSessionId) instead.");
4652 }
4653 
CloneSession(CloneSessionArgs args)4654 void TracingServiceImpl::ConsumerEndpointImpl::CloneSession(
4655     CloneSessionArgs args) {
4656   PERFETTO_DCHECK_THREAD(thread_checker_);
4657   // FlushAndCloneSession will call OnSessionCloned after the async flush.
4658   base::Status result = service_->FlushAndCloneSession(this, std::move(args));
4659 
4660   if (!result.ok()) {
4661     consumer_->OnSessionCloned({false, result.message(), {}});
4662   }
4663 }
4664 
4665 ////////////////////////////////////////////////////////////////////////////////
4666 // TracingServiceImpl::ProducerEndpointImpl implementation
4667 ////////////////////////////////////////////////////////////////////////////////
4668 
// Binds a Producer implementation to the service. |service| and |producer|
// are borrowed, not owned. |in_process| and |smb_scraping_enabled| record how
// the producer connected and whether its shared memory buffer may be scraped.
TracingServiceImpl::ProducerEndpointImpl::ProducerEndpointImpl(
    ProducerID id,
    const ClientIdentity& client_identity,
    TracingServiceImpl* service,
    base::TaskRunner* task_runner,
    Producer* producer,
    const std::string& producer_name,
    const std::string& sdk_version,
    bool in_process,
    bool smb_scraping_enabled)
    : id_(id),
      client_identity_(client_identity),
      service_(service),
      producer_(producer),
      name_(producer_name),
      sdk_version_(sdk_version),
      in_process_(in_process),
      smb_scraping_enabled_(smb_scraping_enabled),
      weak_runner_(task_runner) {}
4688 
// Detach from the service first (so it drops all state keyed by |id_|),
// then notify the producer that the connection is gone.
TracingServiceImpl::ProducerEndpointImpl::~ProducerEndpointImpl() {
  service_->DisconnectProducer(id_);
  producer_->OnDisconnect();
}
4693 
// Explicit Disconnect() is intentionally unsupported for producers.
void TracingServiceImpl::ProducerEndpointImpl::Disconnect() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  // Disconnection is only supported via destroying the ProducerEndpoint.
  PERFETTO_FATAL("Not supported");
}
4699 
// Forwards the data source registration to the service, keyed by this
// producer's id.
void TracingServiceImpl::ProducerEndpointImpl::RegisterDataSource(
    const DataSourceDescriptor& desc) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_->RegisterDataSource(id_, desc);
}
4705 
// Forwards an update of an already-registered data source descriptor to the
// service, keyed by this producer's id.
void TracingServiceImpl::ProducerEndpointImpl::UpdateDataSource(
    const DataSourceDescriptor& desc) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_->UpdateDataSource(id_, desc);
}
4711 
// Forwards the data source unregistration to the service, keyed by this
// producer's id.
void TracingServiceImpl::ProducerEndpointImpl::UnregisterDataSource(
    const std::string& name) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_->UnregisterDataSource(id_, name);
}
4717 
RegisterTraceWriter(uint32_t writer_id,uint32_t target_buffer)4718 void TracingServiceImpl::ProducerEndpointImpl::RegisterTraceWriter(
4719     uint32_t writer_id,
4720     uint32_t target_buffer) {
4721   PERFETTO_DCHECK_THREAD(thread_checker_);
4722   writers_[static_cast<WriterID>(writer_id)] =
4723       static_cast<BufferID>(target_buffer);
4724 }
4725 
UnregisterTraceWriter(uint32_t writer_id)4726 void TracingServiceImpl::ProducerEndpointImpl::UnregisterTraceWriter(
4727     uint32_t writer_id) {
4728   PERFETTO_DCHECK_THREAD(thread_checker_);
4729   writers_.erase(static_cast<WriterID>(writer_id));
4730 }
4731 
// Moves the completed chunks listed in |req_untrusted| into the central trace
// buffers, then applies pending patches and acks any outstanding flush.
// Chunk payloads come either from this producer's shared memory buffer or,
// for entries carrying inline data, from the IPC request itself. Everything
// in the request is treated as untrusted: out-of-range or malformed entries
// are skipped rather than acted upon.
void TracingServiceImpl::ProducerEndpointImpl::CommitData(
    const CommitDataRequest& req_untrusted,
    CommitDataCallback callback) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  if (metatrace::IsEnabled(metatrace::TAG_TRACE_SERVICE)) {
    PERFETTO_METATRACE_COUNTER(TAG_TRACE_SERVICE, TRACE_SERVICE_COMMIT_DATA,
                               EncodeCommitDataRequest(id_, req_untrusted));
  }

  if (!shared_memory_) {
    PERFETTO_DLOG(
        "Attempted to commit data before the shared memory was allocated.");
    return;
  }
  PERFETTO_DCHECK(shmem_abi_.is_valid());
  for (const auto& entry : req_untrusted.chunks_to_move()) {
    const uint32_t page_idx = entry.page();
    if (page_idx >= shmem_abi_.num_pages())
      continue;  // A buggy or malicious producer.

    SharedMemoryABI::Chunk chunk;
    bool commit_data_over_ipc = entry.has_data();
    if (PERFETTO_UNLIKELY(commit_data_over_ipc)) {
      // Chunk data is passed over the wire. Create a chunk using the serialized
      // protobuf message.
      const std::string& data = entry.data();
      if (data.size() > SharedMemoryABI::Chunk::kMaxSize) {
        PERFETTO_DFATAL("IPC data commit too large: %zu", data.size());
        continue;  // A malicious or buggy producer
      }
      // |data| is not altered, but we need to const_cast because Chunk data
      // members are non-const.
      chunk = SharedMemoryABI::MakeChunkFromSerializedData(
          reinterpret_cast<uint8_t*>(const_cast<char*>(data.data())),
          static_cast<uint16_t>(entry.data().size()),
          static_cast<uint8_t>(entry.chunk()));
    } else
      chunk = shmem_abi_.TryAcquireChunkForReading(page_idx, entry.chunk());
    if (!chunk.is_valid()) {
      // The chunk is not in a "complete" state; the producer may still be
      // writing it, or the request was bogus. Skip it.
      PERFETTO_DLOG("Asked to move chunk %d:%d, but it's not complete",
                    entry.page(), entry.chunk());
      continue;
    }

    // TryAcquireChunkForReading() has load-acquire semantics. Once acquired,
    // the ABI contract expects the producer to not touch the chunk anymore
    // (until the service marks that as free). This is why all the reads below
    // are just memory_order_relaxed. Also, the code here assumes that all this
    // data can be malicious and just gives up if anything is malformed.
    BufferID buffer_id = static_cast<BufferID>(entry.target_buffer());
    const SharedMemoryABI::ChunkHeader& chunk_header = *chunk.header();
    WriterID writer_id = chunk_header.writer_id.load(std::memory_order_relaxed);
    ChunkID chunk_id = chunk_header.chunk_id.load(std::memory_order_relaxed);
    auto packets = chunk_header.packets.load(std::memory_order_relaxed);
    uint16_t num_fragments = packets.count;
    uint8_t chunk_flags = packets.flags;

    service_->CopyProducerPageIntoLogBuffer(
        id_, client_identity_, writer_id, chunk_id, buffer_id, num_fragments,
        chunk_flags,
        /*chunk_complete=*/true, chunk.payload_begin(), chunk.payload_size());

    if (!commit_data_over_ipc) {
      // This one has release-store semantics. IPC-carried chunks have no SMB
      // slot to release, hence the guard.
      shmem_abi_.ReleaseChunkAsFree(std::move(chunk));
    }
  }  // for(chunks_to_move)

  service_->ApplyChunkPatches(id_, req_untrusted.chunks_to_patch());

  if (req_untrusted.flush_request_id()) {
    service_->NotifyFlushDoneForProducer(id_, req_untrusted.flush_request_id());
  }

  // Keep this invocation last. ProducerIPCService::CommitData() relies on this
  // callback being invoked within the same callstack and not posted. If this
  // changes, the code there needs to be changed accordingly.
  if (callback)
    callback();
}
4813 
// Adopts |shared_memory| as this producer's shared memory buffer and
// initializes the ABI view over it. |page_size_bytes| must be a multiple of
// 1024. For in-process producers an in-process SharedMemoryArbiter is built
// over the same region. Must be called at most once (DCHECK'd below).
void TracingServiceImpl::ProducerEndpointImpl::SetupSharedMemory(
    std::unique_ptr<SharedMemory> shared_memory,
    size_t page_size_bytes,
    bool provided_by_producer) {
  PERFETTO_DCHECK(!shared_memory_ && !shmem_abi_.is_valid());
  PERFETTO_DCHECK(page_size_bytes % 1024 == 0);

  shared_memory_ = std::move(shared_memory);
  // Stored in KB; the division is exact given the DCHECK above.
  shared_buffer_page_size_kb_ = page_size_bytes / 1024;
  is_shmem_provided_by_producer_ = provided_by_producer;

  shmem_abi_.Initialize(reinterpret_cast<uint8_t*>(shared_memory_->start()),
                        shared_memory_->size(),
                        shared_buffer_page_size_kb() * 1024,
                        SharedMemoryABI::ShmemMode::kDefault);
  if (in_process_) {
    inproc_shmem_arbiter_.reset(new SharedMemoryArbiterImpl(
        shared_memory_->start(), shared_memory_->size(),
        SharedMemoryABI::ShmemMode::kDefault,
        shared_buffer_page_size_kb_ * 1024, this, weak_runner_.task_runner()));
    inproc_shmem_arbiter_->SetDirectSMBPatchingSupportedByService();
  }

  OnTracingSetup();
  service_->UpdateMemoryGuardrail();
}
4840 
// Returns the producer's SMB, or nullptr if SetupSharedMemory() hasn't run.
SharedMemory* TracingServiceImpl::ProducerEndpointImpl::shared_memory() const {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  return shared_memory_.get();
}
4845 
// Page size of the shared memory buffer, in KB (0 before setup).
size_t TracingServiceImpl::ProducerEndpointImpl::shared_buffer_page_size_kb()
    const {
  return shared_buffer_page_size_kb_;
}
4850 
// Thin forwarder: reports trigger activations from this producer to the
// service, which matches them against active trace configs.
void TracingServiceImpl::ProducerEndpointImpl::ActivateTriggers(
    const std::vector<std::string>& triggers) {
  service_->ActivateTriggers(id_, triggers);
}
4855 
// Asks the producer (asynchronously, on its task runner) to stop the given
// data source instance.
void TracingServiceImpl::ProducerEndpointImpl::StopDataSource(
    DataSourceInstanceID ds_inst_id) {
  // TODO(primiano): When we'll support tearing down the SMB, at this point we
  // should send the Producer a TearDownTracing if all its data sources have
  // been disabled (see b/77532839 and aosp/655179 PS1).
  PERFETTO_DCHECK_THREAD(thread_checker_);
  weak_runner_.PostTask(
      [this, ds_inst_id] { producer_->StopDataSource(ds_inst_id); });
}
4865 
// Returns the in-process arbiter. Fatals (rather than returning nullptr) if
// the endpoint was not created with in_process=true or tracing hasn't been
// set up yet, so callers can rely on a non-null result.
SharedMemoryArbiter*
TracingServiceImpl::ProducerEndpointImpl::MaybeSharedMemoryArbiter() {
  if (!inproc_shmem_arbiter_) {
    PERFETTO_FATAL(
        "The in-process SharedMemoryArbiter can only be used when "
        "CreateProducer has been called with in_process=true and after tracing "
        "has started.");
  }

  PERFETTO_DCHECK(in_process_);
  return inproc_shmem_arbiter_.get();
}
4878 
// True if the SMB was supplied by the producer (vs allocated by the service);
// set in SetupSharedMemory().
bool TracingServiceImpl::ProducerEndpointImpl::IsShmemProvidedByProducer()
    const {
  return is_shmem_provided_by_producer_;
}
4883 
// Can be called on any thread.
// Creates a TraceWriter targeting |buf_id| via the in-process arbiter.
// Fatals (inside MaybeSharedMemoryArbiter) if there is no in-process arbiter.
std::unique_ptr<TraceWriter>
TracingServiceImpl::ProducerEndpointImpl::CreateTraceWriter(
    BufferID buf_id,
    BufferExhaustedPolicy buffer_exhausted_policy) {
  PERFETTO_DCHECK(MaybeSharedMemoryArbiter());
  return MaybeSharedMemoryArbiter()->CreateTraceWriter(buf_id,
                                                       buffer_exhausted_policy);
}
4893 
NotifyFlushComplete(FlushRequestID id)4894 void TracingServiceImpl::ProducerEndpointImpl::NotifyFlushComplete(
4895     FlushRequestID id) {
4896   PERFETTO_DCHECK_THREAD(thread_checker_);
4897   PERFETTO_DCHECK(MaybeSharedMemoryArbiter());
4898   return MaybeSharedMemoryArbiter()->NotifyFlushComplete(id);
4899 }
4900 
// Notifies the producer (asynchronously) that tracing has been set up.
void TracingServiceImpl::ProducerEndpointImpl::OnTracingSetup() {
  weak_runner_.PostTask([this] { producer_->OnTracingSetup(); });
}
4904 
// Forwards a flush request to the producer on its task runner. |data_sources|
// is captured by copy because the posted task may outlive the caller's vector.
void TracingServiceImpl::ProducerEndpointImpl::Flush(
    FlushRequestID flush_request_id,
    const std::vector<DataSourceInstanceID>& data_sources,
    FlushFlags flush_flags) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  weak_runner_.PostTask([this, flush_request_id, data_sources, flush_flags] {
    producer_->Flush(flush_request_id, data_sources.data(), data_sources.size(),
                     flush_flags);
  });
}
4915 
SetupDataSource(DataSourceInstanceID ds_id,const DataSourceConfig & config)4916 void TracingServiceImpl::ProducerEndpointImpl::SetupDataSource(
4917     DataSourceInstanceID ds_id,
4918     const DataSourceConfig& config) {
4919   PERFETTO_DCHECK_THREAD(thread_checker_);
4920   allowed_target_buffers_.insert(static_cast<BufferID>(config.target_buffer()));
4921   weak_runner_.PostTask([this, ds_id, config] {
4922     producer_->SetupDataSource(ds_id, std::move(config));
4923   });
4924 }
4925 
StartDataSource(DataSourceInstanceID ds_id,const DataSourceConfig & config)4926 void TracingServiceImpl::ProducerEndpointImpl::StartDataSource(
4927     DataSourceInstanceID ds_id,
4928     const DataSourceConfig& config) {
4929   PERFETTO_DCHECK_THREAD(thread_checker_);
4930   weak_runner_.PostTask([this, ds_id, config] {
4931     producer_->StartDataSource(ds_id, std::move(config));
4932   });
4933 }
4934 
// Thin forwarder: tells the service that a data source finished starting.
void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStarted(
    DataSourceInstanceID data_source_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_->NotifyDataSourceStarted(id_, data_source_id);
}
4940 
// Thin forwarder: tells the service that a data source finished stopping.
void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStopped(
    DataSourceInstanceID data_source_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_->NotifyDataSourceStopped(id_, data_source_id);
}
4946 
OnFreeBuffers(const std::vector<BufferID> & target_buffers)4947 void TracingServiceImpl::ProducerEndpointImpl::OnFreeBuffers(
4948     const std::vector<BufferID>& target_buffers) {
4949   if (allowed_target_buffers_.empty())
4950     return;
4951   for (BufferID buffer : target_buffers)
4952     allowed_target_buffers_.erase(buffer);
4953 }
4954 
ClearIncrementalState(const std::vector<DataSourceInstanceID> & data_sources)4955 void TracingServiceImpl::ProducerEndpointImpl::ClearIncrementalState(
4956     const std::vector<DataSourceInstanceID>& data_sources) {
4957   PERFETTO_DCHECK_THREAD(thread_checker_);
4958   weak_runner_.PostTask([this, data_sources] {
4959     base::StringView producer_name(name_);
4960     producer_->ClearIncrementalState(data_sources.data(), data_sources.size());
4961   });
4962 }
4963 
Sync(std::function<void ()> callback)4964 void TracingServiceImpl::ProducerEndpointImpl::Sync(
4965     std::function<void()> callback) {
4966   weak_runner_.task_runner()->PostTask(callback);
4967 }
4968 
// Best-effort check of whether this producer's process is frozen by the
// Android freezer cgroup. Reads /sys/fs/cgroup/uid_X/pid_Y/cgroup.freeze and
// returns true iff it contains '1'. Returns false for in-process producers,
// unknown uid/pid, non-Android builds, or on any read failure.
bool TracingServiceImpl::ProducerEndpointImpl::IsAndroidProcessFrozen() {
#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
  if (in_process_ || uid() == base::kInvalidUid || pid() == base::kInvalidPid)
    return false;
  base::StackString<255> path(
      "/sys/fs/cgroup/uid_%" PRIu32 "/pid_%" PRIu32 "/cgroup.freeze",
      static_cast<uint32_t>(uid()), static_cast<uint32_t>(pid()));
  char frozen = '0';
  auto fd = base::OpenFile(path.c_str(), O_RDONLY);
  ssize_t rsize = 0;
  if (fd) {
    rsize = base::Read(*fd, &frozen, sizeof(frozen));
    if (rsize > 0) {
      return frozen == '1';
    }
  }
  // Note: |!!fd| logs whether the open succeeded (0/1), not the raw fd value.
  PERFETTO_DLOG("Failed to read %s (fd=%d, rsize=%d)", path.c_str(), !!fd,
                static_cast<int>(rsize));
#endif
  return false;
}
4990 
4991 ////////////////////////////////////////////////////////////////////////////////
4992 // TracingServiceImpl::TracingSession implementation
4993 ////////////////////////////////////////////////////////////////////////////////
4994 
// Constructs a tracing session owned by |consumer| with |new_config|.
// The consumer uid is cached separately from the (nullable) consumer pointer;
// periodic snapshot and timed-stop tasks are bound to |task_runner|.
TracingServiceImpl::TracingSession::TracingSession(
    TracingSessionID session_id,
    ConsumerEndpointImpl* consumer,
    const TraceConfig& new_config,
    base::TaskRunner* task_runner)
    : id(session_id),
      consumer_maybe_null(consumer),
      consumer_uid(consumer->uid_),
      config(new_config),
      snapshot_periodic_task(task_runner),
      timed_stop_task(task_runner) {
  // all_data_sources_flushed (and flush_started) is special because we store up
  // to 64 events of this type. Other events will go through the default case in
  // SnapshotLifecycleEvent() where they will be given a max history of 1.
  lifecycle_events.emplace_back(
      protos::pbzero::TracingServiceEvent::kAllDataSourcesFlushedFieldNumber,
      64 /* max_size */);
  lifecycle_events.emplace_back(
      protos::pbzero::TracingServiceEvent::kFlushStartedFieldNumber,
      64 /* max_size */);
}
5016 
5017 ////////////////////////////////////////////////////////////////////////////////
5018 // TracingServiceImpl::RelayEndpointImpl implementation
5019 ////////////////////////////////////////////////////////////////////////////////
// Endpoint representing a relay client; only stores its identity and a
// back-pointer to the owning service.
TracingServiceImpl::RelayEndpointImpl::RelayEndpointImpl(
    RelayClientID relay_client_id,
    TracingServiceImpl* service)
    : relay_client_id_(relay_client_id), service_(service) {}
TracingServiceImpl::RelayEndpointImpl::~RelayEndpointImpl() = default;
5025 
SyncClocks(SyncMode sync_mode,base::ClockSnapshotVector client_clocks,base::ClockSnapshotVector host_clocks)5026 void TracingServiceImpl::RelayEndpointImpl::SyncClocks(
5027     SyncMode sync_mode,
5028     base::ClockSnapshotVector client_clocks,
5029     base::ClockSnapshotVector host_clocks) {
5030   // We keep only the most recent 5 clock sync snapshots.
5031   static constexpr size_t kNumSyncClocks = 5;
5032   if (synced_clocks_.size() >= kNumSyncClocks)
5033     synced_clocks_.pop_front();
5034 
5035   synced_clocks_.emplace_back(sync_mode, std::move(client_clocks),
5036                               std::move(host_clocks));
5037 }
5038 
// Unregisters this relay client from the service.
void TracingServiceImpl::RelayEndpointImpl::Disconnect() {
  service_->DisconnectRelayClient(relay_client_id_);
}
5042 
5043 }  // namespace perfetto
5044