xref: /aosp_15_r20/external/perfetto/src/tracing/ipc/producer/producer_ipc_client_impl.cc (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/ipc/producer/producer_ipc_client_impl.h"
18 
19 #include <cinttypes>
20 
21 #include <string.h>
22 
23 #include "perfetto/base/logging.h"
24 #include "perfetto/base/task_runner.h"
25 #include "perfetto/ext/base/unix_socket.h"
26 #include "perfetto/ext/base/version.h"
27 #include "perfetto/ext/ipc/client.h"
28 #include "perfetto/ext/tracing/core/commit_data_request.h"
29 #include "perfetto/ext/tracing/core/producer.h"
30 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
31 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
32 #include "perfetto/ext/tracing/core/trace_writer.h"
33 #include "perfetto/tracing/core/data_source_config.h"
34 #include "perfetto/tracing/core/data_source_descriptor.h"
35 #include "perfetto/tracing/core/trace_config.h"
36 #include "src/tracing/core/in_process_shared_memory.h"
37 
38 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
39 #include "src/tracing/ipc/shared_memory_windows.h"
40 #else
41 #include "src/tracing/ipc/posix_shared_memory.h"
42 #endif
43 
44 // TODO(fmayer): think to what happens when ProducerIPCClientImpl gets destroyed
45 // w.r.t. the Producer pointer. Also think to lifetime of the Producer* during
46 // the callbacks.
47 
48 namespace perfetto {
49 
50 // static. (Declared in include/tracing/ipc/producer_ipc_client.h).
Connect(const char * service_sock_name,Producer * producer,const std::string & producer_name,base::TaskRunner * task_runner,TracingService::ProducerSMBScrapingMode smb_scraping_mode,size_t shared_memory_size_hint_bytes,size_t shared_memory_page_size_hint_bytes,std::unique_ptr<SharedMemory> shm,std::unique_ptr<SharedMemoryArbiter> shm_arbiter,ConnectionFlags conn_flags)51 std::unique_ptr<TracingService::ProducerEndpoint> ProducerIPCClient::Connect(
52     const char* service_sock_name,
53     Producer* producer,
54     const std::string& producer_name,
55     base::TaskRunner* task_runner,
56     TracingService::ProducerSMBScrapingMode smb_scraping_mode,
57     size_t shared_memory_size_hint_bytes,
58     size_t shared_memory_page_size_hint_bytes,
59     std::unique_ptr<SharedMemory> shm,
60     std::unique_ptr<SharedMemoryArbiter> shm_arbiter,
61     ConnectionFlags conn_flags) {
62   return std::unique_ptr<TracingService::ProducerEndpoint>(
63       new ProducerIPCClientImpl(
64           {service_sock_name,
65            conn_flags ==
66                ProducerIPCClient::ConnectionFlags::kRetryIfUnreachable},
67           producer, producer_name, task_runner, smb_scraping_mode,
68           shared_memory_size_hint_bytes, shared_memory_page_size_hint_bytes,
69           std::move(shm), std::move(shm_arbiter), nullptr));
70 }
71 
72 // static. (Declared in include/tracing/ipc/producer_ipc_client.h).
Connect(ipc::Client::ConnArgs conn_args,Producer * producer,const std::string & producer_name,base::TaskRunner * task_runner,TracingService::ProducerSMBScrapingMode smb_scraping_mode,size_t shared_memory_size_hint_bytes,size_t shared_memory_page_size_hint_bytes,std::unique_ptr<SharedMemory> shm,std::unique_ptr<SharedMemoryArbiter> shm_arbiter,CreateSocketAsync create_socket_async)73 std::unique_ptr<TracingService::ProducerEndpoint> ProducerIPCClient::Connect(
74     ipc::Client::ConnArgs conn_args,
75     Producer* producer,
76     const std::string& producer_name,
77     base::TaskRunner* task_runner,
78     TracingService::ProducerSMBScrapingMode smb_scraping_mode,
79     size_t shared_memory_size_hint_bytes,
80     size_t shared_memory_page_size_hint_bytes,
81     std::unique_ptr<SharedMemory> shm,
82     std::unique_ptr<SharedMemoryArbiter> shm_arbiter,
83     CreateSocketAsync create_socket_async) {
84   return std::unique_ptr<TracingService::ProducerEndpoint>(
85       new ProducerIPCClientImpl(
86           std::move(conn_args), producer, producer_name, task_runner,
87           smb_scraping_mode, shared_memory_size_hint_bytes,
88           shared_memory_page_size_hint_bytes, std::move(shm),
89           std::move(shm_arbiter), create_socket_async));
90 }
91 
ProducerIPCClientImpl(ipc::Client::ConnArgs conn_args,Producer * producer,const std::string & producer_name,base::TaskRunner * task_runner,TracingService::ProducerSMBScrapingMode smb_scraping_mode,size_t shared_memory_size_hint_bytes,size_t shared_memory_page_size_hint_bytes,std::unique_ptr<SharedMemory> shm,std::unique_ptr<SharedMemoryArbiter> shm_arbiter,CreateSocketAsync create_socket_async)92 ProducerIPCClientImpl::ProducerIPCClientImpl(
93     ipc::Client::ConnArgs conn_args,
94     Producer* producer,
95     const std::string& producer_name,
96     base::TaskRunner* task_runner,
97     TracingService::ProducerSMBScrapingMode smb_scraping_mode,
98     size_t shared_memory_size_hint_bytes,
99     size_t shared_memory_page_size_hint_bytes,
100     std::unique_ptr<SharedMemory> shm,
101     std::unique_ptr<SharedMemoryArbiter> shm_arbiter,
102     CreateSocketAsync create_socket_async)
103     : producer_(producer),
104       task_runner_(task_runner),
105       receive_shmem_fd_cb_fuchsia_(
106           std::move(conn_args.receive_shmem_fd_cb_fuchsia)),
107       producer_port_(
108           new protos::gen::ProducerPortProxy(this /* event_listener */)),
109       shared_memory_(std::move(shm)),
110       shared_memory_arbiter_(std::move(shm_arbiter)),
111       name_(producer_name),
112       shared_memory_page_size_hint_bytes_(shared_memory_page_size_hint_bytes),
113       shared_memory_size_hint_bytes_(shared_memory_size_hint_bytes),
114       smb_scraping_mode_(smb_scraping_mode) {
115   // Check for producer-provided SMB (used by Chrome for startup tracing).
116   if (shared_memory_) {
117     // We also expect a valid (unbound) arbiter. Bind it to this endpoint now.
118     PERFETTO_CHECK(shared_memory_arbiter_);
119     shared_memory_arbiter_->BindToProducerEndpoint(this, task_runner_);
120 
121     // If the service accepts our SMB, then it must match our requested page
122     // layout. The protocol doesn't allow the service to change the size and
123     // layout when the SMB is provided by the producer.
124     shared_buffer_page_size_kb_ = shared_memory_page_size_hint_bytes_ / 1024;
125   }
126 
127   if (create_socket_async) {
128     PERFETTO_DCHECK(conn_args.socket_name);
129     auto weak_this = weak_factory_.GetWeakPtr();
130     create_socket_async(
131         [weak_this, task_runner = task_runner_](base::SocketHandle fd) {
132           task_runner->PostTask([weak_this, fd] {
133             base::ScopedSocketHandle handle(fd);
134             if (!weak_this) {
135               return;
136             }
137             ipc::Client::ConnArgs args(std::move(handle));
138             weak_this->ipc_channel_ = ipc::Client::CreateInstance(
139                 std::move(args), weak_this->task_runner_);
140             weak_this->ipc_channel_->BindService(
141                 weak_this->producer_port_->GetWeakPtr());
142           });
143         });
144   } else {
145     ipc_channel_ =
146         ipc::Client::CreateInstance(std::move(conn_args), task_runner);
147     ipc_channel_->BindService(producer_port_->GetWeakPtr());
148   }
149   PERFETTO_DCHECK_THREAD(thread_checker_);
150 }
151 
~ProducerIPCClientImpl()152 ProducerIPCClientImpl::~ProducerIPCClientImpl() {
153   PERFETTO_DCHECK_THREAD(thread_checker_);
154 }
155 
Disconnect()156 void ProducerIPCClientImpl::Disconnect() {
157   PERFETTO_DCHECK_THREAD(thread_checker_);
158   if (!producer_port_)
159     return;
160   // Reset the producer port so that no further IPCs are received and IPC
161   // callbacks are no longer executed. Also reset the IPC channel so that the
162   // service is notified of the disconnection.
163   producer_port_.reset();
164   ipc_channel_.reset();
165   // Perform disconnect synchronously.
166   OnDisconnect();
167 }
168 
169 // Called by the IPC layer if the BindService() succeeds.
OnConnect()170 void ProducerIPCClientImpl::OnConnect() {
171   PERFETTO_DCHECK_THREAD(thread_checker_);
172   connected_ = true;
173 
174   // The IPC layer guarantees that any outstanding callback will be dropped on
175   // the floor if producer_port_ is destroyed between the request and the reply.
176   // Binding |this| is hence safe.
177   ipc::Deferred<protos::gen::InitializeConnectionResponse> on_init;
178   on_init.Bind(
179       [this](ipc::AsyncResult<protos::gen::InitializeConnectionResponse> resp) {
180         OnConnectionInitialized(
181             resp.success(),
182             resp.success() ? resp->using_shmem_provided_by_producer() : false,
183             resp.success() ? resp->direct_smb_patching_supported() : false,
184             resp.success() ? resp->use_shmem_emulation() : false);
185       });
186   protos::gen::InitializeConnectionRequest req;
187   req.set_producer_name(name_);
188   req.set_shared_memory_size_hint_bytes(
189       static_cast<uint32_t>(shared_memory_size_hint_bytes_));
190   req.set_shared_memory_page_size_hint_bytes(
191       static_cast<uint32_t>(shared_memory_page_size_hint_bytes_));
192   switch (smb_scraping_mode_) {
193     case TracingService::ProducerSMBScrapingMode::kDefault:
194       // No need to set the mode, it defaults to use the service default if
195       // unspecified.
196       break;
197     case TracingService::ProducerSMBScrapingMode::kEnabled:
198       req.set_smb_scraping_mode(
199           protos::gen::InitializeConnectionRequest::SMB_SCRAPING_ENABLED);
200       break;
201     case TracingService::ProducerSMBScrapingMode::kDisabled:
202       req.set_smb_scraping_mode(
203           protos::gen::InitializeConnectionRequest::SMB_SCRAPING_DISABLED);
204       break;
205   }
206 
207   int shm_fd = -1;
208   if (shared_memory_) {
209     req.set_producer_provided_shmem(true);
210 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
211     auto key = static_cast<SharedMemoryWindows*>(shared_memory_.get())->key();
212     req.set_shm_key_windows(key);
213 #else
214     shm_fd = static_cast<PosixSharedMemory*>(shared_memory_.get())->fd();
215 #endif
216   }
217 
218   req.set_sdk_version(base::GetVersionString());
219   producer_port_->InitializeConnection(req, std::move(on_init), shm_fd);
220 
221   // Create the back channel to receive commands from the Service.
222   ipc::Deferred<protos::gen::GetAsyncCommandResponse> on_cmd;
223   on_cmd.Bind(
224       [this](ipc::AsyncResult<protos::gen::GetAsyncCommandResponse> resp) {
225         if (!resp)
226           return;  // The IPC channel was closed and |resp| was auto-rejected.
227         OnServiceRequest(*resp);
228       });
229   producer_port_->GetAsyncCommand(protos::gen::GetAsyncCommandRequest(),
230                                   std::move(on_cmd));
231 
232   // If there are pending Sync() requests, send them now.
233   for (auto& pending_sync : pending_sync_reqs_)
234     Sync(std::move(pending_sync));
235   pending_sync_reqs_.clear();
236 }
237 
OnDisconnect()238 void ProducerIPCClientImpl::OnDisconnect() {
239   PERFETTO_DCHECK_THREAD(thread_checker_);
240   PERFETTO_DLOG("Tracing service connection failure");
241   connected_ = false;
242   data_sources_setup_.clear();
243   producer_->OnDisconnect();  // Note: may delete |this|.
244 }
245 
ScheduleDisconnect()246 void ProducerIPCClientImpl::ScheduleDisconnect() {
247   // |ipc_channel| doesn't allow disconnection in the middle of handling
248   // an IPC call, so the connection drop must take place over two phases.
249 
250   // First, synchronously drop the |producer_port_| so that no more IPC
251   // messages are handled.
252   producer_port_.reset();
253 
254   // Then schedule an async task for performing the remainder of the
255   // disconnection operations outside the context of the IPC method handler.
256   auto weak_this = weak_factory_.GetWeakPtr();
257   task_runner_->PostTask([weak_this]() {
258     if (weak_this) {
259       weak_this->Disconnect();
260     }
261   });
262 }
263 
OnConnectionInitialized(bool connection_succeeded,bool using_shmem_provided_by_producer,bool direct_smb_patching_supported,bool use_shmem_emulation)264 void ProducerIPCClientImpl::OnConnectionInitialized(
265     bool connection_succeeded,
266     bool using_shmem_provided_by_producer,
267     bool direct_smb_patching_supported,
268     bool use_shmem_emulation) {
269   PERFETTO_DCHECK_THREAD(thread_checker_);
270   // If connection_succeeded == false, the OnDisconnect() call will follow next
271   // and there we'll notify the |producer_|. TODO: add a test for this.
272   if (!connection_succeeded)
273     return;
274   is_shmem_provided_by_producer_ = using_shmem_provided_by_producer;
275   direct_smb_patching_supported_ = direct_smb_patching_supported;
276   // The tracing service may reject using shared memory and tell the client to
277   // commit data over the socket. This can happen when the client connects to
278   // the service via a relay service:
279   // client <-Unix socket-> relay service <- vsock -> tracing service.
280   use_shmem_emulation_ = use_shmem_emulation;
281   producer_->OnConnect();
282 
283   // Bail out if the service failed to adopt our producer-allocated SMB.
284   // TODO(eseckler): Handle adoption failure more gracefully.
285   if (shared_memory_ && !is_shmem_provided_by_producer_) {
286     PERFETTO_DLOG("Service failed adopt producer-provided SMB, disconnecting.");
287     Disconnect();
288     return;
289   }
290 }
291 
OnServiceRequest(const protos::gen::GetAsyncCommandResponse & cmd)292 void ProducerIPCClientImpl::OnServiceRequest(
293     const protos::gen::GetAsyncCommandResponse& cmd) {
294   PERFETTO_DCHECK_THREAD(thread_checker_);
295 
296   // This message is sent only when connecting to a service running Android Q+.
297   // See comment below in kStartDataSource.
298   if (cmd.has_setup_data_source()) {
299     const auto& req = cmd.setup_data_source();
300     const DataSourceInstanceID dsid = req.new_instance_id();
301     data_sources_setup_.insert(dsid);
302     producer_->SetupDataSource(dsid, req.config());
303     return;
304   }
305 
306   if (cmd.has_start_data_source()) {
307     const auto& req = cmd.start_data_source();
308     const DataSourceInstanceID dsid = req.new_instance_id();
309     const DataSourceConfig& cfg = req.config();
310     if (!data_sources_setup_.count(dsid)) {
311       // When connecting with an older (Android P) service, the service will not
312       // send a SetupDataSource message. We synthesize it here in that case.
313       producer_->SetupDataSource(dsid, cfg);
314     }
315     producer_->StartDataSource(dsid, cfg);
316     return;
317   }
318 
319   if (cmd.has_stop_data_source()) {
320     const DataSourceInstanceID dsid = cmd.stop_data_source().instance_id();
321     producer_->StopDataSource(dsid);
322     data_sources_setup_.erase(dsid);
323     return;
324   }
325 
326   if (cmd.has_setup_tracing()) {
327     std::unique_ptr<SharedMemory> ipc_shared_memory;
328 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
329     const std::string& shm_key = cmd.setup_tracing().shm_key_windows();
330     if (!shm_key.empty())
331       ipc_shared_memory = SharedMemoryWindows::Attach(shm_key);
332 #elif PERFETTO_BUILDFLAG(PERFETTO_OS_FUCHSIA)
333     // On Fuchsia, the embedder is responsible for routing the shared memory
334     // FD, which is provided to this code via a blocking callback.
335     PERFETTO_CHECK(receive_shmem_fd_cb_fuchsia_);
336 
337     base::ScopedFile shmem_fd(receive_shmem_fd_cb_fuchsia_());
338     if (!shmem_fd) {
339       // Failure to get a shared memory buffer is a protocol violation and
340       // therefore we should drop the Protocol connection.
341       PERFETTO_ELOG("Could not get shared memory FD from embedder.");
342       ScheduleDisconnect();
343       return;
344     }
345 
346     ipc_shared_memory =
347         PosixSharedMemory::AttachToFd(std::move(shmem_fd),
348                                       /*require_seals_if_supported=*/false);
349 #else
350     base::ScopedFile shmem_fd = ipc_channel_->TakeReceivedFD();
351     if (shmem_fd) {
352       // TODO(primiano): handle mmap failure in case of OOM.
353       ipc_shared_memory =
354           PosixSharedMemory::AttachToFd(std::move(shmem_fd),
355                                         /*require_seals_if_supported=*/false);
356     }
357 #endif
358     if (use_shmem_emulation_) {
359       PERFETTO_CHECK(!ipc_shared_memory);
360       // Need to create an emulated shmem buffer when the transport deosn't
361       // support it.
362       // TODO(chinglinyu): Let the tracing service decide on the shmem size and
363       // propagate the size in InitializeConnectionResponse.
364       ipc_shared_memory = InProcessSharedMemory::Create(
365           /*size=*/InProcessSharedMemory::kDefaultSize);
366     }
367     if (ipc_shared_memory) {
368       auto shmem_mode = use_shmem_emulation_
369                             ? SharedMemoryABI::ShmemMode::kShmemEmulation
370                             : SharedMemoryABI::ShmemMode::kDefault;
371       // This is the nominal case used in most configurations, where the service
372       // provides the SMB.
373       PERFETTO_CHECK(!is_shmem_provided_by_producer_ && !shared_memory_);
374       shared_memory_ = std::move(ipc_shared_memory);
375       shared_buffer_page_size_kb_ =
376           cmd.setup_tracing().shared_buffer_page_size_kb();
377       shared_memory_arbiter_ = SharedMemoryArbiter::CreateInstance(
378           shared_memory_.get(), shared_buffer_page_size_kb_ * 1024, shmem_mode,
379           this, task_runner_);
380       if (direct_smb_patching_supported_)
381         shared_memory_arbiter_->SetDirectSMBPatchingSupportedByService();
382     } else {
383       // Producer-provided SMB (used by Chrome for startup tracing).
384       PERFETTO_CHECK(is_shmem_provided_by_producer_ && shared_memory_ &&
385                      shared_memory_arbiter_);
386     }
387     producer_->OnTracingSetup();
388     return;
389   }
390 
391   if (cmd.has_flush()) {
392     // This cast boilerplate is required only because protobuf uses its own
393     // uint64 and not stdint's uint64_t. On some 64 bit archs they differ on the
394     // type (long vs long long) even though they have the same size.
395     const auto* data_source_ids = cmd.flush().data_source_ids().data();
396     static_assert(sizeof(data_source_ids[0]) == sizeof(DataSourceInstanceID),
397                   "data_source_ids should be 64-bit");
398 
399     FlushFlags flags(cmd.flush().flags());
400     producer_->Flush(
401         cmd.flush().request_id(),
402         reinterpret_cast<const DataSourceInstanceID*>(data_source_ids),
403         static_cast<size_t>(cmd.flush().data_source_ids().size()), flags);
404     return;
405   }
406 
407   if (cmd.has_clear_incremental_state()) {
408     const auto* data_source_ids =
409         cmd.clear_incremental_state().data_source_ids().data();
410     static_assert(sizeof(data_source_ids[0]) == sizeof(DataSourceInstanceID),
411                   "data_source_ids should be 64-bit");
412     producer_->ClearIncrementalState(
413         reinterpret_cast<const DataSourceInstanceID*>(data_source_ids),
414         static_cast<size_t>(
415             cmd.clear_incremental_state().data_source_ids().size()));
416     return;
417   }
418 
419   PERFETTO_DFATAL("Unknown async request received from tracing service");
420 }
421 
RegisterDataSource(const DataSourceDescriptor & descriptor)422 void ProducerIPCClientImpl::RegisterDataSource(
423     const DataSourceDescriptor& descriptor) {
424   PERFETTO_DCHECK_THREAD(thread_checker_);
425   if (!connected_) {
426     PERFETTO_DLOG(
427         "Cannot RegisterDataSource(), not connected to tracing service");
428   }
429   protos::gen::RegisterDataSourceRequest req;
430   *req.mutable_data_source_descriptor() = descriptor;
431   ipc::Deferred<protos::gen::RegisterDataSourceResponse> async_response;
432   async_response.Bind(
433       [](ipc::AsyncResult<protos::gen::RegisterDataSourceResponse> response) {
434         if (!response)
435           PERFETTO_DLOG("RegisterDataSource() failed: connection reset");
436       });
437   producer_port_->RegisterDataSource(req, std::move(async_response));
438 }
439 
UpdateDataSource(const DataSourceDescriptor & descriptor)440 void ProducerIPCClientImpl::UpdateDataSource(
441     const DataSourceDescriptor& descriptor) {
442   PERFETTO_DCHECK_THREAD(thread_checker_);
443   if (!connected_) {
444     PERFETTO_DLOG(
445         "Cannot UpdateDataSource(), not connected to tracing service");
446   }
447   protos::gen::UpdateDataSourceRequest req;
448   *req.mutable_data_source_descriptor() = descriptor;
449   ipc::Deferred<protos::gen::UpdateDataSourceResponse> async_response;
450   async_response.Bind(
451       [](ipc::AsyncResult<protos::gen::UpdateDataSourceResponse> response) {
452         if (!response)
453           PERFETTO_DLOG("UpdateDataSource() failed: connection reset");
454       });
455   producer_port_->UpdateDataSource(req, std::move(async_response));
456 }
457 
UnregisterDataSource(const std::string & name)458 void ProducerIPCClientImpl::UnregisterDataSource(const std::string& name) {
459   PERFETTO_DCHECK_THREAD(thread_checker_);
460   if (!connected_) {
461     PERFETTO_DLOG(
462         "Cannot UnregisterDataSource(), not connected to tracing service");
463     return;
464   }
465   protos::gen::UnregisterDataSourceRequest req;
466   req.set_data_source_name(name);
467   producer_port_->UnregisterDataSource(
468       req, ipc::Deferred<protos::gen::UnregisterDataSourceResponse>());
469 }
470 
RegisterTraceWriter(uint32_t writer_id,uint32_t target_buffer)471 void ProducerIPCClientImpl::RegisterTraceWriter(uint32_t writer_id,
472                                                 uint32_t target_buffer) {
473   PERFETTO_DCHECK_THREAD(thread_checker_);
474   if (!connected_) {
475     PERFETTO_DLOG(
476         "Cannot RegisterTraceWriter(), not connected to tracing service");
477     return;
478   }
479   protos::gen::RegisterTraceWriterRequest req;
480   req.set_trace_writer_id(writer_id);
481   req.set_target_buffer(target_buffer);
482   producer_port_->RegisterTraceWriter(
483       req, ipc::Deferred<protos::gen::RegisterTraceWriterResponse>());
484 }
485 
UnregisterTraceWriter(uint32_t writer_id)486 void ProducerIPCClientImpl::UnregisterTraceWriter(uint32_t writer_id) {
487   PERFETTO_DCHECK_THREAD(thread_checker_);
488   if (!connected_) {
489     PERFETTO_DLOG(
490         "Cannot UnregisterTraceWriter(), not connected to tracing service");
491     return;
492   }
493   protos::gen::UnregisterTraceWriterRequest req;
494   req.set_trace_writer_id(writer_id);
495   producer_port_->UnregisterTraceWriter(
496       req, ipc::Deferred<protos::gen::UnregisterTraceWriterResponse>());
497 }
498 
CommitData(const CommitDataRequest & req,CommitDataCallback callback)499 void ProducerIPCClientImpl::CommitData(const CommitDataRequest& req,
500                                        CommitDataCallback callback) {
501   PERFETTO_DCHECK_THREAD(thread_checker_);
502   if (!connected_) {
503     PERFETTO_DLOG("Cannot CommitData(), not connected to tracing service");
504     return;
505   }
506   ipc::Deferred<protos::gen::CommitDataResponse> async_response;
507   // TODO(primiano): add a test that destroys ProducerIPCClientImpl soon after
508   // this call and checks that the callback is dropped.
509   if (callback) {
510     async_response.Bind(
511         [callback](ipc::AsyncResult<protos::gen::CommitDataResponse> response) {
512           if (!response) {
513             PERFETTO_DLOG("CommitData() failed: connection reset");
514             return;
515           }
516           callback();
517         });
518   }
519   producer_port_->CommitData(req, std::move(async_response));
520 }
521 
NotifyDataSourceStarted(DataSourceInstanceID id)522 void ProducerIPCClientImpl::NotifyDataSourceStarted(DataSourceInstanceID id) {
523   PERFETTO_DCHECK_THREAD(thread_checker_);
524   if (!connected_) {
525     PERFETTO_DLOG(
526         "Cannot NotifyDataSourceStarted(), not connected to tracing service");
527     return;
528   }
529   protos::gen::NotifyDataSourceStartedRequest req;
530   req.set_data_source_id(id);
531   producer_port_->NotifyDataSourceStarted(
532       req, ipc::Deferred<protos::gen::NotifyDataSourceStartedResponse>());
533 }
534 
NotifyDataSourceStopped(DataSourceInstanceID id)535 void ProducerIPCClientImpl::NotifyDataSourceStopped(DataSourceInstanceID id) {
536   PERFETTO_DCHECK_THREAD(thread_checker_);
537   if (!connected_) {
538     PERFETTO_DLOG(
539         "Cannot NotifyDataSourceStopped(), not connected to tracing service");
540     return;
541   }
542   protos::gen::NotifyDataSourceStoppedRequest req;
543   req.set_data_source_id(id);
544   producer_port_->NotifyDataSourceStopped(
545       req, ipc::Deferred<protos::gen::NotifyDataSourceStoppedResponse>());
546 }
547 
ActivateTriggers(const std::vector<std::string> & triggers)548 void ProducerIPCClientImpl::ActivateTriggers(
549     const std::vector<std::string>& triggers) {
550   PERFETTO_DCHECK_THREAD(thread_checker_);
551   if (!connected_) {
552     PERFETTO_DLOG(
553         "Cannot ActivateTriggers(), not connected to tracing service");
554     return;
555   }
556   protos::gen::ActivateTriggersRequest proto_req;
557   for (const auto& name : triggers) {
558     *proto_req.add_trigger_names() = name;
559   }
560   producer_port_->ActivateTriggers(
561       proto_req, ipc::Deferred<protos::gen::ActivateTriggersResponse>());
562 }
563 
Sync(std::function<void ()> callback)564 void ProducerIPCClientImpl::Sync(std::function<void()> callback) {
565   PERFETTO_DCHECK_THREAD(thread_checker_);
566   if (!connected_) {
567     pending_sync_reqs_.emplace_back(std::move(callback));
568     return;
569   }
570   ipc::Deferred<protos::gen::SyncResponse> resp;
571   resp.Bind([callback](ipc::AsyncResult<protos::gen::SyncResponse>) {
572     // Here we ACK the callback even if the service replies with a failure
573     // (i.e. the service is too old and doesn't understand Sync()). In that
574     // case the service has still seen the request, the IPC roundtrip is
575     // still a (weaker) linearization fence.
576     callback();
577   });
578   producer_port_->Sync(protos::gen::SyncRequest(), std::move(resp));
579 }
580 
CreateTraceWriter(BufferID target_buffer,BufferExhaustedPolicy buffer_exhausted_policy)581 std::unique_ptr<TraceWriter> ProducerIPCClientImpl::CreateTraceWriter(
582     BufferID target_buffer,
583     BufferExhaustedPolicy buffer_exhausted_policy) {
584   // This method can be called by different threads. |shared_memory_arbiter_| is
585   // thread-safe but be aware of accessing any other state in this function.
586   return shared_memory_arbiter_->CreateTraceWriter(target_buffer,
587                                                    buffer_exhausted_policy);
588 }
589 
MaybeSharedMemoryArbiter()590 SharedMemoryArbiter* ProducerIPCClientImpl::MaybeSharedMemoryArbiter() {
591   return shared_memory_arbiter_.get();
592 }
593 
IsShmemProvidedByProducer() const594 bool ProducerIPCClientImpl::IsShmemProvidedByProducer() const {
595   return is_shmem_provided_by_producer_;
596 }
597 
NotifyFlushComplete(FlushRequestID req_id)598 void ProducerIPCClientImpl::NotifyFlushComplete(FlushRequestID req_id) {
599   return shared_memory_arbiter_->NotifyFlushComplete(req_id);
600 }
601 
shared_memory() const602 SharedMemory* ProducerIPCClientImpl::shared_memory() const {
603   return shared_memory_.get();
604 }
605 
shared_buffer_page_size_kb() const606 size_t ProducerIPCClientImpl::shared_buffer_page_size_kb() const {
607   return shared_buffer_page_size_kb_;
608 }
609 
610 }  // namespace perfetto
611