/* * Copyright (C) 2019 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "src/tracing/internal/tracing_muxer_impl.h" #include #include #include #include #include #include "perfetto/base/build_config.h" #include "perfetto/base/logging.h" #include "perfetto/base/task_runner.h" #include "perfetto/base/time.h" #include "perfetto/ext/base/hash.h" #include "perfetto/ext/base/thread_checker.h" #include "perfetto/ext/base/waitable_event.h" #include "perfetto/ext/tracing/core/shared_memory_arbiter.h" #include "perfetto/ext/tracing/core/trace_packet.h" #include "perfetto/ext/tracing/core/trace_stats.h" #include "perfetto/ext/tracing/core/trace_writer.h" #include "perfetto/ext/tracing/core/tracing_service.h" #include "perfetto/tracing/buffer_exhausted_policy.h" #include "perfetto/tracing/core/data_source_config.h" #include "perfetto/tracing/core/tracing_service_state.h" #include "perfetto/tracing/data_source.h" #include "perfetto/tracing/internal/data_source_internal.h" #include "perfetto/tracing/internal/interceptor_trace_writer.h" #include "perfetto/tracing/internal/tracing_backend_fake.h" #include "perfetto/tracing/trace_writer_base.h" #include "perfetto/tracing/tracing.h" #include "perfetto/tracing/tracing_backend.h" #include "src/tracing/core/null_trace_writer.h" #include "src/tracing/internal/tracing_muxer_fake.h" #include "protos/perfetto/config/interceptor_config.gen.h" #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) #include // For dup() #else 
#include // For dup() #endif namespace perfetto { namespace internal { namespace { using RegisteredDataSource = TracingMuxerImpl::RegisteredDataSource; // A task runner which prevents calls to DataSource::Trace() while an operation // is in progress. Used to guard against unexpected re-entrancy where the // user-provided task runner implementation tries to enter a trace point under // the hood. class NonReentrantTaskRunner : public base::TaskRunner { public: NonReentrantTaskRunner(TracingMuxer* muxer, std::unique_ptr task_runner) : muxer_(muxer), task_runner_(std::move(task_runner)) {} // base::TaskRunner implementation. void PostTask(std::function task) override { CallWithGuard([&] { task_runner_->PostTask(std::move(task)); }); } void PostDelayedTask(std::function task, uint32_t delay_ms) override { CallWithGuard( [&] { task_runner_->PostDelayedTask(std::move(task), delay_ms); }); } void AddFileDescriptorWatch(base::PlatformHandle fd, std::function callback) override { CallWithGuard( [&] { task_runner_->AddFileDescriptorWatch(fd, std::move(callback)); }); } void RemoveFileDescriptorWatch(base::PlatformHandle fd) override { CallWithGuard([&] { task_runner_->RemoveFileDescriptorWatch(fd); }); } bool RunsTasksOnCurrentThread() const override { bool result; CallWithGuard([&] { result = task_runner_->RunsTasksOnCurrentThread(); }); return result; } private: template void CallWithGuard(T lambda) const { auto* root_tls = muxer_->GetOrCreateTracingTLS(); if (PERFETTO_UNLIKELY(root_tls->is_in_trace_point)) { lambda(); return; } ScopedReentrancyAnnotator scoped_annotator(*root_tls); lambda(); } TracingMuxer* const muxer_; std::unique_ptr task_runner_; }; class StopArgsImpl : public DataSourceBase::StopArgs { public: std::function HandleStopAsynchronously() const override { auto closure = std::move(async_stop_closure); async_stop_closure = std::function(); return closure; } mutable std::function async_stop_closure; }; class FlushArgsImpl : public DataSourceBase::FlushArgs { 
public: std::function HandleFlushAsynchronously() const override { auto closure = std::move(async_flush_closure); async_flush_closure = std::function(); return closure; } mutable std::function async_flush_closure; }; // Holds an earlier TracingMuxerImpl instance after ResetForTesting() is called. static TracingMuxerImpl* g_prev_instance{}; template struct CompareBackendByType { static int BackendTypePriority(BackendType type) { switch (type) { case kSystemBackend: return 0; case kInProcessBackend: return 1; case kCustomBackend: return 2; // The UnspecifiedBackend has the highest priority so that // TracingBackendFake is the last one on the backend lists. case kUnspecifiedBackend: break; } return 3; } bool operator()(BackendType type, const RegisteredBackend& b) { return BackendTypePriority(type) < BackendTypePriority(b.type); } }; } // namespace // ----- Begin of TracingMuxerImpl::ProducerImpl TracingMuxerImpl::ProducerImpl::ProducerImpl( TracingMuxerImpl* muxer, TracingBackendId backend_id, uint32_t shmem_batch_commits_duration_ms, bool shmem_direct_patching_enabled) : muxer_(muxer), backend_id_(backend_id), shmem_batch_commits_duration_ms_(shmem_batch_commits_duration_ms), shmem_direct_patching_enabled_(shmem_direct_patching_enabled) {} TracingMuxerImpl::ProducerImpl::~ProducerImpl() { muxer_ = nullptr; } void TracingMuxerImpl::ProducerImpl::Initialize( std::unique_ptr endpoint) { PERFETTO_DCHECK_THREAD(thread_checker_); PERFETTO_DCHECK(!connected_); connection_id_.fetch_add(1, std::memory_order_relaxed); is_producer_provided_smb_ = endpoint->shared_memory(); last_startup_target_buffer_reservation_ = 0; // Adopt the endpoint into a shared pointer so that we can safely share it // across threads that create trace writers. The custom deleter function // ensures that the endpoint is always destroyed on the muxer's thread. (Note // that |task_runner| is assumed to outlive tracing sessions on all threads.) 
auto* task_runner = muxer_->task_runner_.get(); auto deleter = [task_runner](ProducerEndpoint* e) { if (task_runner->RunsTasksOnCurrentThread()) { delete e; return; } task_runner->PostTask([e] { delete e; }); }; std::shared_ptr service(endpoint.release(), deleter); // This atomic store is needed because another thread might be concurrently // creating a trace writer using the previous (disconnected) |service_|. See // CreateTraceWriter(). std::atomic_store(&service_, std::move(service)); // Don't try to use the service here since it may not have connected yet. See // OnConnect(). } void TracingMuxerImpl::ProducerImpl::OnConnect() { PERFETTO_DLOG("Producer connected"); PERFETTO_DCHECK_THREAD(thread_checker_); PERFETTO_DCHECK(!connected_); if (is_producer_provided_smb_ && !service_->IsShmemProvidedByProducer()) { PERFETTO_ELOG( "The service likely doesn't support producer-provided SMBs. Preventing " "future attempts to use producer-provided SMB again with this " "backend."); producer_provided_smb_failed_ = true; // Will call OnDisconnect() and cause a reconnect without producer-provided // SMB. service_->Disconnect(); return; } connected_ = true; muxer_->UpdateDataSourcesOnAllBackends(); SendOnConnectTriggers(); } void TracingMuxerImpl::ProducerImpl::OnDisconnect() { PERFETTO_DCHECK_THREAD(thread_checker_); // If we're being destroyed, bail out. if (!muxer_) return; connected_ = false; // Active data sources for this producer will be stopped by // DestroyStoppedTraceWritersForCurrentThread() since the reconnected producer // will have a different connection id (even before it has finished // connecting). registered_data_sources_.reset(); DisposeConnection(); // Try reconnecting the producer. muxer_->OnProducerDisconnected(this); } void TracingMuxerImpl::ProducerImpl::DisposeConnection() { // Keep the old service around as a dead connection in case it has active // trace writers. 
If any tracing sessions were created, we can't clear // |service_| here because other threads may be concurrently creating new // trace writers. Any reconnection attempt will atomically swap the new // service in place of the old one. if (did_setup_tracing_ || did_setup_startup_tracing_) { dead_services_.push_back(service_); } else { service_.reset(); } } void TracingMuxerImpl::ProducerImpl::OnTracingSetup() { PERFETTO_DCHECK_THREAD(thread_checker_); did_setup_tracing_ = true; service_->MaybeSharedMemoryArbiter()->SetBatchCommitsDuration( shmem_batch_commits_duration_ms_); if (shmem_direct_patching_enabled_) { service_->MaybeSharedMemoryArbiter()->EnableDirectSMBPatching(); } } void TracingMuxerImpl::ProducerImpl::OnStartupTracingSetup() { PERFETTO_DCHECK_THREAD(thread_checker_); did_setup_startup_tracing_ = true; } void TracingMuxerImpl::ProducerImpl::SetupDataSource( DataSourceInstanceID id, const DataSourceConfig& cfg) { PERFETTO_DCHECK_THREAD(thread_checker_); if (!muxer_) return; muxer_->SetupDataSource( backend_id_, connection_id_.load(std::memory_order_relaxed), id, cfg); } void TracingMuxerImpl::ProducerImpl::StartDataSource(DataSourceInstanceID id, const DataSourceConfig&) { PERFETTO_DCHECK_THREAD(thread_checker_); if (!muxer_) return; muxer_->StartDataSource(backend_id_, id); service_->NotifyDataSourceStarted(id); } void TracingMuxerImpl::ProducerImpl::StopDataSource(DataSourceInstanceID id) { PERFETTO_DCHECK_THREAD(thread_checker_); if (!muxer_) return; muxer_->StopDataSource_AsyncBegin(backend_id_, id); } void TracingMuxerImpl::ProducerImpl::Flush( FlushRequestID flush_id, const DataSourceInstanceID* instances, size_t instance_count, FlushFlags flush_flags) { PERFETTO_DCHECK_THREAD(thread_checker_); bool all_handled = true; if (muxer_) { for (size_t i = 0; i < instance_count; i++) { DataSourceInstanceID ds_id = instances[i]; bool handled = muxer_->FlushDataSource_AsyncBegin(backend_id_, ds_id, flush_id, flush_flags); if (!handled) { 
pending_flushes_[flush_id].insert(ds_id); all_handled = false; } } } if (all_handled) { service_->NotifyFlushComplete(flush_id); } } void TracingMuxerImpl::ProducerImpl::ClearIncrementalState( const DataSourceInstanceID* instances, size_t instance_count) { PERFETTO_DCHECK_THREAD(thread_checker_); if (!muxer_) return; for (size_t inst_idx = 0; inst_idx < instance_count; inst_idx++) { muxer_->ClearDataSourceIncrementalState(backend_id_, instances[inst_idx]); } } bool TracingMuxerImpl::ProducerImpl::SweepDeadServices() { PERFETTO_DCHECK_THREAD(thread_checker_); auto is_unused = [](const std::shared_ptr& endpoint) { auto* arbiter = endpoint->MaybeSharedMemoryArbiter(); return !arbiter || arbiter->TryShutdown(); }; for (auto it = dead_services_.begin(); it != dead_services_.end();) { auto next_it = it; next_it++; if (is_unused(*it)) { dead_services_.erase(it); } it = next_it; } return dead_services_.empty(); } void TracingMuxerImpl::ProducerImpl::SendOnConnectTriggers() { PERFETTO_DCHECK_THREAD(thread_checker_); base::TimeMillis now = base::GetWallTimeMs(); std::vector triggers; while (!on_connect_triggers_.empty()) { // Skip if we passed TTL. 
if (on_connect_triggers_.front().second > now) { triggers.push_back(std::move(on_connect_triggers_.front().first)); } on_connect_triggers_.pop_front(); } if (!triggers.empty()) { service_->ActivateTriggers(triggers); } } void TracingMuxerImpl::ProducerImpl::NotifyFlushForDataSourceDone( DataSourceInstanceID ds_id, FlushRequestID flush_id) { if (!connected_) { return; } { auto it = pending_flushes_.find(flush_id); if (it == pending_flushes_.end()) { return; } std::set& ds_ids = it->second; ds_ids.erase(ds_id); } std::optional biggest_flush_id; for (auto it = pending_flushes_.begin(); it != pending_flushes_.end();) { if (it->second.empty()) { biggest_flush_id = it->first; it = pending_flushes_.erase(it); } else { break; } } if (biggest_flush_id) { service_->NotifyFlushComplete(*biggest_flush_id); } } // ----- End of TracingMuxerImpl::ProducerImpl methods. // ----- Begin of TracingMuxerImpl::ConsumerImpl TracingMuxerImpl::ConsumerImpl::ConsumerImpl(TracingMuxerImpl* muxer, BackendType backend_type, TracingSessionGlobalID session_id) : muxer_(muxer), backend_type_(backend_type), session_id_(session_id) {} TracingMuxerImpl::ConsumerImpl::~ConsumerImpl() { muxer_ = nullptr; } void TracingMuxerImpl::ConsumerImpl::Initialize( std::unique_ptr endpoint) { PERFETTO_DCHECK_THREAD(thread_checker_); service_ = std::move(endpoint); // Don't try to use the service here since it may not have connected yet. See // OnConnect(). } void TracingMuxerImpl::ConsumerImpl::OnConnect() { PERFETTO_DCHECK_THREAD(thread_checker_); PERFETTO_DCHECK(!connected_); connected_ = true; // Observe data source instance events so we get notified when tracing starts. service_->ObserveEvents(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES | ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED); // If the API client configured and started tracing before we connected, // tell the backend about it now. 
if (trace_config_) muxer_->SetupTracingSession(session_id_, trace_config_); if (start_pending_) muxer_->StartTracingSession(session_id_); if (get_trace_stats_pending_) { auto callback = std::move(get_trace_stats_callback_); get_trace_stats_callback_ = nullptr; muxer_->GetTraceStats(session_id_, std::move(callback)); } if (query_service_state_callback_) { auto callback = std::move(query_service_state_callback_); query_service_state_callback_ = nullptr; muxer_->QueryServiceState(session_id_, std::move(callback)); } if (session_to_clone_) { service_->CloneSession(*session_to_clone_); session_to_clone_ = std::nullopt; } if (stop_pending_) muxer_->StopTracingSession(session_id_); } void TracingMuxerImpl::ConsumerImpl::OnDisconnect() { PERFETTO_DCHECK_THREAD(thread_checker_); // If we're being destroyed, bail out. if (!muxer_) return; #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) if (!connected_ && backend_type_ == kSystemBackend) { PERFETTO_ELOG( "Unable to connect to the system tracing service as a consumer. On " "Android, use the \"perfetto\" command line tool instead to start " "system-wide tracing sessions"); } #endif // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) // Notify the client about disconnection. NotifyError(TracingError{TracingError::kDisconnected, "Peer disconnected"}); // Make sure the client doesn't hang in a blocking start/stop because of the // disconnection. NotifyStartComplete(); NotifyStopComplete(); // It shouldn't be necessary to call StopTracingSession. If we get this call // it means that the service did shutdown before us, so there is no point // trying it to ask it to stop the session. We should just remember to cleanup // the consumer vector. connected_ = false; // Notify the muxer that it is safe to destroy |this|. This is needed because // the ConsumerEndpoint stored in |service_| requires that |this| be safe to // access until OnDisconnect() is called. 
muxer_->OnConsumerDisconnected(this); } void TracingMuxerImpl::ConsumerImpl::Disconnect() { // This is weird and deserves a comment. // // When we called the ConnectConsumer method on the service it returns // us a ConsumerEndpoint which we stored in |service_|, however this // ConsumerEndpoint holds a pointer to the ConsumerImpl pointed to by // |this|. Part of the API contract to TracingService::ConnectConsumer is that // the ConsumerImpl pointer has to be valid until the // ConsumerImpl::OnDisconnect method is called. Therefore we reset the // ConsumerEndpoint |service_|. Eventually this will call // ConsumerImpl::OnDisconnect and we will inform the muxer it is safe to // call the destructor of |this|. service_.reset(); } void TracingMuxerImpl::ConsumerImpl::OnTracingDisabled( const std::string& error) { PERFETTO_DCHECK_THREAD(thread_checker_); PERFETTO_DCHECK(!stopped_); stopped_ = true; if (!error.empty()) NotifyError(TracingError{TracingError::kTracingFailed, error}); // If we're still waiting for the start event, fire it now. This may happen if // there are no active data sources in the session. 
NotifyStartComplete(); NotifyStopComplete(); } void TracingMuxerImpl::ConsumerImpl::NotifyStartComplete() { PERFETTO_DCHECK_THREAD(thread_checker_); if (start_complete_callback_) { muxer_->task_runner_->PostTask(std::move(start_complete_callback_)); start_complete_callback_ = nullptr; } if (blocking_start_complete_callback_) { muxer_->task_runner_->PostTask( std::move(blocking_start_complete_callback_)); blocking_start_complete_callback_ = nullptr; } } void TracingMuxerImpl::ConsumerImpl::NotifyError(const TracingError& error) { PERFETTO_DCHECK_THREAD(thread_checker_); if (error_callback_) { muxer_->task_runner_->PostTask( std::bind(std::move(error_callback_), error)); } } void TracingMuxerImpl::ConsumerImpl::NotifyStopComplete() { PERFETTO_DCHECK_THREAD(thread_checker_); if (stop_complete_callback_) { muxer_->task_runner_->PostTask(std::move(stop_complete_callback_)); stop_complete_callback_ = nullptr; } if (blocking_stop_complete_callback_) { muxer_->task_runner_->PostTask(std::move(blocking_stop_complete_callback_)); blocking_stop_complete_callback_ = nullptr; } } void TracingMuxerImpl::ConsumerImpl::OnTraceData( std::vector packets, bool has_more) { PERFETTO_DCHECK_THREAD(thread_checker_); if (!read_trace_callback_) return; size_t capacity = 0; for (const auto& packet : packets) { // 16 is an over-estimation of the proto preamble size capacity += packet.size() + 16; } // The shared_ptr is to avoid making a copy of the buffer when PostTask-ing. 
std::shared_ptr> buf(new std::vector()); buf->reserve(capacity); for (auto& packet : packets) { char* start; size_t size; std::tie(start, size) = packet.GetProtoPreamble(); buf->insert(buf->end(), start, start + size); for (auto& slice : packet.slices()) { const auto* slice_data = reinterpret_cast(slice.start); buf->insert(buf->end(), slice_data, slice_data + slice.size); } } auto callback = read_trace_callback_; muxer_->task_runner_->PostTask([callback, buf, has_more] { TracingSession::ReadTraceCallbackArgs callback_arg{}; callback_arg.data = buf->empty() ? nullptr : &(*buf)[0]; callback_arg.size = buf->size(); callback_arg.has_more = has_more; callback(callback_arg); }); if (!has_more) read_trace_callback_ = nullptr; } void TracingMuxerImpl::ConsumerImpl::OnObservableEvents( const ObservableEvents& events) { if (events.instance_state_changes_size()) { for (const auto& state_change : events.instance_state_changes()) { DataSourceHandle handle{state_change.producer_name(), state_change.data_source_name()}; data_source_states_[handle] = state_change.state() == ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED; } } if (events.instance_state_changes_size() || events.all_data_sources_started()) { // Data sources are first reported as being stopped before starting, so once // all the data sources we know about have started we can declare tracing // begun. In the case where there are no matching data sources for the // session, the service will report the all_data_sources_started() event // without adding any instances (only since Android S / Perfetto v10.0). 
if (start_complete_callback_ || blocking_start_complete_callback_) { bool all_data_sources_started = std::all_of( data_source_states_.cbegin(), data_source_states_.cend(), [](std::pair state) { return state.second; }); if (all_data_sources_started) NotifyStartComplete(); } } } void TracingMuxerImpl::ConsumerImpl::OnSessionCloned( const OnSessionClonedArgs& args) { if (!clone_trace_callback_) return; TracingSession::CloneTraceCallbackArgs callback_arg{}; callback_arg.success = args.success; callback_arg.error = std::move(args.error); callback_arg.uuid_msb = args.uuid.msb(); callback_arg.uuid_lsb = args.uuid.lsb(); muxer_->task_runner_->PostTask( std::bind(std::move(clone_trace_callback_), std::move(callback_arg))); clone_trace_callback_ = nullptr; } void TracingMuxerImpl::ConsumerImpl::OnTraceStats( bool success, const TraceStats& trace_stats) { if (!get_trace_stats_callback_) return; TracingSession::GetTraceStatsCallbackArgs callback_arg{}; callback_arg.success = success; callback_arg.trace_stats_data = trace_stats.SerializeAsArray(); muxer_->task_runner_->PostTask( std::bind(std::move(get_trace_stats_callback_), std::move(callback_arg))); get_trace_stats_callback_ = nullptr; } // The callbacks below are not used. void TracingMuxerImpl::ConsumerImpl::OnDetach(bool) {} void TracingMuxerImpl::ConsumerImpl::OnAttach(bool, const TraceConfig&) {} // ----- End of TracingMuxerImpl::ConsumerImpl // ----- Begin of TracingMuxerImpl::TracingSessionImpl // TracingSessionImpl is the RAII object returned to API clients when they // invoke Tracing::CreateTracingSession. They use it for starting/stopping // tracing. TracingMuxerImpl::TracingSessionImpl::TracingSessionImpl( TracingMuxerImpl* muxer, TracingSessionGlobalID session_id, BackendType backend_type) : muxer_(muxer), session_id_(session_id), backend_type_(backend_type) {} // Can be destroyed from any thread. 
TracingMuxerImpl::TracingSessionImpl::~TracingSessionImpl() { auto* muxer = muxer_; auto session_id = session_id_; muxer->task_runner_->PostTask( [muxer, session_id] { muxer->DestroyTracingSession(session_id); }); } // Can be called from any thread. void TracingMuxerImpl::TracingSessionImpl::Setup(const TraceConfig& cfg, int fd) { auto* muxer = muxer_; auto session_id = session_id_; std::shared_ptr trace_config(new TraceConfig(cfg)); if (fd >= 0) { base::ignore_result(backend_type_); // For -Wunused in the amalgamation. #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) if (backend_type_ != kInProcessBackend) { PERFETTO_FATAL( "Passing a file descriptor to TracingSession::Setup() is only " "supported with the kInProcessBackend on Windows. Use " "TracingSession::ReadTrace() instead"); } #endif trace_config->set_write_into_file(true); fd = dup(fd); } muxer->task_runner_->PostTask([muxer, session_id, trace_config, fd] { muxer->SetupTracingSession(session_id, trace_config, base::ScopedFile(fd)); }); } // Can be called from any thread. void TracingMuxerImpl::TracingSessionImpl::Start() { auto* muxer = muxer_; auto session_id = session_id_; muxer->task_runner_->PostTask( [muxer, session_id] { muxer->StartTracingSession(session_id); }); } void TracingMuxerImpl::TracingSessionImpl::CloneTrace(CloneTraceArgs args, CloneTraceCallback cb) { auto* muxer = muxer_; auto session_id = session_id_; muxer->task_runner_->PostTask([muxer, session_id, args, cb] { muxer->CloneTracingSession(session_id, args, std::move(cb)); }); } // Can be called from any thread. void TracingMuxerImpl::TracingSessionImpl::ChangeTraceConfig( const TraceConfig& cfg) { auto* muxer = muxer_; auto session_id = session_id_; muxer->task_runner_->PostTask([muxer, session_id, cfg] { muxer->ChangeTracingSessionConfig(session_id, cfg); }); } // Can be called from any thread except the service thread. 
void TracingMuxerImpl::TracingSessionImpl::StartBlocking() { PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread()); auto* muxer = muxer_; auto session_id = session_id_; base::WaitableEvent tracing_started; muxer->task_runner_->PostTask([muxer, session_id, &tracing_started] { auto* consumer = muxer->FindConsumer(session_id); if (!consumer) { // TODO(skyostil): Signal an error to the user. tracing_started.Notify(); return; } PERFETTO_DCHECK(!consumer->blocking_start_complete_callback_); consumer->blocking_start_complete_callback_ = [&] { tracing_started.Notify(); }; muxer->StartTracingSession(session_id); }); tracing_started.Wait(); } // Can be called from any thread. void TracingMuxerImpl::TracingSessionImpl::Flush( std::function user_callback, uint32_t timeout_ms) { auto* muxer = muxer_; auto session_id = session_id_; muxer->task_runner_->PostTask([muxer, session_id, timeout_ms, user_callback] { auto* consumer = muxer->FindConsumer(session_id); if (!consumer) { std::move(user_callback)(false); return; } muxer->FlushTracingSession(session_id, timeout_ms, std::move(user_callback)); }); } // Can be called from any thread. void TracingMuxerImpl::TracingSessionImpl::Stop() { auto* muxer = muxer_; auto session_id = session_id_; muxer->task_runner_->PostTask( [muxer, session_id] { muxer->StopTracingSession(session_id); }); } // Can be called from any thread except the service thread. void TracingMuxerImpl::TracingSessionImpl::StopBlocking() { PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread()); auto* muxer = muxer_; auto session_id = session_id_; base::WaitableEvent tracing_stopped; muxer->task_runner_->PostTask([muxer, session_id, &tracing_stopped] { auto* consumer = muxer->FindConsumer(session_id); if (!consumer) { // TODO(skyostil): Signal an error to the user. 
tracing_stopped.Notify(); return; } PERFETTO_DCHECK(!consumer->blocking_stop_complete_callback_); consumer->blocking_stop_complete_callback_ = [&] { tracing_stopped.Notify(); }; muxer->StopTracingSession(session_id); }); tracing_stopped.Wait(); } // Can be called from any thread. void TracingMuxerImpl::TracingSessionImpl::ReadTrace(ReadTraceCallback cb) { auto* muxer = muxer_; auto session_id = session_id_; muxer->task_runner_->PostTask([muxer, session_id, cb] { muxer->ReadTracingSessionData(session_id, std::move(cb)); }); } // Can be called from any thread. void TracingMuxerImpl::TracingSessionImpl::SetOnStartCallback( std::function cb) { auto* muxer = muxer_; auto session_id = session_id_; muxer->task_runner_->PostTask([muxer, session_id, cb] { auto* consumer = muxer->FindConsumer(session_id); if (!consumer) return; consumer->start_complete_callback_ = cb; }); } // Can be called from any thread void TracingMuxerImpl::TracingSessionImpl::SetOnErrorCallback( std::function cb) { auto* muxer = muxer_; auto session_id = session_id_; muxer->task_runner_->PostTask([muxer, session_id, cb] { auto* consumer = muxer->FindConsumer(session_id); if (!consumer) { // Notify the client about concurrent disconnection of the session. if (cb) cb(TracingError{TracingError::kDisconnected, "Peer disconnected"}); return; } consumer->error_callback_ = cb; }); } // Can be called from any thread. void TracingMuxerImpl::TracingSessionImpl::SetOnStopCallback( std::function cb) { auto* muxer = muxer_; auto session_id = session_id_; muxer->task_runner_->PostTask([muxer, session_id, cb] { auto* consumer = muxer->FindConsumer(session_id); if (!consumer) return; consumer->stop_complete_callback_ = cb; }); } // Can be called from any thread. 
void TracingMuxerImpl::TracingSessionImpl::GetTraceStats( GetTraceStatsCallback cb) { auto* muxer = muxer_; auto session_id = session_id_; muxer->task_runner_->PostTask([muxer, session_id, cb] { muxer->GetTraceStats(session_id, std::move(cb)); }); } // Can be called from any thread. void TracingMuxerImpl::TracingSessionImpl::QueryServiceState( QueryServiceStateCallback cb) { auto* muxer = muxer_; auto session_id = session_id_; muxer->task_runner_->PostTask([muxer, session_id, cb] { muxer->QueryServiceState(session_id, std::move(cb)); }); } // ----- End of TracingMuxerImpl::TracingSessionImpl // ----- Begin of TracingMuxerImpl::StartupTracingSessionImpl TracingMuxerImpl::StartupTracingSessionImpl::StartupTracingSessionImpl( TracingMuxerImpl* muxer, TracingSessionGlobalID session_id, BackendType backend_type) : muxer_(muxer), session_id_(session_id), backend_type_(backend_type) {} // Can be destroyed from any thread. TracingMuxerImpl::StartupTracingSessionImpl::~StartupTracingSessionImpl() = default; void TracingMuxerImpl::StartupTracingSessionImpl::Abort() { auto* muxer = muxer_; auto session_id = session_id_; auto backend_type = backend_type_; muxer->task_runner_->PostTask([muxer, session_id, backend_type] { muxer->AbortStartupTracingSession(session_id, backend_type); }); } // Must not be called from the SDK's internal thread. void TracingMuxerImpl::StartupTracingSessionImpl::AbortBlocking() { auto* muxer = muxer_; auto session_id = session_id_; auto backend_type = backend_type_; PERFETTO_CHECK(!muxer->task_runner_->RunsTasksOnCurrentThread()); base::WaitableEvent event; muxer->task_runner_->PostTask([muxer, session_id, backend_type, &event] { muxer->AbortStartupTracingSession(session_id, backend_type); event.Notify(); }); event.Wait(); } // ----- End of TracingMuxerImpl::StartupTracingSessionImpl // static TracingMuxer* TracingMuxer::instance_ = TracingMuxerFake::Get(); // This is called by perfetto::Tracing::Initialize(). // Can be called on any thread. 
// Typically, but not necessarily, that will be
// the embedder's main thread.
//
// Publishes |this| as the TracingMuxer instance, creates the task runner
// (wrapped in a NonReentrantTaskRunner) and posts the rest of the
// initialization (Initialize() + AddBackends()) onto it.
TracingMuxerImpl::TracingMuxerImpl(const TracingInitArgs& args)
    : TracingMuxer(args.platform ? args.platform
                                 : Platform::GetDefaultPlatform()) {
  PERFETTO_DETACH_FROM_THREAD(thread_checker_);
  instance_ = this;

  // Create the thread where muxer, producers and service will live.
  Platform::CreateTaskRunnerArgs tr_args{/*name_for_debugging=*/"TracingMuxer"};
  task_runner_.reset(new NonReentrantTaskRunner(
      this, platform_->CreateTaskRunner(std::move(tr_args))));

  // Run the initializer on that thread.
  // |args| is captured by copy, so the posted task does not depend on the
  // caller's TracingInitArgs staying alive.
  task_runner_->PostTask([this, args] {
    Initialize(args);
    AddBackends(args);
  });
}

