xref: /aosp_15_r20/external/perfetto/src/tracing/ipc/service/producer_ipc_service.cc (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/ipc/service/producer_ipc_service.h"
18 
19 #include <cinttypes>
20 
21 #include "perfetto/base/logging.h"
22 #include "perfetto/base/task_runner.h"
23 #include "perfetto/ext/ipc/host.h"
24 #include "perfetto/ext/ipc/service.h"
25 #include "perfetto/ext/tracing/core/client_identity.h"
26 #include "perfetto/ext/tracing/core/commit_data_request.h"
27 #include "perfetto/ext/tracing/core/tracing_service.h"
28 #include "perfetto/tracing/core/data_source_config.h"
29 #include "perfetto/tracing/core/data_source_descriptor.h"
30 
31 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
32 #include "src/tracing/ipc/shared_memory_windows.h"
33 #else
34 #include "src/tracing/ipc/posix_shared_memory.h"
35 #endif
36 
37 // The remote Producer(s) are not trusted. All the methods from the ProducerPort
38 // IPC layer (e.g. RegisterDataSource()) must assume that the remote Producer is
39 // compromised.
40 
41 namespace perfetto {
42 
// Stores |core_service| (later used by InitializeConnection() to connect
// producers) and binds the weak pointer factory to this instance.
ProducerIPCService::ProducerIPCService(TracingService* core_service)
    : core_service_(core_service), weak_ptr_factory_(this) {}
45 
// Defaulted: this class performs no explicit teardown of its own.
ProducerIPCService::~ProducerIPCService() = default;
47 
48 ProducerIPCService::RemoteProducer*
GetProducerForCurrentRequest()49 ProducerIPCService::GetProducerForCurrentRequest() {
50   const ipc::ClientID ipc_client_id = ipc::Service::client_info().client_id();
51   PERFETTO_CHECK(ipc_client_id);
52   auto it = producers_.find(ipc_client_id);
53   if (it == producers_.end())
54     return nullptr;
55   return it->second.get();
56 }
57 
58 // Called by the remote Producer through the IPC channel soon after connecting.
InitializeConnection(const protos::gen::InitializeConnectionRequest & req,DeferredInitializeConnectionResponse response)59 void ProducerIPCService::InitializeConnection(
60     const protos::gen::InitializeConnectionRequest& req,
61     DeferredInitializeConnectionResponse response) {
62   const auto& client_info = ipc::Service::client_info();
63   const ipc::ClientID ipc_client_id = client_info.client_id();
64   PERFETTO_CHECK(ipc_client_id);
65 
66   if (producers_.count(ipc_client_id) > 0) {
67     PERFETTO_DLOG(
68         "The remote Producer is trying to re-initialize the connection");
69     return response.Reject();
70   }
71 
72   // Create a new entry.
73   std::unique_ptr<RemoteProducer> producer(new RemoteProducer());
74 
75   TracingService::ProducerSMBScrapingMode smb_scraping_mode =
76       TracingService::ProducerSMBScrapingMode::kDefault;
77   switch (req.smb_scraping_mode()) {
78     case protos::gen::InitializeConnectionRequest::SMB_SCRAPING_UNSPECIFIED:
79       break;
80     case protos::gen::InitializeConnectionRequest::SMB_SCRAPING_DISABLED:
81       smb_scraping_mode = TracingService::ProducerSMBScrapingMode::kDisabled;
82       break;
83     case protos::gen::InitializeConnectionRequest::SMB_SCRAPING_ENABLED:
84       smb_scraping_mode = TracingService::ProducerSMBScrapingMode::kEnabled;
85       break;
86   }
87 
88   // If the producer provided an SMB, tell the service to attempt to adopt it.
89   std::unique_ptr<SharedMemory> shmem;
90   if (req.producer_provided_shmem()) {
91 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
92     if (!req.has_shm_key_windows() || req.shm_key_windows().empty()) {
93       PERFETTO_ELOG(
94           "shm_key_windows must be non-empty when "
95           "producer_provided_shmem = true");
96     } else {
97       shmem = SharedMemoryWindows::Attach(req.shm_key_windows());
98       // Attach() does error logging if something fails, no need to extra ELOGs.
99     }
100 #else
101     base::ScopedFile shmem_fd = ipc::Service::TakeReceivedFD();
102 
103     if (shmem_fd) {
104       shmem = PosixSharedMemory::AttachToFd(
105           std::move(shmem_fd), /*require_seals_if_supported=*/true);
106       if (!shmem) {
107         PERFETTO_ELOG(
108             "Couldn't map producer-provided SMB, falling back to "
109             "service-provided SMB");
110       }
111     } else {
112       PERFETTO_DLOG(
113           "InitializeConnectionRequest's producer_provided_shmem flag is set "
114           "but the producer didn't provide an FD");
115     }
116 #endif
117   }
118 
119   // Copy the data fields to be emitted to trace packets into ClientIdentity.
120   ClientIdentity client_identity(client_info.uid(), client_info.pid(),
121                                  client_info.machine_id());
122   // ConnectProducer will call OnConnect() on the next task.
123   producer->service_endpoint = core_service_->ConnectProducer(
124       producer.get(), client_identity, req.producer_name(),
125       req.shared_memory_size_hint_bytes(),
126       /*in_process=*/false, smb_scraping_mode,
127       req.shared_memory_page_size_hint_bytes(), std::move(shmem),
128       req.sdk_version());
129 
130   // Could happen if the service has too many producers connected.
131   if (!producer->service_endpoint) {
132     response.Reject();
133     return;
134   }
135 
136   bool use_shmem_emulation = ipc::Service::use_shmem_emulation();
137   bool using_producer_shmem =
138       !use_shmem_emulation &&
139       producer->service_endpoint->IsShmemProvidedByProducer();
140 
141   producers_.emplace(ipc_client_id, std::move(producer));
142   // Because of the std::move() |producer| is invalid after this point.
143 
144   auto async_res =
145       ipc::AsyncResult<protos::gen::InitializeConnectionResponse>::Create();
146   async_res->set_using_shmem_provided_by_producer(using_producer_shmem);
147   async_res->set_direct_smb_patching_supported(true);
148   async_res->set_use_shmem_emulation(use_shmem_emulation);
149   response.Resolve(std::move(async_res));
150 }
151 
152 // Called by the remote Producer through the IPC channel.
RegisterDataSource(const protos::gen::RegisterDataSourceRequest & req,DeferredRegisterDataSourceResponse response)153 void ProducerIPCService::RegisterDataSource(
154     const protos::gen::RegisterDataSourceRequest& req,
155     DeferredRegisterDataSourceResponse response) {
156   RemoteProducer* producer = GetProducerForCurrentRequest();
157   if (!producer) {
158     PERFETTO_DLOG(
159         "Producer invoked RegisterDataSource() before InitializeConnection()");
160     if (response.IsBound())
161       response.Reject();
162     return;
163   }
164 
165   const DataSourceDescriptor& dsd = req.data_source_descriptor();
166   GetProducerForCurrentRequest()->service_endpoint->RegisterDataSource(dsd);
167 
168   // RegisterDataSource doesn't expect any meaningful response.
169   if (response.IsBound()) {
170     response.Resolve(
171         ipc::AsyncResult<protos::gen::RegisterDataSourceResponse>::Create());
172   }
173 }
174 
175 // Called by the remote Producer through the IPC channel.
UpdateDataSource(const protos::gen::UpdateDataSourceRequest & req,DeferredUpdateDataSourceResponse response)176 void ProducerIPCService::UpdateDataSource(
177     const protos::gen::UpdateDataSourceRequest& req,
178     DeferredUpdateDataSourceResponse response) {
179   RemoteProducer* producer = GetProducerForCurrentRequest();
180   if (!producer) {
181     PERFETTO_DLOG(
182         "Producer invoked UpdateDataSource() before InitializeConnection()");
183     if (response.IsBound())
184       response.Reject();
185     return;
186   }
187 
188   const DataSourceDescriptor& dsd = req.data_source_descriptor();
189   GetProducerForCurrentRequest()->service_endpoint->UpdateDataSource(dsd);
190 
191   // UpdateDataSource doesn't expect any meaningful response.
192   if (response.IsBound()) {
193     response.Resolve(
194         ipc::AsyncResult<protos::gen::UpdateDataSourceResponse>::Create());
195   }
196 }
197 
198 // Called by the IPC layer.
OnClientDisconnected()199 void ProducerIPCService::OnClientDisconnected() {
200   ipc::ClientID client_id = ipc::Service::client_info().client_id();
201   PERFETTO_DLOG("Client %" PRIu64 " disconnected", client_id);
202   producers_.erase(client_id);
203 }
204 
205 // TODO(fmayer): test what happens if we receive the following tasks, in order:
206 // RegisterDataSource, UnregisterDataSource, OnDataSourceRegistered.
207 // which essentially means that the client posted back to back a
208 // ReqisterDataSource and UnregisterDataSource speculating on the next id.
209 // Called by the remote Service through the IPC channel.
UnregisterDataSource(const protos::gen::UnregisterDataSourceRequest & req,DeferredUnregisterDataSourceResponse response)210 void ProducerIPCService::UnregisterDataSource(
211     const protos::gen::UnregisterDataSourceRequest& req,
212     DeferredUnregisterDataSourceResponse response) {
213   RemoteProducer* producer = GetProducerForCurrentRequest();
214   if (!producer) {
215     PERFETTO_DLOG(
216         "Producer invoked UnregisterDataSource() before "
217         "InitializeConnection()");
218     if (response.IsBound())
219       response.Reject();
220     return;
221   }
222   producer->service_endpoint->UnregisterDataSource(req.data_source_name());
223 
224   // UnregisterDataSource doesn't expect any meaningful response.
225   if (response.IsBound()) {
226     response.Resolve(
227         ipc::AsyncResult<protos::gen::UnregisterDataSourceResponse>::Create());
228   }
229 }
230 
RegisterTraceWriter(const protos::gen::RegisterTraceWriterRequest & req,DeferredRegisterTraceWriterResponse response)231 void ProducerIPCService::RegisterTraceWriter(
232     const protos::gen::RegisterTraceWriterRequest& req,
233     DeferredRegisterTraceWriterResponse response) {
234   RemoteProducer* producer = GetProducerForCurrentRequest();
235   if (!producer) {
236     PERFETTO_DLOG(
237         "Producer invoked RegisterTraceWriter() before "
238         "InitializeConnection()");
239     if (response.IsBound())
240       response.Reject();
241     return;
242   }
243   producer->service_endpoint->RegisterTraceWriter(req.trace_writer_id(),
244                                                   req.target_buffer());
245 
246   // RegisterTraceWriter doesn't expect any meaningful response.
247   if (response.IsBound()) {
248     response.Resolve(
249         ipc::AsyncResult<protos::gen::RegisterTraceWriterResponse>::Create());
250   }
251 }
252 
UnregisterTraceWriter(const protos::gen::UnregisterTraceWriterRequest & req,DeferredUnregisterTraceWriterResponse response)253 void ProducerIPCService::UnregisterTraceWriter(
254     const protos::gen::UnregisterTraceWriterRequest& req,
255     DeferredUnregisterTraceWriterResponse response) {
256   RemoteProducer* producer = GetProducerForCurrentRequest();
257   if (!producer) {
258     PERFETTO_DLOG(
259         "Producer invoked UnregisterTraceWriter() before "
260         "InitializeConnection()");
261     if (response.IsBound())
262       response.Reject();
263     return;
264   }
265   producer->service_endpoint->UnregisterTraceWriter(req.trace_writer_id());
266 
267   // UnregisterTraceWriter doesn't expect any meaningful response.
268   if (response.IsBound()) {
269     response.Resolve(
270         ipc::AsyncResult<protos::gen::UnregisterTraceWriterResponse>::Create());
271   }
272 }
273 
CommitData(const protos::gen::CommitDataRequest & req,DeferredCommitDataResponse resp)274 void ProducerIPCService::CommitData(const protos::gen::CommitDataRequest& req,
275                                     DeferredCommitDataResponse resp) {
276   RemoteProducer* producer = GetProducerForCurrentRequest();
277   if (!producer) {
278     PERFETTO_DLOG(
279         "Producer invoked CommitData() before InitializeConnection()");
280     if (resp.IsBound())
281       resp.Reject();
282     return;
283   }
284 
285   // We don't want to send a response if the client didn't attach a callback to
286   // the original request. Doing so would generate unnecessary wakeups and
287   // context switches.
288   std::function<void()> callback;
289   if (resp.IsBound()) {
290     // Capturing |resp| by reference here speculates on the fact that
291     // CommitData() in tracing_service_impl.cc invokes the passed callback
292     // inline, without posting it. If that assumption changes this code needs to
293     // wrap the response in a shared_ptr (C+11 lambdas don't support move) and
294     // use a weak ptr in the caller.
295     callback = [&resp] {
296       resp.Resolve(ipc::AsyncResult<protos::gen::CommitDataResponse>::Create());
297     };
298   }
299   producer->service_endpoint->CommitData(req, callback);
300 }
301 
NotifyDataSourceStarted(const protos::gen::NotifyDataSourceStartedRequest & request,DeferredNotifyDataSourceStartedResponse response)302 void ProducerIPCService::NotifyDataSourceStarted(
303     const protos::gen::NotifyDataSourceStartedRequest& request,
304     DeferredNotifyDataSourceStartedResponse response) {
305   RemoteProducer* producer = GetProducerForCurrentRequest();
306   if (!producer) {
307     PERFETTO_DLOG(
308         "Producer invoked NotifyDataSourceStarted() before "
309         "InitializeConnection()");
310     if (response.IsBound())
311       response.Reject();
312     return;
313   }
314   producer->service_endpoint->NotifyDataSourceStarted(request.data_source_id());
315 
316   // NotifyDataSourceStopped shouldn't expect any meaningful response, avoid
317   // a useless IPC in that case.
318   if (response.IsBound()) {
319     response.Resolve(ipc::AsyncResult<
320                      protos::gen::NotifyDataSourceStartedResponse>::Create());
321   }
322 }
323 
NotifyDataSourceStopped(const protos::gen::NotifyDataSourceStoppedRequest & request,DeferredNotifyDataSourceStoppedResponse response)324 void ProducerIPCService::NotifyDataSourceStopped(
325     const protos::gen::NotifyDataSourceStoppedRequest& request,
326     DeferredNotifyDataSourceStoppedResponse response) {
327   RemoteProducer* producer = GetProducerForCurrentRequest();
328   if (!producer) {
329     PERFETTO_DLOG(
330         "Producer invoked NotifyDataSourceStopped() before "
331         "InitializeConnection()");
332     if (response.IsBound())
333       response.Reject();
334     return;
335   }
336   producer->service_endpoint->NotifyDataSourceStopped(request.data_source_id());
337 
338   // NotifyDataSourceStopped shouldn't expect any meaningful response, avoid
339   // a useless IPC in that case.
340   if (response.IsBound()) {
341     response.Resolve(ipc::AsyncResult<
342                      protos::gen::NotifyDataSourceStoppedResponse>::Create());
343   }
344 }
345 
ActivateTriggers(const protos::gen::ActivateTriggersRequest & proto_req,DeferredActivateTriggersResponse resp)346 void ProducerIPCService::ActivateTriggers(
347     const protos::gen::ActivateTriggersRequest& proto_req,
348     DeferredActivateTriggersResponse resp) {
349   RemoteProducer* producer = GetProducerForCurrentRequest();
350   if (!producer) {
351     PERFETTO_DLOG(
352         "Producer invoked ActivateTriggers() before InitializeConnection()");
353     if (resp.IsBound())
354       resp.Reject();
355     return;
356   }
357   std::vector<std::string> triggers;
358   for (const auto& name : proto_req.trigger_names()) {
359     triggers.push_back(name);
360   }
361   producer->service_endpoint->ActivateTriggers(triggers);
362   // ActivateTriggers shouldn't expect any meaningful response, avoid
363   // a useless IPC in that case.
364   if (resp.IsBound()) {
365     resp.Resolve(
366         ipc::AsyncResult<protos::gen::ActivateTriggersResponse>::Create());
367   }
368 }
369 
GetAsyncCommand(const protos::gen::GetAsyncCommandRequest &,DeferredGetAsyncCommandResponse response)370 void ProducerIPCService::GetAsyncCommand(
371     const protos::gen::GetAsyncCommandRequest&,
372     DeferredGetAsyncCommandResponse response) {
373   RemoteProducer* producer = GetProducerForCurrentRequest();
374   if (!producer) {
375     PERFETTO_DLOG(
376         "Producer invoked GetAsyncCommand() before "
377         "InitializeConnection()");
378     return response.Reject();
379   }
380   // Keep the back channel open, without ever resolving the ipc::Deferred fully,
381   // to send async commands to the RemoteProducer (e.g., starting/stopping a
382   // data source).
383   producer->async_producer_commands = std::move(response);
384 
385   // Service may already have issued the OnTracingSetup() event, in which case
386   // we should forward it to the producer now.
387   if (producer->send_setup_tracing_on_async_commands_bound)
388     producer->SendSetupTracing();
389 }
390 
Sync(const protos::gen::SyncRequest &,DeferredSyncResponse resp)391 void ProducerIPCService::Sync(const protos::gen::SyncRequest&,
392                               DeferredSyncResponse resp) {
393   RemoteProducer* producer = GetProducerForCurrentRequest();
394   if (!producer) {
395     PERFETTO_DLOG("Producer invoked Sync() before InitializeConnection()");
396     return resp.Reject();
397   }
398   auto weak_this = weak_ptr_factory_.GetWeakPtr();
399   auto resp_it = pending_syncs_.insert(pending_syncs_.end(), std::move(resp));
400   auto callback = [weak_this, resp_it]() {
401     if (!weak_this)
402       return;
403     auto pending_resp = std::move(*resp_it);
404     weak_this->pending_syncs_.erase(resp_it);
405     pending_resp.Resolve(ipc::AsyncResult<protos::gen::SyncResponse>::Create());
406   };
407   producer->service_endpoint->Sync(callback);
408 }
409 
410 ////////////////////////////////////////////////////////////////////////////////
411 // RemoteProducer methods
412 ////////////////////////////////////////////////////////////////////////////////
413 
// Both special members are defaulted: construction and destruction involve no
// logic beyond what the members themselves do.
ProducerIPCService::RemoteProducer::RemoteProducer() = default;
ProducerIPCService::RemoteProducer::~RemoteProducer() = default;
416 
// Invoked by the |core_service_| business logic after the ConnectProducer()
// call. Intentionally a no-op: there is nothing to do here, as
// ConnectProducer() is expected to just work in the local case.
void ProducerIPCService::RemoteProducer::OnConnect() {}
421 
// Invoked by the |core_service_| business logic after we destroy the
// |service_endpoint| (in the RemoteProducer dtor). Intentionally a no-op.
void ProducerIPCService::RemoteProducer::OnDisconnect() {}
425 
426 // Invoked by the |core_service_| business logic when it wants to create a new
427 // data source.
SetupDataSource(DataSourceInstanceID dsid,const DataSourceConfig & cfg)428 void ProducerIPCService::RemoteProducer::SetupDataSource(
429     DataSourceInstanceID dsid,
430     const DataSourceConfig& cfg) {
431   if (!async_producer_commands.IsBound()) {
432     PERFETTO_DLOG(
433         "The Service tried to create a new data source but the remote Producer "
434         "has not yet initialized the connection");
435     return;
436   }
437   auto cmd = ipc::AsyncResult<protos::gen::GetAsyncCommandResponse>::Create();
438   cmd.set_has_more(true);
439   cmd->mutable_setup_data_source()->set_new_instance_id(dsid);
440   *cmd->mutable_setup_data_source()->mutable_config() = cfg;
441   async_producer_commands.Resolve(std::move(cmd));
442 }
443 
444 // Invoked by the |core_service_| business logic when it wants to start a new
445 // data source.
StartDataSource(DataSourceInstanceID dsid,const DataSourceConfig & cfg)446 void ProducerIPCService::RemoteProducer::StartDataSource(
447     DataSourceInstanceID dsid,
448     const DataSourceConfig& cfg) {
449   if (!async_producer_commands.IsBound()) {
450     PERFETTO_DLOG(
451         "The Service tried to start a new data source but the remote Producer "
452         "has not yet initialized the connection");
453     return;
454   }
455   auto cmd = ipc::AsyncResult<protos::gen::GetAsyncCommandResponse>::Create();
456   cmd.set_has_more(true);
457   cmd->mutable_start_data_source()->set_new_instance_id(dsid);
458   *cmd->mutable_start_data_source()->mutable_config() = cfg;
459   async_producer_commands.Resolve(std::move(cmd));
460 }
461 
StopDataSource(DataSourceInstanceID dsid)462 void ProducerIPCService::RemoteProducer::StopDataSource(
463     DataSourceInstanceID dsid) {
464   if (!async_producer_commands.IsBound()) {
465     PERFETTO_DLOG(
466         "The Service tried to stop a data source but the remote Producer "
467         "has not yet initialized the connection");
468     return;
469   }
470   auto cmd = ipc::AsyncResult<protos::gen::GetAsyncCommandResponse>::Create();
471   cmd.set_has_more(true);
472   cmd->mutable_stop_data_source()->set_instance_id(dsid);
473   async_producer_commands.Resolve(std::move(cmd));
474 }
475 
OnTracingSetup()476 void ProducerIPCService::RemoteProducer::OnTracingSetup() {
477   if (!async_producer_commands.IsBound()) {
478     // Service may call this before the producer issued GetAsyncCommand.
479     send_setup_tracing_on_async_commands_bound = true;
480     return;
481   }
482   SendSetupTracing();
483 }
484 
SendSetupTracing()485 void ProducerIPCService::RemoteProducer::SendSetupTracing() {
486   PERFETTO_CHECK(async_producer_commands.IsBound());
487   PERFETTO_CHECK(service_endpoint->shared_memory());
488   auto cmd = ipc::AsyncResult<protos::gen::GetAsyncCommandResponse>::Create();
489   cmd.set_has_more(true);
490   auto setup_tracing = cmd->mutable_setup_tracing();
491   if (!service_endpoint->IsShmemProvidedByProducer()) {
492     // Nominal case (% Chrome): service provides SMB.
493     setup_tracing->set_shared_buffer_page_size_kb(
494         static_cast<uint32_t>(service_endpoint->shared_buffer_page_size_kb()));
495 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
496     const std::string& shm_key =
497         static_cast<SharedMemoryWindows*>(service_endpoint->shared_memory())
498             ->key();
499     setup_tracing->set_shm_key_windows(shm_key);
500 #else
501     const int shm_fd =
502         static_cast<PosixSharedMemory*>(service_endpoint->shared_memory())
503             ->fd();
504     cmd.set_fd(shm_fd);
505 #endif
506   }
507   async_producer_commands.Resolve(std::move(cmd));
508 }
509 
Flush(FlushRequestID flush_request_id,const DataSourceInstanceID * data_source_ids,size_t num_data_sources,FlushFlags flush_flags)510 void ProducerIPCService::RemoteProducer::Flush(
511     FlushRequestID flush_request_id,
512     const DataSourceInstanceID* data_source_ids,
513     size_t num_data_sources,
514     FlushFlags flush_flags) {
515   if (!async_producer_commands.IsBound()) {
516     PERFETTO_DLOG(
517         "The Service tried to request a flush but the remote Producer has not "
518         "yet initialized the connection");
519     return;
520   }
521   auto cmd = ipc::AsyncResult<protos::gen::GetAsyncCommandResponse>::Create();
522   cmd.set_has_more(true);
523   for (size_t i = 0; i < num_data_sources; i++)
524     cmd->mutable_flush()->add_data_source_ids(data_source_ids[i]);
525   cmd->mutable_flush()->set_request_id(flush_request_id);
526   cmd->mutable_flush()->set_flags(flush_flags.flags());
527   async_producer_commands.Resolve(std::move(cmd));
528 }
529 
ClearIncrementalState(const DataSourceInstanceID * data_source_ids,size_t num_data_sources)530 void ProducerIPCService::RemoteProducer::ClearIncrementalState(
531     const DataSourceInstanceID* data_source_ids,
532     size_t num_data_sources) {
533   if (!async_producer_commands.IsBound()) {
534     PERFETTO_DLOG(
535         "The Service tried to request an incremental state invalidation, but "
536         "the remote Producer has not yet initialized the connection");
537     return;
538   }
539   auto cmd = ipc::AsyncResult<protos::gen::GetAsyncCommandResponse>::Create();
540   cmd.set_has_more(true);
541   for (size_t i = 0; i < num_data_sources; i++)
542     cmd->mutable_clear_incremental_state()->add_data_source_ids(
543         data_source_ids[i]);
544   async_producer_commands.Resolve(std::move(cmd));
545 }
546 
547 }  // namespace perfetto
548