1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/ipc/service/consumer_ipc_service.h"
18
19 #include <cinttypes>
20
21 #include "perfetto/base/logging.h"
22 #include "perfetto/base/task_runner.h"
23 #include "perfetto/ext/base/scoped_file.h"
24 #include "perfetto/ext/ipc/basic_types.h"
25 #include "perfetto/ext/ipc/host.h"
26 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
27 #include "perfetto/ext/tracing/core/slice.h"
28 #include "perfetto/ext/tracing/core/trace_packet.h"
29 #include "perfetto/ext/tracing/core/trace_stats.h"
30 #include "perfetto/ext/tracing/core/tracing_service.h"
31 #include "perfetto/tracing/core/trace_config.h"
32 #include "perfetto/tracing/core/tracing_service_capabilities.h"
33 #include "perfetto/tracing/core/tracing_service_state.h"
34
35 namespace perfetto {
36
ConsumerIPCService(TracingService * core_service)37 ConsumerIPCService::ConsumerIPCService(TracingService* core_service)
38 : core_service_(core_service), weak_ptr_factory_(this) {}
39
40 ConsumerIPCService::~ConsumerIPCService() = default;
41
42 ConsumerIPCService::RemoteConsumer*
GetConsumerForCurrentRequest()43 ConsumerIPCService::GetConsumerForCurrentRequest() {
44 const ipc::ClientID ipc_client_id = ipc::Service::client_info().client_id();
45 const uid_t uid = ipc::Service::client_info().uid();
46 PERFETTO_CHECK(ipc_client_id);
47 auto it = consumers_.find(ipc_client_id);
48 if (it == consumers_.end()) {
49 auto* remote_consumer = new RemoteConsumer();
50 consumers_[ipc_client_id].reset(remote_consumer);
51 remote_consumer->service_endpoint =
52 core_service_->ConnectConsumer(remote_consumer, uid);
53 return remote_consumer;
54 }
55 return it->second.get();
56 }
57
58 // Called by the IPC layer.
OnClientDisconnected()59 void ConsumerIPCService::OnClientDisconnected() {
60 ipc::ClientID client_id = ipc::Service::client_info().client_id();
61 consumers_.erase(client_id);
62 }
63
64 // Called by the IPC layer.
EnableTracing(const protos::gen::EnableTracingRequest & req,DeferredEnableTracingResponse resp)65 void ConsumerIPCService::EnableTracing(
66 const protos::gen::EnableTracingRequest& req,
67 DeferredEnableTracingResponse resp) {
68 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
69 if (req.attach_notification_only()) {
70 remote_consumer->enable_tracing_response = std::move(resp);
71 return;
72 }
73 const TraceConfig& trace_config = req.trace_config();
74 base::ScopedFile fd;
75 if (trace_config.write_into_file() && trace_config.output_path().empty())
76 fd = ipc::Service::TakeReceivedFD();
77 remote_consumer->service_endpoint->EnableTracing(trace_config, std::move(fd));
78 remote_consumer->enable_tracing_response = std::move(resp);
79 }
80
81 // Called by the IPC layer.
StartTracing(const protos::gen::StartTracingRequest &,DeferredStartTracingResponse resp)82 void ConsumerIPCService::StartTracing(const protos::gen::StartTracingRequest&,
83 DeferredStartTracingResponse resp) {
84 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
85 remote_consumer->service_endpoint->StartTracing();
86 resp.Resolve(ipc::AsyncResult<protos::gen::StartTracingResponse>::Create());
87 }
88
89 // Called by the IPC layer.
ChangeTraceConfig(const protos::gen::ChangeTraceConfigRequest & req,DeferredChangeTraceConfigResponse resp)90 void ConsumerIPCService::ChangeTraceConfig(
91 const protos::gen::ChangeTraceConfigRequest& req,
92 DeferredChangeTraceConfigResponse resp) {
93 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
94 remote_consumer->service_endpoint->ChangeTraceConfig(req.trace_config());
95 resp.Resolve(
96 ipc::AsyncResult<protos::gen::ChangeTraceConfigResponse>::Create());
97 }
98
99 // Called by the IPC layer.
DisableTracing(const protos::gen::DisableTracingRequest &,DeferredDisableTracingResponse resp)100 void ConsumerIPCService::DisableTracing(
101 const protos::gen::DisableTracingRequest&,
102 DeferredDisableTracingResponse resp) {
103 GetConsumerForCurrentRequest()->service_endpoint->DisableTracing();
104 resp.Resolve(ipc::AsyncResult<protos::gen::DisableTracingResponse>::Create());
105 }
106
107 // Called by the IPC layer.
ReadBuffers(const protos::gen::ReadBuffersRequest &,DeferredReadBuffersResponse resp)108 void ConsumerIPCService::ReadBuffers(const protos::gen::ReadBuffersRequest&,
109 DeferredReadBuffersResponse resp) {
110 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
111 remote_consumer->read_buffers_response = std::move(resp);
112 remote_consumer->service_endpoint->ReadBuffers();
113 }
114
115 // Called by the IPC layer.
FreeBuffers(const protos::gen::FreeBuffersRequest &,DeferredFreeBuffersResponse resp)116 void ConsumerIPCService::FreeBuffers(const protos::gen::FreeBuffersRequest&,
117 DeferredFreeBuffersResponse resp) {
118 GetConsumerForCurrentRequest()->service_endpoint->FreeBuffers();
119 resp.Resolve(ipc::AsyncResult<protos::gen::FreeBuffersResponse>::Create());
120 }
121
122 // Called by the IPC layer.
Flush(const protos::gen::FlushRequest & req,DeferredFlushResponse resp)123 void ConsumerIPCService::Flush(const protos::gen::FlushRequest& req,
124 DeferredFlushResponse resp) {
125 auto it = pending_flush_responses_.insert(pending_flush_responses_.end(),
126 std::move(resp));
127 auto weak_this = weak_ptr_factory_.GetWeakPtr();
128 auto callback = [weak_this, it](bool success) {
129 if (weak_this)
130 weak_this->OnFlushCallback(success, std::move(it));
131 };
132 FlushFlags flags(req.flags());
133 GetConsumerForCurrentRequest()->service_endpoint->Flush(
134 req.timeout_ms(), std::move(callback), flags);
135 }
136
137 // Called by the IPC layer.
Detach(const protos::gen::DetachRequest & req,DeferredDetachResponse resp)138 void ConsumerIPCService::Detach(const protos::gen::DetachRequest& req,
139 DeferredDetachResponse resp) {
140 // OnDetach() will resolve the |detach_response|.
141 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
142 remote_consumer->detach_response = std::move(resp);
143 remote_consumer->service_endpoint->Detach(req.key());
144 }
145
146 // Called by the IPC layer.
Attach(const protos::gen::AttachRequest & req,DeferredAttachResponse resp)147 void ConsumerIPCService::Attach(const protos::gen::AttachRequest& req,
148 DeferredAttachResponse resp) {
149 // OnAttach() will resolve the |attach_response|.
150 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
151 remote_consumer->attach_response = std::move(resp);
152 remote_consumer->service_endpoint->Attach(req.key());
153 }
154
155 // Called by the IPC layer.
GetTraceStats(const protos::gen::GetTraceStatsRequest &,DeferredGetTraceStatsResponse resp)156 void ConsumerIPCService::GetTraceStats(const protos::gen::GetTraceStatsRequest&,
157 DeferredGetTraceStatsResponse resp) {
158 // OnTraceStats() will resolve the |get_trace_stats_response|.
159 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
160 remote_consumer->get_trace_stats_response = std::move(resp);
161 remote_consumer->service_endpoint->GetTraceStats();
162 }
163
164 // Called by the IPC layer.
ObserveEvents(const protos::gen::ObserveEventsRequest & req,DeferredObserveEventsResponse resp)165 void ConsumerIPCService::ObserveEvents(
166 const protos::gen::ObserveEventsRequest& req,
167 DeferredObserveEventsResponse resp) {
168 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
169
170 // If there's a prior stream, close it so that client can clean it up.
171 remote_consumer->CloseObserveEventsResponseStream();
172
173 remote_consumer->observe_events_response = std::move(resp);
174
175 uint32_t events_mask = 0;
176 for (const auto& type : req.events_to_observe()) {
177 events_mask |= static_cast<uint32_t>(type);
178 }
179 remote_consumer->service_endpoint->ObserveEvents(events_mask);
180
181 // If no events are to be observed, close the stream immediately so that the
182 // client can clean up.
183 if (events_mask == 0)
184 remote_consumer->CloseObserveEventsResponseStream();
185 }
186
187 // Called by the IPC layer.
QueryServiceState(const protos::gen::QueryServiceStateRequest & req,DeferredQueryServiceStateResponse resp)188 void ConsumerIPCService::QueryServiceState(
189 const protos::gen::QueryServiceStateRequest& req,
190 DeferredQueryServiceStateResponse resp) {
191 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
192 auto it = pending_query_service_responses_.insert(
193 pending_query_service_responses_.end(), std::move(resp));
194 auto weak_this = weak_ptr_factory_.GetWeakPtr();
195 auto callback = [weak_this, it](bool success,
196 const TracingServiceState& svc_state) {
197 if (weak_this)
198 weak_this->OnQueryServiceCallback(success, svc_state, std::move(it));
199 };
200 ConsumerEndpoint::QueryServiceStateArgs args;
201 args.sessions_only = req.sessions_only();
202 remote_consumer->service_endpoint->QueryServiceState(args, callback);
203 }
204
205 // Called by the service in response to service_endpoint->QueryServiceState().
OnQueryServiceCallback(bool success,const TracingServiceState & svc_state,PendingQuerySvcResponses::iterator pending_response_it)206 void ConsumerIPCService::OnQueryServiceCallback(
207 bool success,
208 const TracingServiceState& svc_state,
209 PendingQuerySvcResponses::iterator pending_response_it) {
210 DeferredQueryServiceStateResponse response(std::move(*pending_response_it));
211 pending_query_service_responses_.erase(pending_response_it);
212 if (!success) {
213 response.Reject();
214 return;
215 }
216
217 // The TracingServiceState object might be too big to fit into a single IPC
218 // message because it contains the DataSourceDescriptor of each data source.
219 // Here we split it in chunks to fit in the IPC limit, observing the
220 // following rule: each chunk must be invididually a valid TracingServiceState
221 // message; all the chunks concatenated together must form the original
222 // message. This is to deal with the legacy API that was just sending one
223 // whole message (failing in presence of too many data sources, b/153142114).
224 // The message is split as follows: we take the whole TracingServiceState,
225 // take out the data sources section (which is a top-level repeated field)
226 // and re-add them one-by-one. If, in the process of appending, the IPC msg
227 // size is reached, a new chunk is created. This assumes that the rest of
228 // TracingServiceState fits in one IPC message and each DataSourceDescriptor
229 // fits in the worst case in a dedicated message (which is true, because
230 // otherwise the RegisterDataSource() which passes the descriptor in the first
231 // place would fail).
232
233 std::vector<uint8_t> chunked_reply;
234
235 // Transmits the current chunk and starts a new one.
236 bool sent_eof = false;
237 auto send_chunked_reply = [&chunked_reply, &response,
238 &sent_eof](bool has_more) {
239 PERFETTO_CHECK(!sent_eof);
240 sent_eof = !has_more;
241 auto resp =
242 ipc::AsyncResult<protos::gen::QueryServiceStateResponse>::Create();
243 resp.set_has_more(has_more);
244 PERFETTO_CHECK(resp->mutable_service_state()->ParseFromArray(
245 chunked_reply.data(), chunked_reply.size()));
246 chunked_reply.clear();
247 response.Resolve(std::move(resp));
248 };
249
250 // Create a copy of the whole response and cut away the data_sources section.
251 protos::gen::TracingServiceState svc_state_copy = svc_state;
252 auto data_sources = std::move(*svc_state_copy.mutable_data_sources());
253 chunked_reply = svc_state_copy.SerializeAsArray();
254
255 // Now re-add them fitting within the IPC message limits (- some margin for
256 // the outer IPC frame).
257 constexpr size_t kMaxMsgSize = ipc::kIPCBufferSize - 128;
258 for (const auto& data_source : data_sources) {
259 protos::gen::TracingServiceState tmp;
260 tmp.mutable_data_sources()->emplace_back(std::move(data_source));
261 std::vector<uint8_t> chunk = tmp.SerializeAsArray();
262 if (chunked_reply.size() + chunk.size() < kMaxMsgSize) {
263 chunked_reply.insert(chunked_reply.end(), chunk.begin(), chunk.end());
264 } else {
265 send_chunked_reply(/*has_more=*/true);
266 chunked_reply = std::move(chunk);
267 }
268 }
269
270 PERFETTO_DCHECK(!chunked_reply.empty());
271 send_chunked_reply(/*has_more=*/false);
272 PERFETTO_CHECK(sent_eof);
273 }
274
275 // Called by the service in response to a service_endpoint->Flush() request.
OnFlushCallback(bool success,PendingFlushResponses::iterator pending_response_it)276 void ConsumerIPCService::OnFlushCallback(
277 bool success,
278 PendingFlushResponses::iterator pending_response_it) {
279 DeferredFlushResponse response(std::move(*pending_response_it));
280 pending_flush_responses_.erase(pending_response_it);
281 if (success) {
282 response.Resolve(ipc::AsyncResult<protos::gen::FlushResponse>::Create());
283 } else {
284 response.Reject();
285 }
286 }
287
QueryCapabilities(const protos::gen::QueryCapabilitiesRequest &,DeferredQueryCapabilitiesResponse resp)288 void ConsumerIPCService::QueryCapabilities(
289 const protos::gen::QueryCapabilitiesRequest&,
290 DeferredQueryCapabilitiesResponse resp) {
291 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
292 auto it = pending_query_capabilities_responses_.insert(
293 pending_query_capabilities_responses_.end(), std::move(resp));
294 auto weak_this = weak_ptr_factory_.GetWeakPtr();
295 auto callback = [weak_this, it](const TracingServiceCapabilities& caps) {
296 if (weak_this)
297 weak_this->OnQueryCapabilitiesCallback(caps, std::move(it));
298 };
299 remote_consumer->service_endpoint->QueryCapabilities(callback);
300 }
301
302 // Called by the service in response to service_endpoint->QueryCapabilities().
OnQueryCapabilitiesCallback(const TracingServiceCapabilities & caps,PendingQueryCapabilitiesResponses::iterator pending_response_it)303 void ConsumerIPCService::OnQueryCapabilitiesCallback(
304 const TracingServiceCapabilities& caps,
305 PendingQueryCapabilitiesResponses::iterator pending_response_it) {
306 DeferredQueryCapabilitiesResponse response(std::move(*pending_response_it));
307 pending_query_capabilities_responses_.erase(pending_response_it);
308 auto resp =
309 ipc::AsyncResult<protos::gen::QueryCapabilitiesResponse>::Create();
310 *resp->mutable_capabilities() = caps;
311 response.Resolve(std::move(resp));
312 }
313
SaveTraceForBugreport(const protos::gen::SaveTraceForBugreportRequest &,DeferredSaveTraceForBugreportResponse resp)314 void ConsumerIPCService::SaveTraceForBugreport(
315 const protos::gen::SaveTraceForBugreportRequest&,
316 DeferredSaveTraceForBugreportResponse resp) {
317 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
318 auto it = pending_bugreport_responses_.insert(
319 pending_bugreport_responses_.end(), std::move(resp));
320 auto weak_this = weak_ptr_factory_.GetWeakPtr();
321 auto callback = [weak_this, it](bool success, const std::string& msg) {
322 if (weak_this)
323 weak_this->OnSaveTraceForBugreportCallback(success, msg, std::move(it));
324 };
325 remote_consumer->service_endpoint->SaveTraceForBugreport(callback);
326 }
327
CloneSession(const protos::gen::CloneSessionRequest & req,DeferredCloneSessionResponse resp)328 void ConsumerIPCService::CloneSession(
329 const protos::gen::CloneSessionRequest& req,
330 DeferredCloneSessionResponse resp) {
331 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
332 remote_consumer->clone_session_response = std::move(resp);
333 ConsumerEndpoint::CloneSessionArgs args;
334 args.skip_trace_filter = req.skip_trace_filter();
335 args.for_bugreport = req.for_bugreport();
336 if (req.has_session_id()) {
337 args.tsid = req.session_id();
338 }
339 if (req.has_unique_session_name()) {
340 args.unique_session_name = req.unique_session_name();
341 }
342 if (req.has_clone_trigger_name()) {
343 args.clone_trigger_name = req.clone_trigger_name();
344 }
345 if (req.has_clone_trigger_producer_name()) {
346 args.clone_trigger_producer_name = req.clone_trigger_producer_name();
347 }
348 if (req.has_clone_trigger_trusted_producer_uid()) {
349 args.clone_trigger_trusted_producer_uid =
350 static_cast<uid_t>(req.clone_trigger_trusted_producer_uid());
351 }
352 if (req.has_clone_trigger_boot_time_ns()) {
353 args.clone_trigger_boot_time_ns = req.clone_trigger_boot_time_ns();
354 }
355 remote_consumer->service_endpoint->CloneSession(std::move(args));
356 }
357
358 // Called by the service in response to
359 // service_endpoint->SaveTraceForBugreport().
OnSaveTraceForBugreportCallback(bool success,const std::string & msg,PendingSaveTraceForBugreportResponses::iterator pending_response_it)360 void ConsumerIPCService::OnSaveTraceForBugreportCallback(
361 bool success,
362 const std::string& msg,
363 PendingSaveTraceForBugreportResponses::iterator pending_response_it) {
364 DeferredSaveTraceForBugreportResponse response(
365 std::move(*pending_response_it));
366 pending_bugreport_responses_.erase(pending_response_it);
367 auto resp =
368 ipc::AsyncResult<protos::gen::SaveTraceForBugreportResponse>::Create();
369 resp->set_success(success);
370 resp->set_msg(msg);
371 response.Resolve(std::move(resp));
372 }
373
374 ////////////////////////////////////////////////////////////////////////////////
375 // RemoteConsumer methods
376 ////////////////////////////////////////////////////////////////////////////////
377
378 ConsumerIPCService::RemoteConsumer::RemoteConsumer() = default;
379 ConsumerIPCService::RemoteConsumer::~RemoteConsumer() = default;
380
381 // Invoked by the |core_service_| business logic after the ConnectConsumer()
382 // call. There is nothing to do here, we really expected the ConnectConsumer()
383 // to just work in the local case.
OnConnect()384 void ConsumerIPCService::RemoteConsumer::OnConnect() {}
385
386 // Invoked by the |core_service_| business logic after we destroy the
387 // |service_endpoint| (in the RemoteConsumer dtor).
OnDisconnect()388 void ConsumerIPCService::RemoteConsumer::OnDisconnect() {}
389
OnTracingDisabled(const std::string & error)390 void ConsumerIPCService::RemoteConsumer::OnTracingDisabled(
391 const std::string& error) {
392 if (enable_tracing_response.IsBound()) {
393 auto result =
394 ipc::AsyncResult<protos::gen::EnableTracingResponse>::Create();
395 result->set_disabled(true);
396 if (!error.empty())
397 result->set_error(error);
398 enable_tracing_response.Resolve(std::move(result));
399 }
400 }
401
OnTraceData(std::vector<TracePacket> trace_packets,bool has_more)402 void ConsumerIPCService::RemoteConsumer::OnTraceData(
403 std::vector<TracePacket> trace_packets,
404 bool has_more) {
405 if (!read_buffers_response.IsBound())
406 return;
407
408 auto result = ipc::AsyncResult<protos::gen::ReadBuffersResponse>::Create();
409
410 // A TracePacket might be too big to fit into a single IPC message (max
411 // kIPCBufferSize). However a TracePacket is made of slices and each slice
412 // is way smaller than kIPCBufferSize (a slice size is effectively bounded by
413 // the max chunk size of the SharedMemoryABI). When sending a TracePacket,
414 // if its slices don't fit within one IPC, chunk them over several contiguous
415 // IPCs using the |last_slice_for_packet| for glueing on the other side.
416 static_assert(ipc::kIPCBufferSize >= SharedMemoryABI::kMaxPageSize * 2,
417 "kIPCBufferSize too small given the max possible slice size");
418
419 auto send_ipc_reply = [this, &result](bool more) {
420 result.set_has_more(more);
421 read_buffers_response.Resolve(std::move(result));
422 result = ipc::AsyncResult<protos::gen::ReadBuffersResponse>::Create();
423 };
424
425 size_t approx_reply_size = 0;
426 for (const TracePacket& trace_packet : trace_packets) {
427 size_t num_slices_left_for_packet = trace_packet.slices().size();
428 for (const Slice& slice : trace_packet.slices()) {
429 // Check if this slice would cause the IPC to overflow its max size and,
430 // if that is the case, split the IPCs. The "16" and "64" below are
431 // over-estimations of, respectively:
432 // 16: the preamble that prefixes each slice (there are 2 x size fields
433 // in the proto + the |last_slice_for_packet| bool).
434 // 64: the overhead of the IPC InvokeMethodReply + wire_protocol's frame.
435 // If these estimations are wrong, BufferedFrameDeserializer::Serialize()
436 // will hit a DCHECK anyways.
437 const size_t approx_slice_size = slice.size + 16;
438 if (approx_reply_size + approx_slice_size > ipc::kIPCBufferSize - 64) {
439 // If we hit this CHECK we got a single slice that is > kIPCBufferSize.
440 PERFETTO_CHECK(result->slices_size() > 0);
441 send_ipc_reply(/*has_more=*/true);
442 approx_reply_size = 0;
443 }
444 approx_reply_size += approx_slice_size;
445
446 auto* res_slice = result->add_slices();
447 res_slice->set_last_slice_for_packet(--num_slices_left_for_packet == 0);
448 res_slice->set_data(slice.start, slice.size);
449 }
450 }
451 send_ipc_reply(has_more);
452 }
453
OnDetach(bool success)454 void ConsumerIPCService::RemoteConsumer::OnDetach(bool success) {
455 if (!success) {
456 std::move(detach_response).Reject();
457 return;
458 }
459 auto resp = ipc::AsyncResult<protos::gen::DetachResponse>::Create();
460 std::move(detach_response).Resolve(std::move(resp));
461 }
462
OnAttach(bool success,const TraceConfig & trace_config)463 void ConsumerIPCService::RemoteConsumer::OnAttach(
464 bool success,
465 const TraceConfig& trace_config) {
466 if (!success) {
467 std::move(attach_response).Reject();
468 return;
469 }
470 auto response = ipc::AsyncResult<protos::gen::AttachResponse>::Create();
471 *response->mutable_trace_config() = trace_config;
472 std::move(attach_response).Resolve(std::move(response));
473 }
474
OnTraceStats(bool success,const TraceStats & stats)475 void ConsumerIPCService::RemoteConsumer::OnTraceStats(bool success,
476 const TraceStats& stats) {
477 if (!success) {
478 std::move(get_trace_stats_response).Reject();
479 return;
480 }
481 auto response =
482 ipc::AsyncResult<protos::gen::GetTraceStatsResponse>::Create();
483 *response->mutable_trace_stats() = stats;
484 std::move(get_trace_stats_response).Resolve(std::move(response));
485 }
486
OnObservableEvents(const ObservableEvents & events)487 void ConsumerIPCService::RemoteConsumer::OnObservableEvents(
488 const ObservableEvents& events) {
489 if (!observe_events_response.IsBound())
490 return;
491
492 auto result = ipc::AsyncResult<protos::gen::ObserveEventsResponse>::Create();
493 result.set_has_more(true);
494 *result->mutable_events() = events;
495 observe_events_response.Resolve(std::move(result));
496 }
497
CloseObserveEventsResponseStream()498 void ConsumerIPCService::RemoteConsumer::CloseObserveEventsResponseStream() {
499 if (!observe_events_response.IsBound())
500 return;
501
502 auto result = ipc::AsyncResult<protos::gen::ObserveEventsResponse>::Create();
503 result.set_has_more(false);
504 observe_events_response.Resolve(std::move(result));
505 }
506
OnSessionCloned(const OnSessionClonedArgs & args)507 void ConsumerIPCService::RemoteConsumer::OnSessionCloned(
508 const OnSessionClonedArgs& args) {
509 if (!clone_session_response.IsBound())
510 return;
511
512 auto resp = ipc::AsyncResult<protos::gen::CloneSessionResponse>::Create();
513 resp->set_success(args.success);
514 resp->set_error(args.error);
515 resp->set_uuid_msb(args.uuid.msb());
516 resp->set_uuid_lsb(args.uuid.lsb());
517 std::move(clone_session_response).Resolve(std::move(resp));
518 }
519
520 } // namespace perfetto
521