// Runs on the muxer task runner (posted by the constructor). Copies the
// policy-related settings out of |args| and installs the fake backend as the
// kUnspecifiedBackend producer and consumer backend.
void TracingMuxerImpl::Initialize(const TracingInitArgs& args) {
  PERFETTO_DCHECK_THREAD(thread_checker_);  // Rebind the thread checker.

  policy_ = args.tracing_policy;
  supports_multiple_data_source_instances_ =
      args.supports_multiple_data_source_instances;

  // Fallback backend for producer creation for an unsupported backend type.
  PERFETTO_CHECK(producer_backends_.empty());
  AddProducerBackend(internal::TracingBackendFake::GetInstance(),
                     BackendType::kUnspecifiedBackend, args);
  // Fallback backend for consumer creation for an unsupported backend type.
  // This backend simply fails any attempt to start a tracing session.
  PERFETTO_CHECK(consumer_backends_.empty());
  AddConsumerBackend(internal::TracingBackendFake::GetInstance(),
                     BackendType::kUnspecifiedBackend);
}

// Registers |backend| as the consumer backend for |type|, keeping
// |consumer_backends_| ordered by type. A null |backend| is ignored (only a
// debug log is emitted).
void TracingMuxerImpl::AddConsumerBackend(TracingConsumerBackend* backend,
                                          BackendType type) {
  if (!backend) {
    // We skip the log in release builds because the *_backend_fake.cc code
    // has already an ELOG before returning a nullptr.
    // NOTE(review): the cast's target type is missing in this chunk (it looks
    // stripped during extraction); presumably <int> given the %d format -
    // confirm against the original file.
    PERFETTO_DLOG("Consumer backend creation failed, type %d",
                  static_cast(type));
    return;
  }
  // Keep the backends sorted by type.
  // NOTE(review): CompareBackendByType() appears to have lost a template
  // argument here as well - confirm against the original file.
  auto it =
      std::upper_bound(consumer_backends_.begin(), consumer_backends_.end(),
                       type, CompareBackendByType());
  // emplace() default-constructs the new entry at the insertion point; its
  // fields are filled in below.
  it = consumer_backends_.emplace(it);

  RegisteredConsumerBackend& rb = *it;
  rb.backend = backend;
  rb.type = type;
}

