xref: /aosp_15_r20/external/perfetto/src/tracing/ipc/service/consumer_ipc_service.cc (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/ipc/service/consumer_ipc_service.h"
18 
19 #include <cinttypes>
20 
21 #include "perfetto/base/logging.h"
22 #include "perfetto/base/task_runner.h"
23 #include "perfetto/ext/base/scoped_file.h"
24 #include "perfetto/ext/ipc/basic_types.h"
25 #include "perfetto/ext/ipc/host.h"
26 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
27 #include "perfetto/ext/tracing/core/slice.h"
28 #include "perfetto/ext/tracing/core/trace_packet.h"
29 #include "perfetto/ext/tracing/core/trace_stats.h"
30 #include "perfetto/ext/tracing/core/tracing_service.h"
31 #include "perfetto/tracing/core/trace_config.h"
32 #include "perfetto/tracing/core/tracing_service_capabilities.h"
33 #include "perfetto/tracing/core/tracing_service_state.h"
34 
35 namespace perfetto {
36 
ConsumerIPCService(TracingService * core_service)37 ConsumerIPCService::ConsumerIPCService(TracingService* core_service)
38     : core_service_(core_service), weak_ptr_factory_(this) {}
39 
// Out-of-line defaulted destructor: no explicit teardown is performed here
// beyond destroying the members.
40 ConsumerIPCService::~ConsumerIPCService() = default;
41 
42 ConsumerIPCService::RemoteConsumer*
GetConsumerForCurrentRequest()43 ConsumerIPCService::GetConsumerForCurrentRequest() {
44   const ipc::ClientID ipc_client_id = ipc::Service::client_info().client_id();
45   const uid_t uid = ipc::Service::client_info().uid();
46   PERFETTO_CHECK(ipc_client_id);
47   auto it = consumers_.find(ipc_client_id);
48   if (it == consumers_.end()) {
49     auto* remote_consumer = new RemoteConsumer();
50     consumers_[ipc_client_id].reset(remote_consumer);
51     remote_consumer->service_endpoint =
52         core_service_->ConnectConsumer(remote_consumer, uid);
53     return remote_consumer;
54   }
55   return it->second.get();
56 }
57 
58 // Called by the IPC layer.
OnClientDisconnected()59 void ConsumerIPCService::OnClientDisconnected() {
60   ipc::ClientID client_id = ipc::Service::client_info().client_id();
61   consumers_.erase(client_id);
62 }
63 
64 // Called by the IPC layer.
EnableTracing(const protos::gen::EnableTracingRequest & req,DeferredEnableTracingResponse resp)65 void ConsumerIPCService::EnableTracing(
66     const protos::gen::EnableTracingRequest& req,
67     DeferredEnableTracingResponse resp) {
68   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
69   if (req.attach_notification_only()) {
70     remote_consumer->enable_tracing_response = std::move(resp);
71     return;
72   }
73   const TraceConfig& trace_config = req.trace_config();
74   base::ScopedFile fd;
75   if (trace_config.write_into_file() && trace_config.output_path().empty())
76     fd = ipc::Service::TakeReceivedFD();
77   remote_consumer->service_endpoint->EnableTracing(trace_config, std::move(fd));
78   remote_consumer->enable_tracing_response = std::move(resp);
79 }
80 
81 // Called by the IPC layer.
StartTracing(const protos::gen::StartTracingRequest &,DeferredStartTracingResponse resp)82 void ConsumerIPCService::StartTracing(const protos::gen::StartTracingRequest&,
83                                       DeferredStartTracingResponse resp) {
84   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
85   remote_consumer->service_endpoint->StartTracing();
86   resp.Resolve(ipc::AsyncResult<protos::gen::StartTracingResponse>::Create());
87 }
88 
89 // Called by the IPC layer.
ChangeTraceConfig(const protos::gen::ChangeTraceConfigRequest & req,DeferredChangeTraceConfigResponse resp)90 void ConsumerIPCService::ChangeTraceConfig(
91     const protos::gen::ChangeTraceConfigRequest& req,
92     DeferredChangeTraceConfigResponse resp) {
93   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
94   remote_consumer->service_endpoint->ChangeTraceConfig(req.trace_config());
95   resp.Resolve(
96       ipc::AsyncResult<protos::gen::ChangeTraceConfigResponse>::Create());
97 }
98 
99 // Called by the IPC layer.
DisableTracing(const protos::gen::DisableTracingRequest &,DeferredDisableTracingResponse resp)100 void ConsumerIPCService::DisableTracing(
101     const protos::gen::DisableTracingRequest&,
102     DeferredDisableTracingResponse resp) {
103   GetConsumerForCurrentRequest()->service_endpoint->DisableTracing();
104   resp.Resolve(ipc::AsyncResult<protos::gen::DisableTracingResponse>::Create());
105 }
106 
107 // Called by the IPC layer.
ReadBuffers(const protos::gen::ReadBuffersRequest &,DeferredReadBuffersResponse resp)108 void ConsumerIPCService::ReadBuffers(const protos::gen::ReadBuffersRequest&,
109                                      DeferredReadBuffersResponse resp) {
110   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
111   remote_consumer->read_buffers_response = std::move(resp);
112   remote_consumer->service_endpoint->ReadBuffers();
113 }
114 
115 // Called by the IPC layer.
FreeBuffers(const protos::gen::FreeBuffersRequest &,DeferredFreeBuffersResponse resp)116 void ConsumerIPCService::FreeBuffers(const protos::gen::FreeBuffersRequest&,
117                                      DeferredFreeBuffersResponse resp) {
118   GetConsumerForCurrentRequest()->service_endpoint->FreeBuffers();
119   resp.Resolve(ipc::AsyncResult<protos::gen::FreeBuffersResponse>::Create());
120 }
121 
122 // Called by the IPC layer.
Flush(const protos::gen::FlushRequest & req,DeferredFlushResponse resp)123 void ConsumerIPCService::Flush(const protos::gen::FlushRequest& req,
124                                DeferredFlushResponse resp) {
125   auto it = pending_flush_responses_.insert(pending_flush_responses_.end(),
126                                             std::move(resp));
127   auto weak_this = weak_ptr_factory_.GetWeakPtr();
128   auto callback = [weak_this, it](bool success) {
129     if (weak_this)
130       weak_this->OnFlushCallback(success, std::move(it));
131   };
132   FlushFlags flags(req.flags());
133   GetConsumerForCurrentRequest()->service_endpoint->Flush(
134       req.timeout_ms(), std::move(callback), flags);
135 }
136 
137 // Called by the IPC layer.
Detach(const protos::gen::DetachRequest & req,DeferredDetachResponse resp)138 void ConsumerIPCService::Detach(const protos::gen::DetachRequest& req,
139                                 DeferredDetachResponse resp) {
140   // OnDetach() will resolve the |detach_response|.
141   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
142   remote_consumer->detach_response = std::move(resp);
143   remote_consumer->service_endpoint->Detach(req.key());
144 }
145 
146 // Called by the IPC layer.
Attach(const protos::gen::AttachRequest & req,DeferredAttachResponse resp)147 void ConsumerIPCService::Attach(const protos::gen::AttachRequest& req,
148                                 DeferredAttachResponse resp) {
149   // OnAttach() will resolve the |attach_response|.
150   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
151   remote_consumer->attach_response = std::move(resp);
152   remote_consumer->service_endpoint->Attach(req.key());
153 }
154 
155 // Called by the IPC layer.
GetTraceStats(const protos::gen::GetTraceStatsRequest &,DeferredGetTraceStatsResponse resp)156 void ConsumerIPCService::GetTraceStats(const protos::gen::GetTraceStatsRequest&,
157                                        DeferredGetTraceStatsResponse resp) {
158   // OnTraceStats() will resolve the |get_trace_stats_response|.
159   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
160   remote_consumer->get_trace_stats_response = std::move(resp);
161   remote_consumer->service_endpoint->GetTraceStats();
162 }
163 
164 // Called by the IPC layer.
ObserveEvents(const protos::gen::ObserveEventsRequest & req,DeferredObserveEventsResponse resp)165 void ConsumerIPCService::ObserveEvents(
166     const protos::gen::ObserveEventsRequest& req,
167     DeferredObserveEventsResponse resp) {
168   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
169 
170   // If there's a prior stream, close it so that client can clean it up.
171   remote_consumer->CloseObserveEventsResponseStream();
172 
173   remote_consumer->observe_events_response = std::move(resp);
174 
175   uint32_t events_mask = 0;
176   for (const auto& type : req.events_to_observe()) {
177     events_mask |= static_cast<uint32_t>(type);
178   }
179   remote_consumer->service_endpoint->ObserveEvents(events_mask);
180 
181   // If no events are to be observed, close the stream immediately so that the
182   // client can clean up.
183   if (events_mask == 0)
184     remote_consumer->CloseObserveEventsResponseStream();
185 }
186 
187 // Called by the IPC layer.
QueryServiceState(const protos::gen::QueryServiceStateRequest & req,DeferredQueryServiceStateResponse resp)188 void ConsumerIPCService::QueryServiceState(
189     const protos::gen::QueryServiceStateRequest& req,
190     DeferredQueryServiceStateResponse resp) {
191   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
192   auto it = pending_query_service_responses_.insert(
193       pending_query_service_responses_.end(), std::move(resp));
194   auto weak_this = weak_ptr_factory_.GetWeakPtr();
195   auto callback = [weak_this, it](bool success,
196                                   const TracingServiceState& svc_state) {
197     if (weak_this)
198       weak_this->OnQueryServiceCallback(success, svc_state, std::move(it));
199   };
200   ConsumerEndpoint::QueryServiceStateArgs args;
201   args.sessions_only = req.sessions_only();
202   remote_consumer->service_endpoint->QueryServiceState(args, callback);
203 }
204 
205 // Called by the service in response to service_endpoint->QueryServiceState().
OnQueryServiceCallback(bool success,const TracingServiceState & svc_state,PendingQuerySvcResponses::iterator pending_response_it)206 void ConsumerIPCService::OnQueryServiceCallback(
207     bool success,
208     const TracingServiceState& svc_state,
209     PendingQuerySvcResponses::iterator pending_response_it) {
210   DeferredQueryServiceStateResponse response(std::move(*pending_response_it));
211   pending_query_service_responses_.erase(pending_response_it);
212   if (!success) {
213     response.Reject();
214     return;
215   }
216 
217   // The TracingServiceState object might be too big to fit into a single IPC
218   // message because it contains the DataSourceDescriptor of each data source.
219   // Here we split it in chunks to fit in the IPC limit, observing the
220   // following rule: each chunk must be invididually a valid TracingServiceState
221   // message; all the chunks concatenated together must form the original
222   // message. This is to deal with the legacy API that was just sending one
223   // whole message (failing in presence of too many data sources, b/153142114).
224   // The message is split as follows: we take the whole TracingServiceState,
225   // take out the data sources section (which is a top-level repeated field)
226   // and re-add them one-by-one. If, in the process of appending, the IPC msg
227   // size is reached, a new chunk is created. This assumes that the rest of
228   // TracingServiceState fits in one IPC message and each DataSourceDescriptor
229   // fits in the worst case in a dedicated message (which is true, because
230   // otherwise the RegisterDataSource() which passes the descriptor in the first
231   // place would fail).
232 
233   std::vector<uint8_t> chunked_reply;
234 
235   // Transmits the current chunk and starts a new one.
236   bool sent_eof = false;
237   auto send_chunked_reply = [&chunked_reply, &response,
238                              &sent_eof](bool has_more) {
239     PERFETTO_CHECK(!sent_eof);
240     sent_eof = !has_more;
241     auto resp =
242         ipc::AsyncResult<protos::gen::QueryServiceStateResponse>::Create();
243     resp.set_has_more(has_more);
244     PERFETTO_CHECK(resp->mutable_service_state()->ParseFromArray(
245         chunked_reply.data(), chunked_reply.size()));
246     chunked_reply.clear();
247     response.Resolve(std::move(resp));
248   };
249 
250   // Create a copy of the whole response and cut away the data_sources section.
251   protos::gen::TracingServiceState svc_state_copy = svc_state;
252   auto data_sources = std::move(*svc_state_copy.mutable_data_sources());
253   chunked_reply = svc_state_copy.SerializeAsArray();
254 
255   // Now re-add them fitting within the IPC message limits (- some margin for
256   // the outer IPC frame).
257   constexpr size_t kMaxMsgSize = ipc::kIPCBufferSize - 128;
258   for (const auto& data_source : data_sources) {
259     protos::gen::TracingServiceState tmp;
260     tmp.mutable_data_sources()->emplace_back(std::move(data_source));
261     std::vector<uint8_t> chunk = tmp.SerializeAsArray();
262     if (chunked_reply.size() + chunk.size() < kMaxMsgSize) {
263       chunked_reply.insert(chunked_reply.end(), chunk.begin(), chunk.end());
264     } else {
265       send_chunked_reply(/*has_more=*/true);
266       chunked_reply = std::move(chunk);
267     }
268   }
269 
270   PERFETTO_DCHECK(!chunked_reply.empty());
271   send_chunked_reply(/*has_more=*/false);
272   PERFETTO_CHECK(sent_eof);
273 }
274 
275 // Called by the service in response to a service_endpoint->Flush() request.
OnFlushCallback(bool success,PendingFlushResponses::iterator pending_response_it)276 void ConsumerIPCService::OnFlushCallback(
277     bool success,
278     PendingFlushResponses::iterator pending_response_it) {
279   DeferredFlushResponse response(std::move(*pending_response_it));
280   pending_flush_responses_.erase(pending_response_it);
281   if (success) {
282     response.Resolve(ipc::AsyncResult<protos::gen::FlushResponse>::Create());
283   } else {
284     response.Reject();
285   }
286 }
287 
QueryCapabilities(const protos::gen::QueryCapabilitiesRequest &,DeferredQueryCapabilitiesResponse resp)288 void ConsumerIPCService::QueryCapabilities(
289     const protos::gen::QueryCapabilitiesRequest&,
290     DeferredQueryCapabilitiesResponse resp) {
291   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
292   auto it = pending_query_capabilities_responses_.insert(
293       pending_query_capabilities_responses_.end(), std::move(resp));
294   auto weak_this = weak_ptr_factory_.GetWeakPtr();
295   auto callback = [weak_this, it](const TracingServiceCapabilities& caps) {
296     if (weak_this)
297       weak_this->OnQueryCapabilitiesCallback(caps, std::move(it));
298   };
299   remote_consumer->service_endpoint->QueryCapabilities(callback);
300 }
301 
302 // Called by the service in response to service_endpoint->QueryCapabilities().
OnQueryCapabilitiesCallback(const TracingServiceCapabilities & caps,PendingQueryCapabilitiesResponses::iterator pending_response_it)303 void ConsumerIPCService::OnQueryCapabilitiesCallback(
304     const TracingServiceCapabilities& caps,
305     PendingQueryCapabilitiesResponses::iterator pending_response_it) {
306   DeferredQueryCapabilitiesResponse response(std::move(*pending_response_it));
307   pending_query_capabilities_responses_.erase(pending_response_it);
308   auto resp =
309       ipc::AsyncResult<protos::gen::QueryCapabilitiesResponse>::Create();
310   *resp->mutable_capabilities() = caps;
311   response.Resolve(std::move(resp));
312 }
313 
SaveTraceForBugreport(const protos::gen::SaveTraceForBugreportRequest &,DeferredSaveTraceForBugreportResponse resp)314 void ConsumerIPCService::SaveTraceForBugreport(
315     const protos::gen::SaveTraceForBugreportRequest&,
316     DeferredSaveTraceForBugreportResponse resp) {
317   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
318   auto it = pending_bugreport_responses_.insert(
319       pending_bugreport_responses_.end(), std::move(resp));
320   auto weak_this = weak_ptr_factory_.GetWeakPtr();
321   auto callback = [weak_this, it](bool success, const std::string& msg) {
322     if (weak_this)
323       weak_this->OnSaveTraceForBugreportCallback(success, msg, std::move(it));
324   };
325   remote_consumer->service_endpoint->SaveTraceForBugreport(callback);
326 }
327 
CloneSession(const protos::gen::CloneSessionRequest & req,DeferredCloneSessionResponse resp)328 void ConsumerIPCService::CloneSession(
329     const protos::gen::CloneSessionRequest& req,
330     DeferredCloneSessionResponse resp) {
331   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
332   remote_consumer->clone_session_response = std::move(resp);
333   ConsumerEndpoint::CloneSessionArgs args;
334   args.skip_trace_filter = req.skip_trace_filter();
335   args.for_bugreport = req.for_bugreport();
336   if (req.has_session_id()) {
337     args.tsid = req.session_id();
338   }
339   if (req.has_unique_session_name()) {
340     args.unique_session_name = req.unique_session_name();
341   }
342   if (req.has_clone_trigger_name()) {
343     args.clone_trigger_name = req.clone_trigger_name();
344   }
345   if (req.has_clone_trigger_producer_name()) {
346     args.clone_trigger_producer_name = req.clone_trigger_producer_name();
347   }
348   if (req.has_clone_trigger_trusted_producer_uid()) {
349     args.clone_trigger_trusted_producer_uid =
350         static_cast<uid_t>(req.clone_trigger_trusted_producer_uid());
351   }
352   if (req.has_clone_trigger_boot_time_ns()) {
353     args.clone_trigger_boot_time_ns = req.clone_trigger_boot_time_ns();
354   }
355   remote_consumer->service_endpoint->CloneSession(std::move(args));
356 }
357 
358 // Called by the service in response to
359 // service_endpoint->SaveTraceForBugreport().
OnSaveTraceForBugreportCallback(bool success,const std::string & msg,PendingSaveTraceForBugreportResponses::iterator pending_response_it)360 void ConsumerIPCService::OnSaveTraceForBugreportCallback(
361     bool success,
362     const std::string& msg,
363     PendingSaveTraceForBugreportResponses::iterator pending_response_it) {
364   DeferredSaveTraceForBugreportResponse response(
365       std::move(*pending_response_it));
366   pending_bugreport_responses_.erase(pending_response_it);
367   auto resp =
368       ipc::AsyncResult<protos::gen::SaveTraceForBugreportResponse>::Create();
369   resp->set_success(success);
370   resp->set_msg(msg);
371   response.Resolve(std::move(resp));
372 }
373 
374 ////////////////////////////////////////////////////////////////////////////////
375 // RemoteConsumer methods
376 ////////////////////////////////////////////////////////////////////////////////
377 
// Out-of-line defaulted constructor. |service_endpoint| is assigned separately
// by GetConsumerForCurrentRequest() after construction.
378 ConsumerIPCService::RemoteConsumer::RemoteConsumer() = default;
// Out-of-line defaulted destructor: no explicit teardown beyond destroying the
// members.
379 ConsumerIPCService::RemoteConsumer::~RemoteConsumer() = default;
380 
381 // Invoked by the |core_service_| business logic after the ConnectConsumer()
382 // call. There is nothing to do here, we really expected the ConnectConsumer()
383 // to just work in the local case.
OnConnect()384 void ConsumerIPCService::RemoteConsumer::OnConnect() {}
385 
386 // Invoked by the |core_service_| business logic after we destroy the
387 // |service_endpoint| (in the RemoteConsumer dtor).
OnDisconnect()388 void ConsumerIPCService::RemoteConsumer::OnDisconnect() {}
389 
OnTracingDisabled(const std::string & error)390 void ConsumerIPCService::RemoteConsumer::OnTracingDisabled(
391     const std::string& error) {
392   if (enable_tracing_response.IsBound()) {
393     auto result =
394         ipc::AsyncResult<protos::gen::EnableTracingResponse>::Create();
395     result->set_disabled(true);
396     if (!error.empty())
397       result->set_error(error);
398     enable_tracing_response.Resolve(std::move(result));
399   }
400 }
401 
OnTraceData(std::vector<TracePacket> trace_packets,bool has_more)402 void ConsumerIPCService::RemoteConsumer::OnTraceData(
403     std::vector<TracePacket> trace_packets,
404     bool has_more) {
405   if (!read_buffers_response.IsBound())
406     return;
407 
408   auto result = ipc::AsyncResult<protos::gen::ReadBuffersResponse>::Create();
409 
410   // A TracePacket might be too big to fit into a single IPC message (max
411   // kIPCBufferSize). However a TracePacket is made of slices and each slice
412   // is way smaller than kIPCBufferSize (a slice size is effectively bounded by
413   // the max chunk size of the SharedMemoryABI). When sending a TracePacket,
414   // if its slices don't fit within one IPC, chunk them over several contiguous
415   // IPCs using the |last_slice_for_packet| for glueing on the other side.
416   static_assert(ipc::kIPCBufferSize >= SharedMemoryABI::kMaxPageSize * 2,
417                 "kIPCBufferSize too small given the max possible slice size");
418 
419   auto send_ipc_reply = [this, &result](bool more) {
420     result.set_has_more(more);
421     read_buffers_response.Resolve(std::move(result));
422     result = ipc::AsyncResult<protos::gen::ReadBuffersResponse>::Create();
423   };
424 
425   size_t approx_reply_size = 0;
426   for (const TracePacket& trace_packet : trace_packets) {
427     size_t num_slices_left_for_packet = trace_packet.slices().size();
428     for (const Slice& slice : trace_packet.slices()) {
429       // Check if this slice would cause the IPC to overflow its max size and,
430       // if that is the case, split the IPCs. The "16" and "64" below are
431       // over-estimations of, respectively:
432       // 16: the preamble that prefixes each slice (there are 2 x size fields
433       //     in the proto + the |last_slice_for_packet| bool).
434       // 64: the overhead of the IPC InvokeMethodReply + wire_protocol's frame.
435       // If these estimations are wrong, BufferedFrameDeserializer::Serialize()
436       // will hit a DCHECK anyways.
437       const size_t approx_slice_size = slice.size + 16;
438       if (approx_reply_size + approx_slice_size > ipc::kIPCBufferSize - 64) {
439         // If we hit this CHECK we got a single slice that is > kIPCBufferSize.
440         PERFETTO_CHECK(result->slices_size() > 0);
441         send_ipc_reply(/*has_more=*/true);
442         approx_reply_size = 0;
443       }
444       approx_reply_size += approx_slice_size;
445 
446       auto* res_slice = result->add_slices();
447       res_slice->set_last_slice_for_packet(--num_slices_left_for_packet == 0);
448       res_slice->set_data(slice.start, slice.size);
449     }
450   }
451   send_ipc_reply(has_more);
452 }
453 
OnDetach(bool success)454 void ConsumerIPCService::RemoteConsumer::OnDetach(bool success) {
455   if (!success) {
456     std::move(detach_response).Reject();
457     return;
458   }
459   auto resp = ipc::AsyncResult<protos::gen::DetachResponse>::Create();
460   std::move(detach_response).Resolve(std::move(resp));
461 }
462 
OnAttach(bool success,const TraceConfig & trace_config)463 void ConsumerIPCService::RemoteConsumer::OnAttach(
464     bool success,
465     const TraceConfig& trace_config) {
466   if (!success) {
467     std::move(attach_response).Reject();
468     return;
469   }
470   auto response = ipc::AsyncResult<protos::gen::AttachResponse>::Create();
471   *response->mutable_trace_config() = trace_config;
472   std::move(attach_response).Resolve(std::move(response));
473 }
474 
OnTraceStats(bool success,const TraceStats & stats)475 void ConsumerIPCService::RemoteConsumer::OnTraceStats(bool success,
476                                                       const TraceStats& stats) {
477   if (!success) {
478     std::move(get_trace_stats_response).Reject();
479     return;
480   }
481   auto response =
482       ipc::AsyncResult<protos::gen::GetTraceStatsResponse>::Create();
483   *response->mutable_trace_stats() = stats;
484   std::move(get_trace_stats_response).Resolve(std::move(response));
485 }
486 
OnObservableEvents(const ObservableEvents & events)487 void ConsumerIPCService::RemoteConsumer::OnObservableEvents(
488     const ObservableEvents& events) {
489   if (!observe_events_response.IsBound())
490     return;
491 
492   auto result = ipc::AsyncResult<protos::gen::ObserveEventsResponse>::Create();
493   result.set_has_more(true);
494   *result->mutable_events() = events;
495   observe_events_response.Resolve(std::move(result));
496 }
497 
CloseObserveEventsResponseStream()498 void ConsumerIPCService::RemoteConsumer::CloseObserveEventsResponseStream() {
499   if (!observe_events_response.IsBound())
500     return;
501 
502   auto result = ipc::AsyncResult<protos::gen::ObserveEventsResponse>::Create();
503   result.set_has_more(false);
504   observe_events_response.Resolve(std::move(result));
505 }
506 
OnSessionCloned(const OnSessionClonedArgs & args)507 void ConsumerIPCService::RemoteConsumer::OnSessionCloned(
508     const OnSessionClonedArgs& args) {
509   if (!clone_session_response.IsBound())
510     return;
511 
512   auto resp = ipc::AsyncResult<protos::gen::CloneSessionResponse>::Create();
513   resp->set_success(args.success);
514   resp->set_error(args.error);
515   resp->set_uuid_msb(args.uuid.msb());
516   resp->set_uuid_lsb(args.uuid.lsb());
517   std::move(clone_session_response).Resolve(std::move(resp));
518 }
519 
520 }  // namespace perfetto
521