// Registers |backend| as the producer backend for |type|, creates its
// ProducerImpl and starts connecting it. A null |backend| is ignored (only a
// debug log is emitted).
void TracingMuxerImpl::AddProducerBackend(TracingProducerBackend* backend,
                                          BackendType type,
                                          const TracingInitArgs& args) {
  if (!backend) {
    // We skip the log in release builds because the *_backend_fake.cc code
    // has already an ELOG before returning a nullptr.
    // NOTE(review): cast target type missing in this chunk; presumably <int>
    // given the %d format - confirm.
    PERFETTO_DLOG("Producer backend creation failed, type %d",
                  static_cast(type));
    return;
  }
  // The id is the number of backends registered so far. It is taken before
  // the sorted insertion below, so ids follow registration order, not the
  // position in |producer_backends_|.
  TracingBackendId backend_id = producer_backends_.size();
  // Keep the backends sorted by type.
  // NOTE(review): CompareBackendByType() appears to have lost a template
  // argument - confirm against the original file.
  auto it =
      std::upper_bound(producer_backends_.begin(), producer_backends_.end(),
                       type, CompareBackendByType());
  it = producer_backends_.emplace(it);

  RegisteredProducerBackend& rb = *it;
  rb.backend = backend;
  rb.id = backend_id;
  rb.type = type;
  rb.producer.reset(new ProducerImpl(this, backend_id,
                                     args.shmem_batch_commits_duration_ms,
                                     args.shmem_direct_patching_enabled));
  rb.producer_conn_args.producer = rb.producer.get();
  rb.producer_conn_args.producer_name = platform_->GetCurrentProcessName();
  rb.producer_conn_args.task_runner = task_runner_.get();
  // The init args carry the hints in KB; the connection args take bytes.
  rb.producer_conn_args.shmem_size_hint_bytes = args.shmem_size_hint_kb * 1024;
  rb.producer_conn_args.shmem_page_size_hint_bytes =
      args.shmem_page_size_hint_kb * 1024;
  rb.producer_conn_args.create_socket_async = args.create_socket_async;
  rb.producer->Initialize(rb.backend->ConnectProducer(rb.producer_conn_args));
}

// Linear search of |producer_backends_| by backend id. Returns nullptr when
// no backend has that id.
TracingMuxerImpl::RegisteredProducerBackend*
TracingMuxerImpl::FindProducerBackendById(TracingBackendId id) {
  for (RegisteredProducerBackend& b : producer_backends_) {
    if (b.id == id) {
      return &b;
    }
  }
  return nullptr;
}

// Linear search of |producer_backends_| by backend type. Returns the first
// match, or nullptr when there is none.
TracingMuxerImpl::RegisteredProducerBackend*
TracingMuxerImpl::FindProducerBackendByType(BackendType type) {
  for (RegisteredProducerBackend& b : producer_backends_) {
    if (b.type == type) {
      return &b;
    }
  }
  return nullptr;
}
// Linear search of |consumer_backends_| by backend type. Returns the first
// match, or nullptr when there is none.
TracingMuxerImpl::RegisteredConsumerBackend*
TracingMuxerImpl::FindConsumerBackendByType(BackendType type) {
  for (RegisteredConsumerBackend& b : consumer_backends_) {
    if (b.type == type) {
      return &b;
    }
  }
  return nullptr;
}

// Adds the producer/consumer backends selected by the |args.backends| bitmask
// (system, in-process, custom). A backend type that is already registered is
// left untouched. Any bit outside the three known types is fatal.
void TracingMuxerImpl::AddBackends(const TracingInitArgs& args) {
  if (args.backends & kSystemBackend) {
    PERFETTO_CHECK(args.system_producer_backend_factory_);
    if (FindProducerBackendByType(kSystemBackend) == nullptr) {
      AddProducerBackend(args.system_producer_backend_factory_(),
                         kSystemBackend, args);
    }
    // The system consumer is opt-in via |enable_system_consumer|.
    if (args.enable_system_consumer) {
      PERFETTO_CHECK(args.system_consumer_backend_factory_);
      if (FindConsumerBackendByType(kSystemBackend) == nullptr) {
        AddConsumerBackend(args.system_consumer_backend_factory_(),
                           kSystemBackend);
      }
    }
  }

  if (args.backends & kInProcessBackend) {
    // |b| is created lazily, at most once, and shared between the producer
    // and the consumer registration below.
    TracingBackend* b = nullptr;
    if (FindProducerBackendByType(kInProcessBackend) == nullptr) {
      // |b| is always null at this point; the check mirrors the consumer
      // branch below.
      if (!b) {
        PERFETTO_CHECK(args.in_process_backend_factory_);
        b = args.in_process_backend_factory_();
      }
      AddProducerBackend(b, kInProcessBackend, args);
    }
    if (FindConsumerBackendByType(kInProcessBackend) == nullptr) {
      if (!b) {
        PERFETTO_CHECK(args.in_process_backend_factory_);
        b = args.in_process_backend_factory_();
      }
      AddConsumerBackend(b, kInProcessBackend);
    }
  }

  if (args.backends & kCustomBackend) {
    PERFETTO_CHECK(args.custom_backend);
    if (FindProducerBackendByType(kCustomBackend) == nullptr) {
      AddProducerBackend(args.custom_backend, kCustomBackend, args);
    }
    if (FindConsumerBackendByType(kCustomBackend) == nullptr) {
      AddConsumerBackend(args.custom_backend, kCustomBackend);
    }
  }

  if (args.backends & ~(kSystemBackend | kInProcessBackend | kCustomBackend)) {
    PERFETTO_FATAL("Unsupported tracing backend type");
  }
}

// Can be called from any thread (but not concurrently).
//
// Assigns an index and an id to |static_state| synchronously, then posts a
// task that records the data source in |data_sources_| and announces it to the
// backends. Returns true on success or if |static_state| was already
// registered, false if the kMaxDataSources limit has been reached.
bool TracingMuxerImpl::RegisterDataSource(
    const DataSourceDescriptor& descriptor,
    DataSourceFactory factory,
    DataSourceParams params,
    bool no_flush,
    DataSourceStaticState* static_state) {
  // Ignore repeated registrations.
  if (static_state->index != kMaxDataSources)
    return true;

  // Note: the counter is post-incremented before the bounds check, so it also
  // advances on a failed registration.
  uint32_t new_index = next_data_source_index_++;
  if (new_index >= kMaxDataSources) {
    PERFETTO_DLOG(
        "RegisterDataSource failed: too many data sources already registered");
    return false;
  }

  // Initialize the static state.
  static_assert(sizeof(static_state->instances[0]) >= sizeof(DataSourceState),
                "instances[] size mismatch");
  // Placement-new a value-initialized DataSourceState into every slot.
  for (size_t i = 0; i < static_state->instances.size(); i++)
    new (&static_state->instances[i]) DataSourceState{};

  static_state->index = new_index;

  // Generate a semi-unique id for this data source.
  base::Hasher hash;
  // NOTE(review): the cast's target type is missing in this chunk (it looks
  // stripped during extraction); presumably an integer type wide enough for a
  // pointer - confirm against the original file.
  hash.Update(reinterpret_cast(static_state));
  hash.Update(base::GetWallTimeNs().count());
  // A zero digest is replaced by 1, so the id is never 0.
  static_state->id = hash.digest() ? hash.digest() : 1;

  task_runner_->PostTask([this, descriptor, factory, static_state, params,
                          no_flush] {
    data_sources_.emplace_back();
    RegisteredDataSource& rds = data_sources_.back();
    rds.descriptor = descriptor;
    rds.factory = factory;
    // Multiple instances require both the muxer-wide setting and the
    // per-data-source param to be enabled.
    rds.supports_multiple_instances =
        supports_multiple_data_source_instances_ &&
        params.supports_multiple_instances;
    rds.requires_callbacks_under_lock = params.requires_callbacks_under_lock;
    rds.static_state = static_state;
    rds.no_flush = no_flush;

    UpdateDataSourceOnAllBackends(rds, /*is_changed=*/false);
  });
  return true;
}

// Can be called from any thread (but not concurrently).
//
// Posts a task that replaces the stored descriptor of the data source
// identified by |static_state| and pushes the update to the backends. The
// descriptor name must not change (checked with PERFETTO_CHECK). Does nothing
// if no registered data source matches |static_state|.
void TracingMuxerImpl::UpdateDataSourceDescriptor(
    const DataSourceDescriptor& descriptor,
    const DataSourceStaticState* static_state) {
  task_runner_->PostTask([this, descriptor, static_state] {
    for (auto& rds : data_sources_) {
      if (rds.static_state == static_state) {
        PERFETTO_CHECK(rds.descriptor.name() == descriptor.name());
        rds.descriptor = descriptor;
        // The assignment above overwrote the whole descriptor, so the id is
        // set again from the static state.
        rds.descriptor.set_id(static_state->id);
        UpdateDataSourceOnAllBackends(rds, /*is_changed=*/true);
        return;
      }
    }
  });
}

// Can be called from any thread (but not concurrently).
// Posts a task that adds the interceptor to |interceptors_|. A repeated
// registration (same descriptor name) is ignored, and names outside the
// hard-coded allow-list below are rejected with an error log.
void TracingMuxerImpl::RegisterInterceptor(
    const InterceptorDescriptor& descriptor,
    InterceptorFactory factory,
    InterceptorBase::TLSFactory tls_factory,
    InterceptorBase::TracePacketCallback packet_callback) {
  task_runner_->PostTask([this, descriptor, factory, tls_factory,
                          packet_callback] {
    // Ignore repeated registrations.
    for (const auto& interceptor : interceptors_) {
      if (interceptor.descriptor.name() == descriptor.name()) {
        // Debug-only sanity check: a re-registration under the same name is
        // expected to carry the same callbacks.
        PERFETTO_DCHECK(interceptor.tls_factory == tls_factory);
        PERFETTO_DCHECK(interceptor.packet_callback == packet_callback);
        return;
      }
    }
    // Only allow certain interceptors for now.
    if (descriptor.name() != "test_interceptor" &&
        descriptor.name() != "console" && descriptor.name() != "etwexport") {
      PERFETTO_ELOG(
          "Interceptors are experimental. If you want to use them, please "
          "get in touch with the project maintainers "
          "(https://perfetto.dev/docs/contributing/"
          "getting-started#community).");
      return;
    }
    interceptors_.emplace_back();
    RegisteredInterceptor& interceptor = interceptors_.back();
    interceptor.descriptor = descriptor;
    interceptor.factory = factory;
    interceptor.tls_factory = tls_factory;
    interceptor.packet_callback = packet_callback;
  });
}

// Posts a task that activates |triggers| on every connected producer backend.
// For backends that are not connected, each trigger is queued in
// |on_connect_triggers_| together with an expiry time of now + |ttl_ms|
// (wall time, computed on the calling thread).
// NOTE(review): the element type of the |triggers| vector is missing in this
// chunk (it looks stripped during extraction); the loop below iterates it as
// std::string - confirm against the original file.
void TracingMuxerImpl::ActivateTriggers(
    const std::vector& triggers,
    uint32_t ttl_ms) {
  base::TimeMillis expire_time =
      base::GetWallTimeMs() + base::TimeMillis(ttl_ms);
  task_runner_->PostTask([this, triggers, expire_time] {
    for (RegisteredProducerBackend& backend : producer_backends_) {
      if (backend.producer->connected_) {
        backend.producer->service_->ActivateTriggers(triggers);
      } else {
        for (const std::string& trigger : triggers) {
          backend.producer->on_connect_triggers_.emplace_back(trigger,
                                                              expire_time);
        }
      }
    }
  });
}

// Checks if there is any matching startup tracing data source instance for a
// new SetupDataSource call. If so, moves the data source to this tracing
// session (and its target buffer) and returns true, otherwise returns false.
// NOTE(review): the element type of the |data_sources| vector is missing in
// this chunk (it looks stripped during extraction) - confirm against the
// original file.
static bool MaybeAdoptStartupTracingInDataSource(
    TracingBackendId backend_id,
    uint32_t backend_connection_id,
    DataSourceInstanceID instance_id,
    const DataSourceConfig& cfg,
    const std::vector& data_sources) {
  for (const auto& rds : data_sources) {
    DataSourceStaticState* static_state = rds.static_state;
    for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
      auto* internal_state = static_state->TryGet(i);

      // A candidate is an active instance that still holds a startup buffer
      // reservation, has no service-assigned instance id yet (== 0), belongs
      // to the same backend connection and declares itself compatible with
      // |cfg|.
      if (internal_state &&
          internal_state->startup_target_buffer_reservation.load(
              std::memory_order_relaxed) &&
          internal_state->data_source_instance_id == 0 &&
          internal_state->backend_id == backend_id &&
          internal_state->backend_connection_id == backend_connection_id &&
          internal_state->config &&
          internal_state->data_source->CanAdoptStartupSession(
              *internal_state->config, cfg)) {
        PERFETTO_DLOG("Setting up data source %" PRIu64
                      " %s by adopting it from a startup tracing session",
                      instance_id, cfg.name().c_str());

        std::lock_guard lock(internal_state->lock);
        // Set the associations. The actual takeover happens in
        // StartDataSource().
        internal_state->data_source_instance_id = instance_id;
        // NOTE(review): the cast's target type is missing in this chunk
        // (stripped template argument) - confirm against the original file.
        internal_state->buffer_id =
            static_cast(cfg.target_buffer());
        internal_state->config.reset(new DataSourceConfig(cfg));

        // TODO(eseckler): Should the data source config provided by the service
        // be allowed to specify additional interceptors / additional data
        // source params?

        return true;
      }
    }
  }
  return false;
}

// Called by the service of one of the backends.
//
// Either adopts a matching startup-tracing instance, or sets up a new instance
// of the first registered data source named |cfg.name()| that is not already
// active with this exact config.
void TracingMuxerImpl::SetupDataSource(TracingBackendId backend_id,
                                       uint32_t backend_connection_id,
                                       DataSourceInstanceID instance_id,
                                       const DataSourceConfig& cfg) {
  PERFETTO_DLOG("Setting up data source %" PRIu64 " %s", instance_id,
                cfg.name().c_str());
  PERFETTO_DCHECK_THREAD(thread_checker_);

  // First check if there is any matching startup tracing data source instance.
  if (MaybeAdoptStartupTracingInDataSource(backend_id, backend_connection_id,
                                           instance_id, cfg, data_sources_)) {
    return;
  }

  for (const auto& rds : data_sources_) {
    if (rds.descriptor.name() != cfg.name())
      continue;
    DataSourceStaticState& static_state = *rds.static_state;

    // If this data source is already active for this exact config, don't start
    // another instance. This happens when we have several data sources with the
    // same name, in which case the service sends one SetupDataSource event for
    // each one. Since we can't map which event maps to which data source, we
    // ensure each event only starts one data source instance.
    // TODO(skyostil): Register a unique id with each data source to the service
    // to disambiguate.
    bool active_for_config = false;
    for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
      if (!static_state.TryGet(i))
        continue;
      // NOTE(review): the cast's target type is missing in this chunk
      // (stripped template argument); presumably a DataSourceState pointer -
      // confirm against the original file.
      auto* internal_state =
          reinterpret_cast(&static_state.instances[i]);
      if (internal_state->backend_id == backend_id &&
          internal_state->backend_connection_id == backend_connection_id &&
          internal_state->config && *internal_state->config == cfg) {
        active_for_config = true;
        break;
      }
    }
    if (active_for_config) {
      PERFETTO_DLOG(
          "Data source %s is already active with this config, skipping",
          cfg.name().c_str());
      continue;
    }

    SetupDataSourceImpl(rds, backend_id, backend_connection_id, instance_id,
                        cfg, /*startup_session_id=*/0);
    return;
  }
}

// Claims the first free instance slot of |rds|, fills in its DataSourceState,
// publishes it in |valid_instances| and calls DataSource::OnSetup(). Returns a
// default-constructed FindDataSourceRes on failure (single-instance data
// source already active, startup reservations exhausted, or no free slot).
// A non-zero |startup_session_id| makes the instance take a startup target
// buffer reservation.
TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::SetupDataSourceImpl(
    const RegisteredDataSource& rds,
    TracingBackendId backend_id,
    uint32_t backend_connection_id,
    DataSourceInstanceID instance_id,
    const DataSourceConfig& cfg,
    TracingSessionGlobalID startup_session_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  DataSourceStaticState& static_state = *rds.static_state;

  // If any bit is set in `static_state.valid_instances` then at least one
  // other instance of data source is running.
  if (!rds.supports_multiple_instances &&
      static_state.valid_instances.load(std::memory_order_acquire) != 0) {
    PERFETTO_ELOG(
        "Failed to setup data source because some another instance of this "
        "data source is already active");
    return FindDataSourceRes();
  }

  for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
    // Find a free slot.
    if (static_state.TryGet(i))
      continue;

    // NOTE(review): the cast's target type is missing in this chunk (stripped
    // template argument); presumably a DataSourceState pointer - confirm.
    auto* internal_state =
        reinterpret_cast(&static_state.instances[i]);
    std::unique_lock lock(internal_state->lock);
    // NOTE(review): the std::is_same expression below is garbled in this
    // chunk (text between '<' and '>' looks stripped); restore it from the
    // original file.
    static_assert(
        std::is_samedata_source_instance_id),
                     DataSourceInstanceID>::value,
        "data_source_instance_id type mismatch");
    internal_state->muxer_id_for_testing = muxer_id_for_testing_;
    RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);

    if (startup_session_id) {
      uint16_t& last_reservation =
          backend.producer->last_startup_target_buffer_reservation_;
      // NOTE(review): numeric_limits has lost its template argument in this
      // chunk; presumably uint16_t, matching |last_reservation| - confirm.
      if (last_reservation == std::numeric_limits::max()) {
        PERFETTO_ELOG(
            "Startup buffer reservations exhausted, dropping data source");
        return FindDataSourceRes();
      }
      // Pre-increment: reservation ids handed out are always non-zero; 0 is
      // stored in the else-branch to mean "no reservation".
      internal_state->startup_target_buffer_reservation.store(
          ++last_reservation, std::memory_order_relaxed);
    } else {
      internal_state->startup_target_buffer_reservation.store(
          0, std::memory_order_relaxed);
    }

    internal_state->backend_id = backend_id;
    internal_state->backend_connection_id = backend_connection_id;
    internal_state->data_source_instance_id = instance_id;
    // NOTE(review): cast target type missing in this chunk - confirm.
    internal_state->buffer_id =
        static_cast(cfg.target_buffer());
    internal_state->config.reset(new DataSourceConfig(cfg));
    internal_state->startup_session_id = startup_session_id;
    internal_state->data_source = rds.factory();
    internal_state->interceptor = nullptr;
    internal_state->interceptor_id = 0;
    internal_state->will_notify_on_stop = rds.descriptor.will_notify_on_stop();

    if (cfg.has_interceptor_config()) {
      for (size_t j = 0; j < interceptors_.size(); j++) {
        if (cfg.interceptor_config().name() ==
            interceptors_[j].descriptor.name()) {
          PERFETTO_DLOG("Intercepting data source %" PRIu64
                        " \"%s\" into \"%s\"",
                        instance_id, cfg.name().c_str(),
                        cfg.interceptor_config().name().c_str());
          // The id is the index + 1, so that 0 keeps meaning "no
          // interceptor" (see the check right after the loop).
          // NOTE(review): cast target type missing in this chunk - confirm.
          internal_state->interceptor_id = static_cast(j + 1);
          internal_state->interceptor = interceptors_[j].factory();
          internal_state->interceptor->OnSetup({cfg});
          break;
        }
      }
      if (!internal_state->interceptor_id) {
        PERFETTO_ELOG("Unknown interceptor configured for data source: %s",
                      cfg.interceptor_config().name().c_str());
      }
    }

    // This must be made at the end. See matching acquire-load in
    // DataSource::Trace().
    static_state.valid_instances.fetch_or(1 << i, std::memory_order_release);

    DataSourceBase::SetupArgs setup_args;
    setup_args.config = &cfg;
    setup_args.backend_type = backend.type;
    setup_args.internal_instance_index = i;

    // The instance lock stays held across OnSetup() only when the data source
    // asked for callbacks under lock.
    if (!rds.requires_callbacks_under_lock)
      lock.unlock();
    internal_state->data_source->OnSetup(setup_args);

    return FindDataSourceRes(&static_state, internal_state, i,
                             rds.requires_callbacks_under_lock);
  }
  PERFETTO_ELOG(
      "Maximum number of data source instances exhausted. "
      "Dropping data source %" PRIu64,
      instance_id);
  return FindDataSourceRes();
}

// Called by the service of one of the backends.
//
// For an instance that still holds a startup buffer reservation this binds the
// reservation to the real target buffer and updates the startup session
// bookkeeping; otherwise it starts the data source via StartDataSourceImpl().
void TracingMuxerImpl::StartDataSource(TracingBackendId backend_id,
                                       DataSourceInstanceID instance_id) {
  PERFETTO_DLOG("Starting data source %" PRIu64, instance_id);
  PERFETTO_DCHECK_THREAD(thread_checker_);

  auto ds = FindDataSource(backend_id, instance_id);
  if (!ds) {
    PERFETTO_ELOG("Could not find data source to start");
    return;
  }

  // Check if the data source was already started for startup tracing.
  uint16_t startup_reservation =
      ds.internal_state->startup_target_buffer_reservation.load(
          std::memory_order_relaxed);
  if (startup_reservation) {
    RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
    TracingSessionGlobalID session_id = ds.internal_state->startup_session_id;
    auto session_it = std::find_if(
        backend.startup_sessions.begin(), backend.startup_sessions.end(),
        [session_id](const RegisteredStartupSession& session) {
          return session.session_id == session_id;
        });
    PERFETTO_DCHECK(session_it != backend.startup_sessions.end());

    if (session_it->is_aborting) {
      PERFETTO_DLOG("Data source %" PRIu64
                    " was already aborted for startup tracing, not starting it",
                    instance_id);
      return;
    }

    PERFETTO_DLOG(
        "Data source %" PRIu64
        " was already started for startup tracing, binding its target buffer",
        instance_id);

    backend.producer->service_->MaybeSharedMemoryArbiter()
        ->BindStartupTargetBuffer(startup_reservation,
                                  ds.internal_state->buffer_id);

    // The reservation ID can be used even after binding it, so there's no need
    // for any barriers here - we just need atomicity.
    ds.internal_state->startup_target_buffer_reservation.store(
        0, std::memory_order_relaxed);

    // TODO(eseckler): Should we reset incremental state at this point, or
    // notify the data source some other way?

    // The session should not have been fully bound yet (or aborted).
    PERFETTO_DCHECK(session_it->num_unbound_data_sources > 0);

    // Once the last unbound data source of the session is bound, notify the
    // adoption callback (if any) and drop the session entry.
    session_it->num_unbound_data_sources--;
    if (session_it->num_unbound_data_sources == 0) {
      if (session_it->on_adopted)
        task_runner_->PostTask(session_it->on_adopted);
      backend.startup_sessions.erase(session_it);
    }
    return;
  }

  StartDataSourceImpl(ds);
}

// Notifies the interceptor (if any), enables the trace lambda and calls
// DataSource::OnStart() for the instance described by |ds|.
void TracingMuxerImpl::StartDataSourceImpl(const FindDataSourceRes& ds) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  DataSourceBase::StartArgs start_args{};
  start_args.internal_instance_index = ds.instance_idx;

  std::unique_lock lock(ds.internal_state->lock);
  if (ds.internal_state->interceptor)
    ds.internal_state->interceptor->OnStart({});
  ds.internal_state->trace_lambda_enabled.store(true,
                                                std::memory_order_relaxed);
  PERFETTO_DCHECK(ds.internal_state->data_source != nullptr);

  // The instance lock stays held across OnStart() only when the data source
  // asked for callbacks under lock.
  if (!ds.requires_callbacks_under_lock)
    lock.unlock();
  ds.internal_state->data_source->OnStart(start_args);
}

// Called by the service of one of the backends.
void TracingMuxerImpl::StopDataSource_AsyncBegin(
    TracingBackendId backend_id,
    DataSourceInstanceID instance_id) {
  PERFETTO_DLOG("Stopping data source %" PRIu64, instance_id);
  PERFETTO_DCHECK_THREAD(thread_checker_);

  auto ds = FindDataSource(backend_id, instance_id);
  if (!ds) {
    PERFETTO_ELOG("Could not find data source to stop");
    return;
  }

  StopDataSource_AsyncBeginImpl(ds);
}

// First half of a stop: calls the interceptor's and the data source's OnStop()
// and arranges for StopDataSource_AsyncEnd() to run on the muxer task runner,
// either right away or when the embedder invokes the closure it took out of
// the StopArgs.
void TracingMuxerImpl::StopDataSource_AsyncBeginImpl(
    const FindDataSourceRes& ds) {
  // Snapshot the identifying fields now; StopDataSource_AsyncEnd() compares
  // them against the slot to detect that it changed in the meantime.
  TracingBackendId backend_id = ds.internal_state->backend_id;
  uint32_t backend_connection_id = ds.internal_state->backend_connection_id;
  DataSourceInstanceID instance_id = ds.internal_state->data_source_instance_id;

  StopArgsImpl stop_args{};
  stop_args.internal_instance_index = ds.instance_idx;
  stop_args.async_stop_closure = [this, backend_id, backend_connection_id,
                                  instance_id, ds] {
    // TracingMuxerImpl is long lived, capturing |this| is okay.
    // The notification closure can be moved out of the StopArgs by the
    // embedder to handle stop asynchronously. The embedder might then
    // call the closure on a different thread than the current one, hence
    // this nested PostTask().
    task_runner_->PostTask(
        [this, backend_id, backend_connection_id, instance_id, ds] {
          StopDataSource_AsyncEnd(backend_id, backend_connection_id,
                                  instance_id, ds);
        });
  };

  {
    std::unique_lock lock(ds.internal_state->lock);

    // Don't call OnStop again if the datasource is already stopping.
    if (ds.internal_state->async_stop_in_progress)
      return;
    ds.internal_state->async_stop_in_progress = true;

    if (ds.internal_state->interceptor)
      ds.internal_state->interceptor->OnStop({});

    if (!ds.requires_callbacks_under_lock)
      lock.unlock();
    ds.internal_state->data_source->OnStop(stop_args);
  }

  // If the embedder hasn't called StopArgs.HandleStopAsynchronously() run the
  // async closure here. In theory we could avoid the PostTask and call
  // straight into CompleteDataSourceAsyncStop(). We keep that to reduce
  // divergencies between the deferred-stop vs non-deferred-stop code paths.
  // NOTE(review): no CompleteDataSourceAsyncStop() is visible in this chunk;
  // the posted task calls StopDataSource_AsyncEnd() - the name above may be
  // stale.
  if (stop_args.async_stop_closure)
    std::move(stop_args.async_stop_closure)();
}

// Second half of a stop: invalidates the instance slot, destroys the data
// source / interceptor / config, settles any leftover startup reservation and
// notifies the service if the producer is still on the same connection.
void TracingMuxerImpl::StopDataSource_AsyncEnd(TracingBackendId backend_id,
                                               uint32_t backend_connection_id,
                                               DataSourceInstanceID instance_id,
                                               const FindDataSourceRes& ds) {
  PERFETTO_DLOG("Ending async stop of data source %" PRIu64, instance_id);
  PERFETTO_DCHECK_THREAD(thread_checker_);

  // Check that the data source instance is still active and was not modified
  // while it was being stopped.
  if (!ds.static_state->TryGet(ds.instance_idx) ||
      ds.internal_state->backend_id != backend_id ||
      ds.internal_state->backend_connection_id != backend_connection_id ||
      ds.internal_state->data_source_instance_id != instance_id) {
    PERFETTO_ELOG(
        "Async stop of data source %" PRIu64
        " failed. This might be due to calling the async_stop_closure twice.",
        instance_id);
    return;
  }

  // Clear this instance's bit in |valid_instances|.
  const uint32_t mask = ~(1 << ds.instance_idx);
  ds.static_state->valid_instances.fetch_and(mask, std::memory_order_acq_rel);

  bool will_notify_on_stop;
  // Take the mutex to prevent that the data source is in the middle of
  // a Trace() execution where it called GetDataSourceLocked() while we
  // destroy it.
  uint16_t startup_buffer_reservation;
  TracingSessionGlobalID startup_session_id;
  {
    std::lock_guard guard(ds.internal_state->lock);

    ds.internal_state->trace_lambda_enabled.store(false,
                                                  std::memory_order_relaxed);
    ds.internal_state->data_source.reset();
    ds.internal_state->interceptor.reset();
    ds.internal_state->config.reset();
    ds.internal_state->async_stop_in_progress = false;
    will_notify_on_stop = ds.internal_state->will_notify_on_stop;
    startup_buffer_reservation =
        ds.internal_state->startup_target_buffer_reservation.load(
            std::memory_order_relaxed);
    startup_session_id = ds.internal_state->startup_session_id;
  }

  // The other fields of internal_state are deliberately *not* cleared.
  // See races-related comments of DataSource::Trace().

  TracingMuxer::generation_++;

  // |producer_backends_| is append-only, Backend instances are always valid.
  PERFETTO_CHECK(backend_id < producer_backends_.size());
  RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);

  ProducerImpl* producer = backend.producer.get();
  if (!producer)
    return;

  // If the data source instance still has a startup buffer reservation, it was
  // only active for startup tracing and never started by the service. Discard
  // the startup buffer reservation.
  if (startup_buffer_reservation) {
    PERFETTO_DCHECK(startup_session_id);

    if (producer->service_ && producer->service_->MaybeSharedMemoryArbiter()) {
      producer->service_->MaybeSharedMemoryArbiter()
          ->AbortStartupTracingForReservation(startup_buffer_reservation);
    }

    auto session_it = std::find_if(
        backend.startup_sessions.begin(), backend.startup_sessions.end(),
        [startup_session_id](const RegisteredStartupSession& session) {
          return session.session_id == startup_session_id;
        });

    // Session should not be removed until abortion of all data source instances
    // is complete.
    PERFETTO_DCHECK(session_it != backend.startup_sessions.end());

    // Once the last aborting data source of the session is done, notify the
    // abort callback (if any) and drop the session entry.
    session_it->num_aborting_data_sources--;
    if (session_it->num_aborting_data_sources == 0) {
      if (session_it->on_aborted)
        task_runner_->PostTask(session_it->on_aborted);

      backend.startup_sessions.erase(session_it);
    }
  }

  // Only talk to the service if the producer is connected and still on the
  // connection this instance was set up on.
  if (producer->connected_ &&
      backend.producer->connection_id_.load(std::memory_order_relaxed) ==
          backend_connection_id) {
    // Flush any commits that might have been batched by SharedMemoryArbiter.
    producer->service_->MaybeSharedMemoryArbiter()
        ->FlushPendingCommitDataRequests();
    if (instance_id && will_notify_on_stop)
      producer->service_->NotifyDataSourceStopped(instance_id);
  }
  producer->SweepDeadServices();
}

// Calls DataSource::WillClearIncrementalState() for the instance and bumps its
// |incremental_state_generation| counter.
void TracingMuxerImpl::ClearDataSourceIncrementalState(
    TracingBackendId backend_id,
    DataSourceInstanceID instance_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Clearing incremental state for data source %" PRIu64,
                instance_id);
  auto ds = FindDataSource(backend_id, instance_id);
  if (!ds) {
    PERFETTO_ELOG("Could not find data source to clear incremental state for");
    return;
  }

  DataSourceBase::ClearIncrementalStateArgs clear_incremental_state_args;
  clear_incremental_state_args.internal_instance_index = ds.instance_idx;
  {
    // The instance lock is taken only when the data source asked for
    // callbacks under lock; otherwise |lock| stays empty.
    // NOTE(review): std::unique_lock has no template argument in this chunk
    // (it looks stripped during extraction) - confirm against the original.
    std::unique_lock lock;
    if (ds.requires_callbacks_under_lock)
      lock = std::unique_lock(ds.internal_state->lock);
    ds.internal_state->data_source->WillClearIncrementalState(
        clear_incremental_state_args);
  }

  // Make DataSource::TraceContext::GetIncrementalState() eventually notice that
  // the incremental state should be cleared.
  ds.static_state->GetUnsafe(ds.instance_idx)
      ->incremental_state_generation.fetch_add(1, std::memory_order_relaxed);
}

// First half of a flush: calls DataSource::OnFlush() with a closure that, when
// invoked, posts FlushDataSource_AsyncEnd() onto the muxer task runner.
// Returns true if the data source was not found or if the closure is still in
// |flush_args| after OnFlush(); returns false if it was moved out, i.e. the
// flush is being handled asynchronously.
bool TracingMuxerImpl::FlushDataSource_AsyncBegin(
    TracingBackendId backend_id,
    DataSourceInstanceID instance_id,
    FlushRequestID flush_id,
    FlushFlags flush_flags) {
  PERFETTO_DLOG("Flushing data source %" PRIu64, instance_id);
  auto ds = FindDataSource(backend_id, instance_id);
  if (!ds) {
    PERFETTO_ELOG("Could not find data source to flush");
    return true;
  }

  uint32_t backend_connection_id = ds.internal_state->backend_connection_id;

  FlushArgsImpl flush_args;
  flush_args.flush_flags = flush_flags;
  flush_args.internal_instance_index = ds.instance_idx;
  flush_args.async_flush_closure = [this, backend_id, backend_connection_id,
                                    instance_id, ds, flush_id] {
    // TracingMuxerImpl is long lived, capturing |this| is okay.
    // The notification closure can be moved out of the FlushArgs by the
    // embedder to handle the flush asynchronously. The embedder might then
    // call the closure on a different thread than the current one, hence
    // this nested PostTask().
    task_runner_->PostTask(
        [this, backend_id, backend_connection_id, instance_id, ds, flush_id] {
          FlushDataSource_AsyncEnd(backend_id, backend_connection_id,
                                   instance_id, ds, flush_id);
        });
  };
  {
    // The instance lock is taken only when the data source asked for
    // callbacks under lock.
    // NOTE(review): std::unique_lock has no template argument in this chunk
    // (it looks stripped during extraction) - confirm against the original.
    std::unique_lock lock;
    if (ds.requires_callbacks_under_lock)
      lock = std::unique_lock(ds.internal_state->lock);
    ds.internal_state->data_source->OnFlush(flush_args);
  }

  // |async_flush_closure| is moved out of |flush_args| if the producer
  // requested to handle the flush asynchronously.
  // NOTE(review): cast target type missing in this chunk; presumably <bool>
  // given the variable it initializes - confirm.
  bool handled = static_cast(flush_args.async_flush_closure);
  return handled;
}

// Second half of an asynchronous flush: if the instance is unchanged and the
// producer is still on the same connection, reports the flush as done.
void TracingMuxerImpl::FlushDataSource_AsyncEnd(
    TracingBackendId backend_id,
    uint32_t backend_connection_id,
    DataSourceInstanceID instance_id,
    const FindDataSourceRes& ds,
    FlushRequestID flush_id) {
  PERFETTO_DLOG("Ending async flush of data source %" PRIu64, instance_id);
  PERFETTO_DCHECK_THREAD(thread_checker_);

  // Check that the data source instance is still active and was not modified
  // while it was being flushed.
  if (!ds.static_state->TryGet(ds.instance_idx) ||
      ds.internal_state->backend_id != backend_id ||
      ds.internal_state->backend_connection_id != backend_connection_id ||
      ds.internal_state->data_source_instance_id != instance_id) {
    PERFETTO_ELOG("Async flush of data source %" PRIu64
                  " failed. This might be due to the data source being stopped "
                  "in the meantime",
                  instance_id);
    return;
  }

  // |producer_backends_| is append-only, Backend instances are always valid.
  PERFETTO_CHECK(backend_id < producer_backends_.size());
  RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);

  ProducerImpl* producer = backend.producer.get();
  if (!producer)
    return;

  // If the tracing service disconnects and reconnects while a data source is
  // handling a flush request, there's no point in sending the flush reply to
  // the newly reconnected producer.
  if (producer->connected_ &&
      backend.producer->connection_id_.load(std::memory_order_relaxed) ==
          backend_connection_id) {
    producer->NotifyFlushForDataSourceDone(instance_id, flush_id);
  }
}

// Blocks the calling thread until two full rounds of Sync() round-trips have
// completed on every producer backend, then debug-checks that all producers
// are connected.
void TracingMuxerImpl::SyncProducersForTesting() {
  std::mutex mutex;
  std::condition_variable cv;

  // IPC-based producers don't report connection errors explicitly for each
  // command, but instead with an asynchronous callback
  // (ProducerImpl::OnDisconnected). This means that the sync command below
  // may have completed but failed to reach the service because of a
  // disconnection, but we can't tell until the disconnection message comes
  // through. To guard against this, we run two whole rounds of sync round-trips
  // before returning; the first one will detect any disconnected producers and
  // the second one will ensure any reconnections have completed and all data
  // sources are registered in the service again.
  for (size_t i = 0; i < 2; i++) {
    // Starts at the maximum value so the wait below cannot be satisfied
    // before the posted task has stored the real backend count.
    // NOTE(review): numeric_limits has lost its template argument in this
    // chunk; presumably size_t, matching |countdown| - confirm.
    size_t countdown = std::numeric_limits::max();
    task_runner_->PostTask([this, &mutex, &cv, &countdown] {
      {
        std::unique_lock countdown_lock(mutex);
        countdown = producer_backends_.size();
      }
      for (auto& backend : producer_backends_) {
        auto* producer = backend.producer.get();
        producer->service_->Sync([&mutex, &cv, &countdown] {
          std::unique_lock countdown_lock(mutex);
          countdown--;
          cv.notify_one();
        });
      }
    });

    {
      std::unique_lock countdown_lock(mutex);
      cv.wait(countdown_lock, [&countdown] { return !countdown; });
    }
  }

  // Check that all producers are indeed connected.
  bool done = false;
  bool all_producers_connected = true;
  task_runner_->PostTask([this, &mutex, &cv, &done, &all_producers_connected] {
    for (auto& backend : producer_backends_)
      all_producers_connected &= backend.producer->connected_;
    std::unique_lock lock(mutex);
    done = true;
    cv.notify_one();
  });

  {
    std::unique_lock lock(mutex);
    cv.wait(lock, [&done] { return done; });
  }
  PERFETTO_DCHECK(all_producers_connected);
}

// Resets the calling thread's per-instance TLS (and with it the trace writer)
// for every data source instance whose TLS snapshot no longer matches the
// live DataSourceState, then records the generation observed at entry in the
// thread's root TLS.
void TracingMuxerImpl::DestroyStoppedTraceWritersForCurrentThread() {
  // Iterate across all possible data source types.
  auto cur_generation = generation_.load(std::memory_order_acquire);
  auto* root_tls = GetOrCreateTracingTLS();

  auto destroy_stopped_instances = [](DataSourceThreadLocalState& tls) {
    // |tls| has a vector of per-data-source-instance thread-local state.
    DataSourceStaticState* static_state = tls.static_state;
    if (!static_state)
      return;  // Slot not used.

    // Iterate across all possible instances for this data source.
    for (uint32_t inst = 0; inst < kMaxDataSourceInstances; inst++) {
      DataSourceInstanceThreadLocalState& ds_tls = tls.per_instance[inst];
      if (!ds_tls.trace_writer)
        continue;

      // Keep the TLS only if the slot is still active and every identifying
      // field matches what the TLS recorded.
      DataSourceState* ds_state = static_state->TryGet(inst);
      if (ds_state &&
          ds_state->muxer_id_for_testing == ds_tls.muxer_id_for_testing &&
          ds_state->backend_id == ds_tls.backend_id &&
          ds_state->backend_connection_id == ds_tls.backend_connection_id &&
          ds_state->startup_target_buffer_reservation.load(
              std::memory_order_relaxed) ==
              ds_tls.startup_target_buffer_reservation &&
          ds_state->buffer_id == ds_tls.buffer_id &&
          ds_state->data_source_instance_id == ds_tls.data_source_instance_id) {
        continue;
      }

      // The DataSource instance has been destroyed or recycled.
      ds_tls.Reset();  // Will also destroy the |ds_tls.trace_writer|.
    }
  };

  for (size_t ds_idx = 0; ds_idx < kMaxDataSources; ds_idx++) {
    // |tls| has a vector of per-data-source-instance thread-local state.
    DataSourceThreadLocalState& tls = root_tls->data_sources_tls[ds_idx];
    destroy_stopped_instances(tls);
  }
  // The track event TLS is stored separately from |data_sources_tls|.
  destroy_stopped_instances(root_tls->track_event_tls);
  root_tls->generation = cur_generation;
}

// Called both when a new data source is registered or when a new backend
// connects. In both cases we want to be sure we reflected the data source
// registrations on the backends.
void TracingMuxerImpl::UpdateDataSourcesOnAllBackends() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  for (RegisteredDataSource& rds : data_sources_) {
    UpdateDataSourceOnAllBackends(rds, /*is_changed=*/false);
  }
}

// Registers |rds| on every connected producer backend that does not know it
// yet, or re-sends its descriptor to backends that already have it when
// |is_changed| is true. Also fills in the descriptor fields the muxer controls
// (see below) before sending.
void TracingMuxerImpl::UpdateDataSourceOnAllBackends(RegisteredDataSource& rds,
                                                     bool is_changed) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  for (RegisteredProducerBackend& backend : producer_backends_) {
    // We cannot call RegisterDataSource on the backend before it connects.
    if (!backend.producer->connected_)
      continue;

    PERFETTO_DCHECK(rds.static_state->index < kMaxDataSources);
    bool is_registered = backend.producer->registered_data_sources_.test(
        rds.static_state->index);
    if (is_registered && !is_changed)
      continue;

    // A no_flush value of true already present in the descriptor is kept;
    // otherwise the value given at registration time is applied.
    if (!rds.descriptor.no_flush()) {
      rds.descriptor.set_no_flush(rds.no_flush);
    }
    rds.descriptor.set_will_notify_on_start(true);
    // Defaults to true, but an explicitly set value is respected.
    if (!rds.descriptor.has_will_notify_on_stop()) {
      rds.descriptor.set_will_notify_on_stop(true);
    }

    rds.descriptor.set_handles_incremental_state_clear(true);
    rds.descriptor.set_id(rds.static_state->id);
    if (is_registered) {
      backend.producer->service_->UpdateDataSource(rds.descriptor);
    } else {
      backend.producer->service_->RegisterDataSource(rds.descriptor);
    }
    backend.producer->registered_data_sources_.set(rds.static_state->index);
  }
}

void TracingMuxerImpl::SetupTracingSession(
    TracingSessionGlobalID session_id,
    const std::shared_ptr& trace_config,
    base::ScopedFile trace_fd) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_CHECK(!trace_fd || trace_config->write_into_file());

  auto* consumer = FindConsumer(session_id);
  if (!consumer)
    return;
consumer->trace_config_ = trace_config; if (trace_fd) consumer->trace_fd_ = std::move(trace_fd); if (!consumer->connected_) return; // Only used in the deferred start mode. if (trace_config->deferred_start()) { consumer->service_->EnableTracing(*trace_config, std::move(consumer->trace_fd_)); } } void TracingMuxerImpl::StartTracingSession(TracingSessionGlobalID session_id) { PERFETTO_DCHECK_THREAD(thread_checker_); auto* consumer = FindConsumer(session_id); if (!consumer) return; if (!consumer->trace_config_) { PERFETTO_ELOG("Must call Setup(config) first"); return; } if (!consumer->connected_) { consumer->start_pending_ = true; return; } consumer->start_pending_ = false; if (consumer->trace_config_->deferred_start()) { consumer->service_->StartTracing(); } else { consumer->service_->EnableTracing(*consumer->trace_config_, std::move(consumer->trace_fd_)); } // TODO implement support for the deferred-start + fast-triggering case. } void TracingMuxerImpl::CloneTracingSession( TracingSessionGlobalID session_id, TracingSession::CloneTraceArgs args, TracingSession::CloneTraceCallback callback) { PERFETTO_DCHECK_THREAD(thread_checker_); auto* consumer = FindConsumer(session_id); if (!consumer) { TracingSession::CloneTraceCallbackArgs callback_arg{}; callback_arg.success = false; callback_arg.error = "Tracing session not found"; callback(callback_arg); return; } // Multiple concurrent cloning isn't supported. 
PERFETTO_DCHECK(!consumer->clone_trace_callback_); consumer->clone_trace_callback_ = std::move(callback); ConsumerEndpoint::CloneSessionArgs consumer_args{}; consumer_args.unique_session_name = args.unique_session_name; if (!consumer->connected_) { consumer->session_to_clone_ = std::move(consumer_args); return; } consumer->session_to_clone_ = std::nullopt; consumer->service_->CloneSession(consumer_args); } void TracingMuxerImpl::ChangeTracingSessionConfig( TracingSessionGlobalID session_id, const TraceConfig& trace_config) { PERFETTO_DCHECK_THREAD(thread_checker_); auto* consumer = FindConsumer(session_id); if (!consumer) return; if (!consumer->trace_config_) { // Changing the config is only supported for started sessions. PERFETTO_ELOG("Must call Setup(config) and Start() first"); return; } consumer->trace_config_ = std::make_shared(trace_config); if (consumer->connected_) consumer->service_->ChangeTraceConfig(trace_config); } void TracingMuxerImpl::FlushTracingSession(TracingSessionGlobalID session_id, uint32_t timeout_ms, std::function callback) { PERFETTO_DCHECK_THREAD(thread_checker_); auto* consumer = FindConsumer(session_id); if (!consumer || consumer->start_pending_ || consumer->stop_pending_ || !consumer->trace_config_) { PERFETTO_ELOG("Flush() can be called only after Start() and before Stop()"); std::move(callback)(false); return; } // For now we don't want to expose the flush reason to the consumer-side SDK // users to avoid misuses until there is a strong need. consumer->service_->Flush(timeout_ms, std::move(callback), FlushFlags(FlushFlags::Initiator::kConsumerSdk, FlushFlags::Reason::kExplicit)); } void TracingMuxerImpl::StopTracingSession(TracingSessionGlobalID session_id) { PERFETTO_DCHECK_THREAD(thread_checker_); auto* consumer = FindConsumer(session_id); if (!consumer) return; if (consumer->start_pending_) { // If the session hasn't started yet, wait until it does before stopping. 
consumer->stop_pending_ = true; return; } consumer->stop_pending_ = false; if (consumer->stopped_) { // If the session was already stopped (e.g., it failed to start), don't try // stopping again. consumer->NotifyStopComplete(); } else if (!consumer->trace_config_) { PERFETTO_ELOG("Must call Setup(config) and Start() first"); return; } else { consumer->service_->DisableTracing(); } consumer->trace_config_.reset(); } void TracingMuxerImpl::DestroyTracingSession( TracingSessionGlobalID session_id) { PERFETTO_DCHECK_THREAD(thread_checker_); for (RegisteredConsumerBackend& backend : consumer_backends_) { // We need to find the consumer (if any) and call Disconnect as we destroy // the tracing session. We can't call Disconnect() inside this for loop // because in the in-process case this will end up to a synchronous call to // OnConsumerDisconnect which will invalidate all the iterators to // |backend.consumers|. ConsumerImpl* consumer = nullptr; for (auto& con : backend.consumers) { if (con->session_id_ == session_id) { consumer = con.get(); break; } } if (consumer) { // We broke out of the loop above on the assumption that each backend will // only have a single consumer per session. This DCHECK ensures that // this is the case. PERFETTO_DCHECK( std::count_if(backend.consumers.begin(), backend.consumers.end(), [session_id](const std::unique_ptr& con) { return con->session_id_ == session_id; }) == 1u); consumer->Disconnect(); } } } void TracingMuxerImpl::ReadTracingSessionData( TracingSessionGlobalID session_id, std::function callback) { PERFETTO_DCHECK_THREAD(thread_checker_); auto* consumer = FindConsumer(session_id); if (!consumer) { // TODO(skyostil): Signal an error to the user. 
TracingSession::ReadTraceCallbackArgs callback_arg{}; callback(callback_arg); return; } PERFETTO_DCHECK(!consumer->read_trace_callback_); consumer->read_trace_callback_ = std::move(callback); consumer->service_->ReadBuffers(); } void TracingMuxerImpl::GetTraceStats( TracingSessionGlobalID session_id, TracingSession::GetTraceStatsCallback callback) { PERFETTO_DCHECK_THREAD(thread_checker_); auto* consumer = FindConsumer(session_id); if (!consumer) { TracingSession::GetTraceStatsCallbackArgs callback_arg{}; callback_arg.success = false; callback(std::move(callback_arg)); return; } PERFETTO_DCHECK(!consumer->get_trace_stats_callback_); consumer->get_trace_stats_callback_ = std::move(callback); if (!consumer->connected_) { consumer->get_trace_stats_pending_ = true; return; } consumer->get_trace_stats_pending_ = false; consumer->service_->GetTraceStats(); } void TracingMuxerImpl::QueryServiceState( TracingSessionGlobalID session_id, TracingSession::QueryServiceStateCallback callback) { PERFETTO_DCHECK_THREAD(thread_checker_); auto* consumer = FindConsumer(session_id); if (!consumer) { TracingSession::QueryServiceStateCallbackArgs callback_arg{}; callback_arg.success = false; callback(std::move(callback_arg)); return; } PERFETTO_DCHECK(!consumer->query_service_state_callback_); if (!consumer->connected_) { consumer->query_service_state_callback_ = std::move(callback); return; } auto callback_wrapper = [callback](bool success, protos::gen::TracingServiceState state) { TracingSession::QueryServiceStateCallbackArgs callback_arg{}; callback_arg.success = success; callback_arg.service_state_data = state.SerializeAsArray(); callback(std::move(callback_arg)); }; consumer->service_->QueryServiceState({}, std::move(callback_wrapper)); } void TracingMuxerImpl::SetBatchCommitsDurationForTesting( uint32_t batch_commits_duration_ms, BackendType backend_type) { for (RegisteredProducerBackend& backend : producer_backends_) { if (backend.producer && backend.producer->connected_ && 
backend.type == backend_type) { backend.producer->service_->MaybeSharedMemoryArbiter() ->SetBatchCommitsDuration(batch_commits_duration_ms); } } } bool TracingMuxerImpl::EnableDirectSMBPatchingForTesting( BackendType backend_type) { for (RegisteredProducerBackend& backend : producer_backends_) { if (backend.producer && backend.producer->connected_ && backend.type == backend_type && !backend.producer->service_->MaybeSharedMemoryArbiter() ->EnableDirectSMBPatching()) { return false; } } return true; } TracingMuxerImpl::ConsumerImpl* TracingMuxerImpl::FindConsumer( TracingSessionGlobalID session_id) { PERFETTO_DCHECK_THREAD(thread_checker_); return FindConsumerAndBackend(session_id).first; } std::pair TracingMuxerImpl::FindConsumerAndBackend(TracingSessionGlobalID session_id) { PERFETTO_DCHECK_THREAD(thread_checker_); for (RegisteredConsumerBackend& backend : consumer_backends_) { for (auto& consumer : backend.consumers) { if (consumer->session_id_ == session_id) { return {consumer.get(), &backend}; } } } return {nullptr, nullptr}; } void TracingMuxerImpl::InitializeConsumer(TracingSessionGlobalID session_id) { PERFETTO_DCHECK_THREAD(thread_checker_); auto res = FindConsumerAndBackend(session_id); if (!res.first || !res.second) return; TracingMuxerImpl::ConsumerImpl* consumer = res.first; RegisteredConsumerBackend& backend = *res.second; TracingBackend::ConnectConsumerArgs conn_args; conn_args.consumer = consumer; conn_args.task_runner = task_runner_.get(); consumer->Initialize(backend.backend->ConnectConsumer(conn_args)); } void TracingMuxerImpl::OnConsumerDisconnected(ConsumerImpl* consumer) { PERFETTO_DCHECK_THREAD(thread_checker_); for (RegisteredConsumerBackend& backend : consumer_backends_) { auto pred = [consumer](const std::unique_ptr& con) { return con.get() == consumer; }; backend.consumers.erase(std::remove_if(backend.consumers.begin(), backend.consumers.end(), pred), backend.consumers.end()); } } void 
TracingMuxerImpl::SetMaxProducerReconnectionsForTesting(uint32_t count) {
  // Stores the limit that OnProducerDisconnected() compares the producer's
  // connection id against before attempting a reconnection.
  max_producer_reconnections_.store(count);
}

// Handles the disconnection of |producer|: stops every data source instance
// that was running on the lost connection, then tries to reconnect the
// producer unless it has already reconnected too many times.
void TracingMuxerImpl::OnProducerDisconnected(ProducerImpl* producer) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  for (RegisteredProducerBackend& backend : producer_backends_) {
    if (backend.producer.get() != producer)
      continue;

    // The tracing service is disconnected. It does not make sense to keep
    // tracing (we wouldn't be able to commit). On reconnection, the tracing
    // service will restart the data sources.
    for (const auto& rds : data_sources_) {
      DataSourceStaticState* static_state = rds.static_state;
      for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
        auto* internal_state = static_state->TryGet(i);
        // Only instances bound to this backend *and* to the connection that
        // just went away are stopped.
        if (internal_state && internal_state->backend_id == backend.id &&
            internal_state->backend_connection_id ==
                backend.producer->connection_id_.load(
                    std::memory_order_relaxed)) {
          StopDataSource_AsyncBeginImpl(
              FindDataSourceRes(static_state, internal_state, i,
                                rds.requires_callbacks_under_lock));
        }
      }
    }

    // Try reconnecting the disconnected producer. If the connection succeeds,
    // all the data sources will be automatically re-registered.
    if (producer->connection_id_.load(std::memory_order_relaxed) >
        max_producer_reconnections_.load()) {
      // Avoid reconnecting a failing producer too many times. Instead we just
      // leak the producer instead of trying to avoid further complicating
      // cross-thread trace writer creation.
      PERFETTO_ELOG("Producer disconnected too many times; not reconnecting");
      continue;
    }

    backend.producer->Initialize(
        backend.backend->ConnectProducer(backend.producer_conn_args));
    // Don't use producer-provided SMBs for the next connection unless startup
    // tracing requires it again.
backend.producer_conn_args.use_producer_provided_smb = false; } } void TracingMuxerImpl::SweepDeadBackends() { PERFETTO_DCHECK_THREAD(thread_checker_); for (auto it = dead_backends_.begin(); it != dead_backends_.end();) { auto next_it = it; next_it++; if (it->producer->SweepDeadServices()) dead_backends_.erase(it); it = next_it; } } TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::FindDataSource( TracingBackendId backend_id, DataSourceInstanceID instance_id) { PERFETTO_DCHECK_THREAD(thread_checker_); RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id); for (const auto& rds : data_sources_) { DataSourceStaticState* static_state = rds.static_state; for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) { auto* internal_state = static_state->TryGet(i); if (internal_state && internal_state->backend_id == backend_id && internal_state->backend_connection_id == backend.producer->connection_id_.load( std::memory_order_relaxed) && internal_state->data_source_instance_id == instance_id) { return FindDataSourceRes(static_state, internal_state, i, rds.requires_callbacks_under_lock); } } } return FindDataSourceRes(); } // Can be called from any thread. std::unique_ptr TracingMuxerImpl::CreateTraceWriter( DataSourceStaticState* static_state, uint32_t data_source_instance_index, DataSourceState* data_source, BufferExhaustedPolicy buffer_exhausted_policy) { if (PERFETTO_UNLIKELY(data_source->interceptor_id)) { // If the session is being intercepted, return a heap-backed trace writer // instead. This is safe because all the data given to the interceptor is // either thread-local (|instance_index|), statically allocated // (|static_state|) or constant after initialization (|interceptor|). Access // to the interceptor instance itself through |data_source| is protected by // a statically allocated lock (similarly to the data source instance). 
auto& interceptor = interceptors_[data_source->interceptor_id - 1]; return std::unique_ptr(new InterceptorTraceWriter( interceptor.tls_factory(static_state, data_source_instance_index), interceptor.packet_callback, static_state, data_source_instance_index)); } ProducerImpl* producer = FindProducerBackendById(data_source->backend_id)->producer.get(); // Atomically load the current service endpoint. We keep the pointer as a // shared pointer on the stack to guard against it from being concurrently // modified on the thread by ProducerImpl::Initialize() swapping in a // reconnected service on the muxer task runner thread. // // The endpoint may also be concurrently modified by SweepDeadServices() // clearing out old disconnected services. We guard against that by // SharedMemoryArbiter keeping track of any outstanding trace writers. After // shutdown has started, the trace writer created below will be a null one // which will drop any written data. See SharedMemoryArbiter::TryShutdown(). // // We use an atomic pointer instead of holding a lock because // CreateTraceWriter posts tasks under the hood. std::shared_ptr service = std::atomic_load(&producer->service_); // The service may have been disconnected and reconnected concurrently after // the data source was enabled, in which case we may not have an arbiter, or // would be creating a TraceWriter for the wrong (a newer) connection / SMB. // Instead, early-out now. A relaxed load is fine here because the atomic_load // above ensures that the |service| isn't newer. if (producer->connection_id_.load(std::memory_order_relaxed) != data_source->backend_connection_id) { return std::unique_ptr(new NullTraceWriter()); } // We just need a relaxed atomic read here: We can use the reservation ID even // after the buffer was bound, we just need to be sure to read it atomically. 
uint16_t startup_buffer_reservation = data_source->startup_target_buffer_reservation.load( std::memory_order_relaxed); if (startup_buffer_reservation) { return service->MaybeSharedMemoryArbiter()->CreateStartupTraceWriter( startup_buffer_reservation); } return service->CreateTraceWriter(data_source->buffer_id, buffer_exhausted_policy); } // This is called via the public API Tracing::NewTrace(). // Can be called from any thread. std::unique_ptr TracingMuxerImpl::CreateTracingSession( BackendType requested_backend_type, TracingConsumerBackend* (*system_backend_factory)()) { TracingSessionGlobalID session_id = ++next_tracing_session_id_; // |backend_type| can only specify one backend, not an OR-ed mask. PERFETTO_CHECK((requested_backend_type & (requested_backend_type - 1)) == 0); // Capturing |this| is fine because the TracingMuxer is a leaky singleton. task_runner_->PostTask([this, requested_backend_type, session_id, system_backend_factory] { if (requested_backend_type == kSystemBackend && system_backend_factory && !FindConsumerBackendByType(kSystemBackend)) { AddConsumerBackend(system_backend_factory(), kSystemBackend); } for (RegisteredConsumerBackend& backend : consumer_backends_) { if (requested_backend_type && backend.type && backend.type != requested_backend_type) { continue; } // Create the consumer now, even if we have to ask the embedder below, so // that any other tasks executing after this one can find the consumer and // change its pending attributes. backend.consumers.emplace_back( new ConsumerImpl(this, backend.type, session_id)); // The last registered backend in |consumer_backends_| is the unsupported // backend without a valid type. if (!backend.type) { PERFETTO_ELOG( "No tracing backend ready for type=%d, consumer will disconnect", requested_backend_type); InitializeConsumer(session_id); return; } // Check if the embedder wants to be asked for permission before // connecting the consumer. 
if (!policy_) { InitializeConsumer(session_id); return; } BackendType type = backend.type; TracingPolicy::ShouldAllowConsumerSessionArgs args; args.backend_type = backend.type; args.result_callback = [this, type, session_id](bool allow) { task_runner_->PostTask([this, type, session_id, allow] { if (allow) { InitializeConsumer(session_id); return; } PERFETTO_ELOG( "Consumer session for backend type type=%d forbidden, " "consumer will disconnect", type); auto* consumer = FindConsumer(session_id); if (!consumer) return; consumer->OnDisconnect(); }); }; policy_->ShouldAllowConsumerSession(args); return; } PERFETTO_DFATAL("Not reached"); }); return std::unique_ptr( new TracingSessionImpl(this, session_id, requested_backend_type)); } // static // This is called via the public API Tracing::SetupStartupTracing(). // Can be called from any thread. std::unique_ptr TracingMuxerImpl::CreateStartupTracingSession( const TraceConfig& config, Tracing::SetupStartupTracingOpts opts) { BackendType backend_type = opts.backend; // |backend_type| can only specify one backend, not an OR-ed mask. PERFETTO_CHECK((backend_type & (backend_type - 1)) == 0); // The in-process backend doesn't support startup tracing. PERFETTO_CHECK(backend_type != BackendType::kInProcessBackend); TracingSessionGlobalID session_id = ++next_tracing_session_id_; // Capturing |this| is fine because the TracingMuxer is a leaky singleton. task_runner_->PostTask([this, config, opts, backend_type, session_id] { for (RegisteredProducerBackend& backend : producer_backends_) { if (backend_type && backend.type && backend.type != backend_type) { continue; } TracingBackendId backend_id = backend.id; // The last registered backend in |producer_backends_| is the unsupported // backend without a valid type. 
if (!backend.type) { PERFETTO_ELOG( "No tracing backend initialized for type=%d, startup tracing " "failed", backend_type); if (opts.on_setup) opts.on_setup(Tracing::OnStartupTracingSetupCallbackArgs{ 0 /* num_data_sources_started */}); return; } if (!backend.producer->service_ || !backend.producer->service_->shared_memory()) { // If we unsuccessfully attempted to use a producer-provided SMB in the // past, don't try again. if (backend.producer->producer_provided_smb_failed_) { PERFETTO_ELOG( "Backend %zu doesn't seem to support producer-provided " "SMBs, startup tracing failed", backend_id); if (opts.on_setup) opts.on_setup(Tracing::OnStartupTracingSetupCallbackArgs{ 0 /* num_data_sources_started */}); return; } PERFETTO_DLOG("Reconnecting backend %zu for startup tracing", backend_id); backend.producer_conn_args.use_producer_provided_smb = true; backend.producer->service_->Disconnect(); // Causes a reconnect. PERFETTO_DCHECK(backend.producer->service_ && backend.producer->service_->MaybeSharedMemoryArbiter()); } RegisteredStartupSession session; session.session_id = session_id; session.on_aborted = opts.on_aborted; session.on_adopted = opts.on_adopted; for (const TraceConfig::DataSource& ds_cfg : config.data_sources()) { // Find all matching data sources and start one instance of each. 
for (const auto& rds : data_sources_) { if (rds.descriptor.name() != ds_cfg.config().name()) continue; PERFETTO_DLOG( "Setting up data source %s for startup tracing with target " "buffer reservation %" PRIi32, rds.descriptor.name().c_str(), backend.producer->last_startup_target_buffer_reservation_ + 1u); auto ds = SetupDataSourceImpl( rds, backend_id, backend.producer->connection_id_.load(std::memory_order_relaxed), /*instance_id=*/0, ds_cfg.config(), /*startup_session_id=*/session_id); if (ds) { StartDataSourceImpl(ds); session.num_unbound_data_sources++; } } } int num_ds = session.num_unbound_data_sources; auto on_setup = opts.on_setup; if (on_setup) { backend.producer->OnStartupTracingSetup(); task_runner_->PostTask([on_setup, num_ds] { on_setup(Tracing::OnStartupTracingSetupCallbackArgs{num_ds}); }); } if (num_ds > 0) { backend.startup_sessions.push_back(std::move(session)); if (opts.timeout_ms > 0) { task_runner_->PostDelayedTask( [this, session_id, backend_type] { AbortStartupTracingSession(session_id, backend_type); }, opts.timeout_ms); } } return; } PERFETTO_DFATAL("Invalid startup tracing session backend"); }); return std::unique_ptr( new StartupTracingSessionImpl(this, session_id, backend_type)); } // Must not be called from the SDK's internal thread. std::unique_ptr TracingMuxerImpl::CreateStartupTracingSessionBlocking( const TraceConfig& config, Tracing::SetupStartupTracingOpts opts) { auto previous_on_setup = std::move(opts.on_setup); PERFETTO_CHECK(!task_runner_->RunsTasksOnCurrentThread()); base::WaitableEvent event; // It is safe to capture by reference because once on_setup is called only // once before this method returns. 
opts.on_setup = [&](Tracing::OnStartupTracingSetupCallbackArgs args) { if (previous_on_setup) { previous_on_setup(std::move(args)); } event.Notify(); }; auto session = CreateStartupTracingSession(config, std::move(opts)); event.Wait(); return session; } void TracingMuxerImpl::AbortStartupTracingSession( TracingSessionGlobalID session_id, BackendType backend_type) { PERFETTO_DCHECK_THREAD(thread_checker_); for (RegisteredProducerBackend& backend : producer_backends_) { if (backend_type != backend.type) continue; auto session_it = std::find_if( backend.startup_sessions.begin(), backend.startup_sessions.end(), [session_id](const RegisteredStartupSession& session) { return session.session_id == session_id; }); // The startup session may have already been aborted or fully adopted. if (session_it == backend.startup_sessions.end()) return; if (session_it->is_aborting) return; session_it->is_aborting = true; // Iterate all data sources and abort them if they weren't adopted yet. for (const auto& rds : data_sources_) { DataSourceStaticState* static_state = rds.static_state; for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) { auto* internal_state = static_state->TryGet(i); if (internal_state && internal_state->startup_target_buffer_reservation.load( std::memory_order_relaxed) && internal_state->data_source_instance_id == 0 && internal_state->startup_session_id == session_id) { PERFETTO_DLOG( "Aborting startup tracing for data source %s (target buffer " "reservation %" PRIu16 ")", rds.descriptor.name().c_str(), internal_state->startup_target_buffer_reservation.load( std::memory_order_relaxed)); // Abort the instance asynchronously by stopping it. From this point // onwards, the service will not be able to adopt it via // StartDataSource(). 
session_it->num_aborting_data_sources++; StopDataSource_AsyncBeginImpl( FindDataSourceRes(static_state, internal_state, i, rds.requires_callbacks_under_lock)); } } } // If we did everything right, we should have aborted all still-unbound data // source instances. PERFETTO_DCHECK(session_it->num_unbound_data_sources == session_it->num_aborting_data_sources); if (session_it->num_aborting_data_sources == 0) { if (session_it->on_aborted) task_runner_->PostTask(session_it->on_aborted); backend.startup_sessions.erase(session_it); } return; } // We might reach here in tests because when we start a trace, we post the // Task(AbortStartupTrace, delay=timeout). When we do // perfetto::ResetForTesting, we sweep dead backends, and we are not able to // kill those delayed tasks because TaskRunner doesn't have support for // deleting scheduled future tasks and TaskRunner doesn't have any API for us // to wait for the completion of all the scheduled tasks (apart from // deleting the TaskRunner) and we want to avoid doing that because we need // a long running TaskRunner in muxer. PERFETTO_DLOG("Invalid startup tracing session backend"); } void TracingMuxerImpl::InitializeInstance(const TracingInitArgs& args) { if (instance_ != TracingMuxerFake::Get()) { // The tracing muxer was already initialized. We might need to initialize // additional backends that were not configured earlier. auto* muxer = static_cast(instance_); muxer->task_runner_->PostTask([muxer, args] { muxer->AddBackends(args); }); return; } // If we previously had a TracingMuxerImpl instance which was reset, // reinitialize and reuse it instead of trying to create a new one. See // ResetForTesting(). 
if (g_prev_instance) { auto* muxer = g_prev_instance; g_prev_instance = nullptr; instance_ = muxer; muxer->task_runner_->PostTask([muxer, args] { muxer->Initialize(args); muxer->AddBackends(args); }); } else { new TracingMuxerImpl(args); } } // static void TracingMuxerImpl::ResetForTesting() { // Ideally we'd tear down the entire TracingMuxerImpl, but the lifetimes of // various objects make that a non-starter. In particular: // // 1) Any thread that has entered a trace event has a TraceWriter, which holds // a reference back to ProducerImpl::service_. // // 2) ProducerImpl::service_ has a reference back to the ProducerImpl. // // 3) ProducerImpl holds reference to TracingMuxerImpl::task_runner_, which in // turn depends on TracingMuxerImpl itself. // // Because of this, it's not safe to deallocate TracingMuxerImpl until all // threads have dropped their TraceWriters. Since we can't really ask the // caller to guarantee this, we'll instead reset enough of the muxer's state // so that it can be reinitialized later and ensure all necessary objects from // the old state remain alive until all references have gone away. auto* muxer = reinterpret_cast(instance_); base::WaitableEvent reset_done; auto do_reset = [muxer, &reset_done] { muxer->DestroyStoppedTraceWritersForCurrentThread(); // Unregister all data sources so they don't interfere with any future // tracing sessions. for (RegisteredDataSource& rds : muxer->data_sources_) { for (RegisteredProducerBackend& backend : muxer->producer_backends_) { if (!backend.producer->service_ || !backend.producer->connected_) continue; backend.producer->service_->UnregisterDataSource(rds.descriptor.name()); } } for (auto& backend : muxer->consumer_backends_) { // Check that no consumer session is currently active on any backend. 
for (auto& consumer : backend.consumers) PERFETTO_CHECK(!consumer->service_); } for (auto& backend : muxer->producer_backends_) { backend.producer->muxer_ = nullptr; backend.producer->DisposeConnection(); muxer->dead_backends_.push_back(std::move(backend)); } muxer->consumer_backends_.clear(); muxer->producer_backends_.clear(); muxer->interceptors_.clear(); for (auto& ds : muxer->data_sources_) { ds.static_state->ResetForTesting(); } muxer->data_sources_.clear(); muxer->next_data_source_index_ = 0; // Free all backends without active trace writers or other inbound // references. Note that even if all the backends get swept, the muxer still // needs to stay around since |task_runner_| is assumed to be long-lived. muxer->SweepDeadBackends(); // Make sure we eventually discard any per-thread trace writers from the // previous instance. muxer->muxer_id_for_testing_++; g_prev_instance = muxer; instance_ = TracingMuxerFake::Get(); // Call the user provided cleanups on the muxer thread. for (auto& cb : muxer->reset_callbacks_) { cb(); } reset_done.Notify(); }; // Some tests run the muxer and the test on the same thread. In these cases, // we can reset synchronously. if (muxer->task_runner_->RunsTasksOnCurrentThread()) { do_reset(); } else { muxer->DestroyStoppedTraceWritersForCurrentThread(); muxer->task_runner_->PostTask(std::move(do_reset)); reset_done.Wait(); // Call the user provided cleanups also on this thread. for (auto& cb : muxer->reset_callbacks_) { cb(); } } muxer->reset_callbacks_.clear(); } // static void TracingMuxerImpl::Shutdown() { auto* muxer = reinterpret_cast(instance_); // Shutting down on the muxer thread would lead to a deadlock. 
PERFETTO_CHECK(!muxer->task_runner_->RunsTasksOnCurrentThread()); muxer->DestroyStoppedTraceWritersForCurrentThread(); std::unique_ptr owned_task_runner( muxer->task_runner_.get()); base::WaitableEvent shutdown_done; owned_task_runner->PostTask([muxer, &shutdown_done] { // Check that no consumer session is currently active on any backend. // Producers will be automatically disconnected as a part of deleting the // muxer below. for (auto& backend : muxer->consumer_backends_) { for (auto& consumer : backend.consumers) { PERFETTO_CHECK(!consumer->service_); } } // Make sure no trace writers are lingering around on the muxer thread. Note // that we can't do this for any arbitrary thread in the process; it is the // caller's responsibility to clean them up before shutting down Perfetto. muxer->DestroyStoppedTraceWritersForCurrentThread(); // The task runner must be deleted outside the muxer thread. This is done by // `owned_task_runner` above. muxer->task_runner_.release(); auto* platform = muxer->platform_; delete muxer; instance_ = TracingMuxerFake::Get(); platform->Shutdown(); shutdown_done.Notify(); }); shutdown_done.Wait(); } void TracingMuxerImpl::AppendResetForTestingCallback(std::function cb) { reset_callbacks_.push_back(std::move(cb)); } TracingMuxer::~TracingMuxer() = default; static_assert(std::is_same::value, "public's BufferId and tracing/core's BufferID diverged"); } // namespace internal } // namespace perfetto