xref: /aosp_15_r20/external/perfetto/src/tracing/test/tracing_integration_test.cc (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <cinttypes>
18 
19 #include "perfetto/ext/base/file_utils.h"
20 #include "perfetto/ext/base/string_utils.h"
21 #include "perfetto/ext/base/temp_file.h"
22 #include "perfetto/ext/tracing/core/consumer.h"
23 #include "perfetto/ext/tracing/core/producer.h"
24 #include "perfetto/ext/tracing/core/trace_packet.h"
25 #include "perfetto/ext/tracing/core/trace_stats.h"
26 #include "perfetto/ext/tracing/core/trace_writer.h"
27 #include "perfetto/ext/tracing/ipc/consumer_ipc_client.h"
28 #include "perfetto/ext/tracing/ipc/producer_ipc_client.h"
29 #include "perfetto/ext/tracing/ipc/service_ipc_host.h"
30 #include "perfetto/tracing/core/data_source_config.h"
31 #include "perfetto/tracing/core/data_source_descriptor.h"
32 #include "perfetto/tracing/core/trace_config.h"
33 #include "src/base/test/test_task_runner.h"
34 #include "src/ipc/test/test_socket.h"
35 #include "src/tracing/service/tracing_service_impl.h"
36 #include "test/gtest_and_gmock.h"
37 
38 #include "protos/perfetto/config/trace_config.gen.h"
39 #include "protos/perfetto/trace/clock_snapshot.gen.h"
40 #include "protos/perfetto/trace/test_event.gen.h"
41 #include "protos/perfetto/trace/test_event.pbzero.h"
42 #include "protos/perfetto/trace/trace.gen.h"
43 #include "protos/perfetto/trace/trace_packet.gen.h"
44 #include "protos/perfetto/trace/trace_packet.pbzero.h"
45 
46 namespace perfetto {
47 namespace {
48 
49 using testing::_;
50 using testing::Invoke;
51 using testing::InvokeWithoutArgs;
52 
// Test sockets shared by every test in this file. Their name() is handed to
// the service host and to the producer/consumer IPC clients, and the fixture
// calls Destroy() on both in SetUp() and TearDown().
53 ipc::TestSocket kProducerSock{"tracing_test-producer"};
54 ipc::TestSocket kConsumerSock{"tracing_test-consumer"};
55 
56 // TODO(rsavitski): consider using src/tracing/test/mock_producer.h.
57 class MockProducer : public Producer {
58  public:
~MockProducer()59   ~MockProducer() override {}
60 
61   // Producer implementation.
62   MOCK_METHOD(void, OnConnect, (), (override));
63   MOCK_METHOD(void, OnDisconnect, (), (override));
64   MOCK_METHOD(void,
65               SetupDataSource,
66               (DataSourceInstanceID, const DataSourceConfig&),
67               (override));
68   MOCK_METHOD(void,
69               StartDataSource,
70               (DataSourceInstanceID, const DataSourceConfig&),
71               (override));
72   MOCK_METHOD(void, StopDataSource, (DataSourceInstanceID), (override));
73   MOCK_METHOD(void, OnTracingSetup, (), (override));
74   MOCK_METHOD(void,
75               Flush,
76               (FlushRequestID, const DataSourceInstanceID*, size_t, FlushFlags),
77               (override));
78   MOCK_METHOD(void,
79               ClearIncrementalState,
80               (const DataSourceInstanceID*, size_t),
81               (override));
82 };
83 
84 class MockConsumer : public Consumer {
85  public:
~MockConsumer()86   ~MockConsumer() override {}
87 
88   // Producer implementation.
89   MOCK_METHOD(void, OnConnect, (), (override));
90   MOCK_METHOD(void, OnDisconnect, (), (override));
91   MOCK_METHOD(void,
92               OnTracingDisabled,
93               (const std::string& /*error*/),
94               (override));
95   MOCK_METHOD(void, OnTracePackets, (std::vector<TracePacket>*, bool));
96   MOCK_METHOD(void, OnDetach, (bool), (override));
97   MOCK_METHOD(void, OnAttach, (bool, const TraceConfig&), (override));
98   MOCK_METHOD(void, OnTraceStats, (bool, const TraceStats&), (override));
99   MOCK_METHOD(void, OnObservableEvents, (const ObservableEvents&), (override));
100   MOCK_METHOD(void, OnSessionCloned, (const OnSessionClonedArgs&), (override));
101 
102   // Workaround, gmock doesn't support yet move-only types, passing a pointer.
OnTraceData(std::vector<TracePacket> packets,bool has_more)103   void OnTraceData(std::vector<TracePacket> packets, bool has_more) {
104     OnTracePackets(&packets, has_more);
105   }
106 };
107 
CheckTraceStats(const protos::gen::TracePacket & packet)108 void CheckTraceStats(const protos::gen::TracePacket& packet) {
109   EXPECT_TRUE(packet.has_trace_stats());
110   EXPECT_GE(packet.trace_stats().producers_seen(), 1u);
111   EXPECT_EQ(1u, packet.trace_stats().producers_connected());
112   EXPECT_EQ(1u, packet.trace_stats().data_sources_registered());
113   EXPECT_EQ(1u, packet.trace_stats().tracing_sessions());
114   EXPECT_EQ(1u, packet.trace_stats().total_buffers());
115   EXPECT_EQ(1, packet.trace_stats().buffer_stats_size());
116 
117   const auto& buf_stats = packet.trace_stats().buffer_stats()[0];
118   EXPECT_GT(buf_stats.bytes_written(), 0u);
119   EXPECT_GT(buf_stats.chunks_written(), 0u);
120   EXPECT_EQ(0u, buf_stats.chunks_overwritten());
121   EXPECT_EQ(0u, buf_stats.chunks_rewritten());
122   EXPECT_EQ(0u, buf_stats.chunks_committed_out_of_order());
123   EXPECT_EQ(0u, buf_stats.write_wrap_count());
124   EXPECT_EQ(0u, buf_stats.patches_failed());
125   EXPECT_EQ(0u, buf_stats.readaheads_failed());
126   EXPECT_EQ(0u, buf_stats.abi_violations());
127 }
128 
// Compile-time guard tying the service's maximum trace packet slice size to
// the IPC buffer size: a slice must be at least 512 (presumably bytes) smaller
// than the IPC buffer.
129 static_assert(TracingServiceImpl::kMaxTracePacketSliceSize <=
130                   ipc::kIPCBufferSize - 512,
131               "Tracing service max packet slice should be smaller than IPC "
132               "buffer size (with some headroom)");
133 
134 }  // namespace
135 
136 class TracingIntegrationTest : public ::testing::Test {
137  public:
SetUp()138   void SetUp() override {
139     kProducerSock.Destroy();
140     kConsumerSock.Destroy();
141     task_runner_.reset(new base::TestTaskRunner());
142 
143     // Create the service host.
144     svc_ = ServiceIPCHost::CreateInstance(task_runner_.get());
145     svc_->Start(kProducerSock.name(), kConsumerSock.name());
146 
147     // Create and connect a Producer.
148     producer_endpoint_ = ProducerIPCClient::Connect(
149         kProducerSock.name(), &producer_, "perfetto.mock_producer",
150         task_runner_.get(), GetProducerSMBScrapingMode());
151     auto on_producer_connect =
152         task_runner_->CreateCheckpoint("on_producer_connect");
153     EXPECT_CALL(producer_, OnConnect()).WillOnce(Invoke(on_producer_connect));
154     task_runner_->RunUntilCheckpoint("on_producer_connect");
155 
156     // Register a data source.
157     DataSourceDescriptor ds_desc;
158     ds_desc.set_name("perfetto.test");
159     producer_endpoint_->RegisterDataSource(ds_desc);
160 
161     // Create and connect a Consumer.
162     consumer_endpoint_ = ConsumerIPCClient::Connect(
163         kConsumerSock.name(), &consumer_, task_runner_.get());
164     auto on_consumer_connect =
165         task_runner_->CreateCheckpoint("on_consumer_connect");
166     EXPECT_CALL(consumer_, OnConnect()).WillOnce(Invoke(on_consumer_connect));
167     task_runner_->RunUntilCheckpoint("on_consumer_connect");
168 
169     ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&producer_));
170     ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&consumer_));
171   }
172 
TearDown()173   void TearDown() override {
174     // Destroy the service and check that both Producer and Consumer see an
175     // OnDisconnect() call.
176 
177     auto on_producer_disconnect =
178         task_runner_->CreateCheckpoint("on_producer_disconnect");
179     EXPECT_CALL(producer_, OnDisconnect())
180         .WillOnce(Invoke(on_producer_disconnect));
181 
182     auto on_consumer_disconnect =
183         task_runner_->CreateCheckpoint("on_consumer_disconnect");
184     EXPECT_CALL(consumer_, OnDisconnect())
185         .WillOnce(Invoke(on_consumer_disconnect));
186 
187     svc_.reset();
188     task_runner_->RunUntilCheckpoint("on_producer_disconnect");
189     task_runner_->RunUntilCheckpoint("on_consumer_disconnect");
190 
191     ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&producer_));
192     ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&consumer_));
193 
194     task_runner_.reset();
195     kProducerSock.Destroy();
196     kConsumerSock.Destroy();
197   }
198 
GetProducerSMBScrapingMode()199   virtual TracingService::ProducerSMBScrapingMode GetProducerSMBScrapingMode() {
200     return TracingService::ProducerSMBScrapingMode::kDefault;
201   }
202 
203   std::unique_ptr<base::TestTaskRunner> task_runner_;
204   std::unique_ptr<ServiceIPCHost> svc_;
205   std::unique_ptr<TracingService::ProducerEndpoint> producer_endpoint_;
206   MockProducer producer_;
207   std::unique_ptr<TracingService::ConsumerEndpoint> consumer_endpoint_;
208   MockConsumer consumer_;
209 };
210 
TEST_F(TracingIntegrationTest,WithIPCTransport)211 TEST_F(TracingIntegrationTest, WithIPCTransport) {
212   // Start tracing.
213   TraceConfig trace_config;
214   trace_config.add_buffers()->set_size_kb(4096 * 10);
215   auto* ds_config = trace_config.add_data_sources()->mutable_config();
216   ds_config->set_name("perfetto.test");
217   ds_config->set_target_buffer(0);
218   consumer_endpoint_->EnableTracing(trace_config);
219 
220   // At this point, the Producer should be asked to turn its data source on.
221   DataSourceInstanceID ds_iid = 0;
222 
223   BufferID global_buf_id = 0;
224   auto on_create_ds_instance =
225       task_runner_->CreateCheckpoint("on_create_ds_instance");
226   EXPECT_CALL(producer_, OnTracingSetup());
227 
228   // Store the arguments passed to SetupDataSource() and later check that they
229   // match the ones passed to StartDataSource().
230   DataSourceInstanceID setup_id;
231   DataSourceConfig setup_cfg_proto;
232   EXPECT_CALL(producer_, SetupDataSource(_, _))
233       .WillOnce(
234           Invoke([&setup_id, &setup_cfg_proto](DataSourceInstanceID id,
235                                                const DataSourceConfig& cfg) {
236             setup_id = id;
237             setup_cfg_proto = cfg;
238           }));
239   EXPECT_CALL(producer_, StartDataSource(_, _))
240       .WillOnce(
241           Invoke([on_create_ds_instance, &ds_iid, &global_buf_id, &setup_id,
242                   &setup_cfg_proto](DataSourceInstanceID id,
243                                     const DataSourceConfig& cfg) {
244             // id and config should match the ones passed to SetupDataSource.
245             ASSERT_EQ(id, setup_id);
246             ASSERT_EQ(setup_cfg_proto, cfg);
247             ASSERT_NE(0u, id);
248             ds_iid = id;
249             ASSERT_EQ("perfetto.test", cfg.name());
250             global_buf_id = static_cast<BufferID>(cfg.target_buffer());
251             ASSERT_NE(0u, global_buf_id);
252             ASSERT_LE(global_buf_id, std::numeric_limits<BufferID>::max());
253             on_create_ds_instance();
254           }));
255   task_runner_->RunUntilCheckpoint("on_create_ds_instance");
256 
257   // Now let the data source fill some pages within the same task.
258   // Doing so should accumulate a bunch of chunks that will be notified by the
259   // a future task in one batch.
260   std::unique_ptr<TraceWriter> writer =
261       producer_endpoint_->CreateTraceWriter(global_buf_id);
262   ASSERT_TRUE(writer);
263 
264   const size_t kNumPackets = 10;
265   for (size_t i = 0; i < kNumPackets; i++) {
266     char buf[16];
267     base::SprintfTrunc(buf, sizeof(buf), "evt_%zu", i);
268     writer->NewTracePacket()->set_for_testing()->set_str(buf, strlen(buf));
269   }
270 
271   // Allow the service to see the CommitData() before reading back.
272   auto on_data_committed = task_runner_->CreateCheckpoint("on_data_committed");
273   writer->Flush(on_data_committed);
274   task_runner_->RunUntilCheckpoint("on_data_committed");
275 
276   // Read the log buffer.
277   consumer_endpoint_->ReadBuffers();
278   size_t num_pack_rx = 0;
279   bool saw_clock_snapshot = false;
280   bool saw_trace_config = false;
281   bool saw_trace_stats = false;
282   auto all_packets_rx = task_runner_->CreateCheckpoint("all_packets_rx");
283   EXPECT_CALL(consumer_, OnTracePackets(_, _))
284       .WillRepeatedly(
285           Invoke([&num_pack_rx, all_packets_rx, &trace_config,
286                   &saw_clock_snapshot, &saw_trace_config, &saw_trace_stats](
287                      std::vector<TracePacket>* packets, bool has_more) {
288 #if PERFETTO_BUILDFLAG(PERFETTO_OS_APPLE)
289             const int kExpectedMinNumberOfClocks = 1;
290 #elif PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
291             const int kExpectedMinNumberOfClocks = 2;
292 #else
293             const int kExpectedMinNumberOfClocks = 6;
294 #endif
295 
296             for (auto& encoded_packet : *packets) {
297               protos::gen::TracePacket packet;
298               ASSERT_TRUE(packet.ParseFromString(
299                   encoded_packet.GetRawBytesForTesting()));
300               if (packet.has_for_testing()) {
301                 char buf[8];
302                 base::SprintfTrunc(buf, sizeof(buf), "evt_%zu", num_pack_rx++);
303                 EXPECT_EQ(std::string(buf), packet.for_testing().str());
304               } else if (packet.has_clock_snapshot()) {
305                 EXPECT_GE(packet.clock_snapshot().clocks_size(),
306                           kExpectedMinNumberOfClocks);
307                 saw_clock_snapshot = true;
308               } else if (packet.has_trace_config()) {
309                 EXPECT_EQ(packet.trace_config(), trace_config);
310                 saw_trace_config = true;
311               } else if (packet.has_trace_stats()) {
312                 saw_trace_stats = true;
313                 CheckTraceStats(packet);
314               }
315             }
316             if (!has_more)
317               all_packets_rx();
318           }));
319   task_runner_->RunUntilCheckpoint("all_packets_rx");
320   ASSERT_EQ(kNumPackets, num_pack_rx);
321   EXPECT_TRUE(saw_clock_snapshot);
322   EXPECT_TRUE(saw_trace_config);
323   EXPECT_TRUE(saw_trace_stats);
324 
325   // Disable tracing.
326   consumer_endpoint_->DisableTracing();
327 
328   auto on_tracing_disabled =
329       task_runner_->CreateCheckpoint("on_tracing_disabled");
330   EXPECT_CALL(producer_, StopDataSource(_));
331   EXPECT_CALL(consumer_, OnTracingDisabled(_))
332       .WillOnce(InvokeWithoutArgs(on_tracing_disabled));
333   task_runner_->RunUntilCheckpoint("on_tracing_disabled");
334 }
335 
336 // Regression test for b/172950370.
// Checks the error string the consumer receives through OnTracingDisabled()
// when the service goes away while a tracing session is active.
TEST_F(TracingIntegrationTest, ValidErrorOnDisconnection) {
  // Enable a session with one buffer and the test data source.
  TraceConfig cfg;
  cfg.add_buffers()->set_size_kb(4096 * 10);
  cfg.add_data_sources()->mutable_config()->set_name("perfetto.test");
  consumer_endpoint_->EnableTracing(cfg);

  // Run until the producer has been asked to start the data source. The
  // arguments of the setup/start callbacks are irrelevant for this test.
  auto ds_started = task_runner_->CreateCheckpoint("on_create_ds_instance");
  EXPECT_CALL(producer_, OnTracingSetup());
  EXPECT_CALL(producer_, SetupDataSource(_, _));
  EXPECT_CALL(producer_, StartDataSource(_, _))
      .WillOnce(InvokeWithoutArgs(ds_started));
  task_runner_->RunUntilCheckpoint("on_create_ds_instance");

  // This expectation is deliberately left pending when the test body returns:
  // TearDown() destroys the service via svc_.reset(), which drops the
  // connection and triggers the OnTracingDisabled() call checked here.
  EXPECT_CALL(consumer_, OnTracingDisabled(_))
      .WillOnce(Invoke([](const std::string& error) {
        EXPECT_THAT(error,
                    testing::HasSubstr("EnableTracing IPC request rejected"));
      }));
}
365 
366 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
TEST_F(TracingIntegrationTest,WriteIntoFile)367 TEST_F(TracingIntegrationTest, WriteIntoFile) {
368   // Start tracing.
369   TraceConfig trace_config;
370   trace_config.add_buffers()->set_size_kb(4096 * 10);
371   auto* ds_config = trace_config.add_data_sources()->mutable_config();
372   ds_config->set_name("perfetto.test");
373   ds_config->set_target_buffer(0);
374   trace_config.set_write_into_file(true);
375   base::TempFile tmp_file = base::TempFile::CreateUnlinked();
376   consumer_endpoint_->EnableTracing(trace_config,
377                                     base::ScopedFile(dup(tmp_file.fd())));
378 
379   // At this point, the producer_ should be asked to turn its data source on.
380   BufferID global_buf_id = 0;
381   auto on_create_ds_instance =
382       task_runner_->CreateCheckpoint("on_create_ds_instance");
383   EXPECT_CALL(producer_, OnTracingSetup());
384   EXPECT_CALL(producer_, SetupDataSource(_, _));
385   EXPECT_CALL(producer_, StartDataSource(_, _))
386       .WillOnce(Invoke([on_create_ds_instance, &global_buf_id](
387                            DataSourceInstanceID, const DataSourceConfig& cfg) {
388         global_buf_id = static_cast<BufferID>(cfg.target_buffer());
389         on_create_ds_instance();
390       }));
391   task_runner_->RunUntilCheckpoint("on_create_ds_instance");
392 
393   std::unique_ptr<TraceWriter> writer =
394       producer_endpoint_->CreateTraceWriter(global_buf_id);
395   ASSERT_TRUE(writer);
396 
397   const size_t kNumPackets = 10;
398   for (size_t i = 0; i < kNumPackets; i++) {
399     char buf[16];
400     base::SprintfTrunc(buf, sizeof(buf), "evt_%zu", i);
401     writer->NewTracePacket()->set_for_testing()->set_str(buf, strlen(buf));
402   }
403   auto on_data_committed = task_runner_->CreateCheckpoint("on_data_committed");
404   writer->Flush(on_data_committed);
405   task_runner_->RunUntilCheckpoint("on_data_committed");
406 
407   // Will disable tracing and will force the buffers to be written into the
408   // file before destroying them.
409   consumer_endpoint_->FreeBuffers();
410 
411   auto on_tracing_disabled =
412       task_runner_->CreateCheckpoint("on_tracing_disabled");
413   EXPECT_CALL(producer_, StopDataSource(_));
414   EXPECT_CALL(consumer_, OnTracingDisabled(_))
415       .WillOnce(InvokeWithoutArgs(on_tracing_disabled));
416   task_runner_->RunUntilCheckpoint("on_tracing_disabled");
417 
418   // Check that |tmp_file| contains a valid trace.proto message.
419   ASSERT_EQ(0, lseek(tmp_file.fd(), 0, SEEK_SET));
420   std::string trace_contents;
421   ASSERT_TRUE(base::ReadFileDescriptor(tmp_file.fd(), &trace_contents));
422   protos::gen::Trace tmp_trace;
423   ASSERT_TRUE(tmp_trace.ParseFromString(trace_contents));
424   size_t num_test_packet = 0;
425   size_t num_clock_snapshot_packet = 0;
426   size_t num_system_info_packet = 0;
427   bool saw_trace_stats = false;
428   for (int i = 0; i < tmp_trace.packet_size(); i++) {
429     const auto& packet = tmp_trace.packet()[static_cast<size_t>(i)];
430     if (packet.has_for_testing()) {
431       ASSERT_EQ("evt_" + std::to_string(num_test_packet++),
432                 packet.for_testing().str());
433     } else if (packet.has_trace_stats()) {
434       saw_trace_stats = true;
435       CheckTraceStats(packet);
436     } else if (packet.has_clock_snapshot()) {
437       num_clock_snapshot_packet++;
438     } else if (packet.has_system_info()) {
439       num_system_info_packet++;
440     }
441   }
442   ASSERT_TRUE(saw_trace_stats);
443   ASSERT_GT(num_clock_snapshot_packet, 0u);
444   ASSERT_GT(num_system_info_packet, 0u);
445 }
446 #endif
447 
448 class TracingIntegrationTestWithSMBScrapingProducer
449     : public TracingIntegrationTest {
450  public:
GetProducerSMBScrapingMode()451   TracingService::ProducerSMBScrapingMode GetProducerSMBScrapingMode()
452       override {
453     return TracingService::ProducerSMBScrapingMode::kEnabled;
454   }
455 };
456 
TEST_F(TracingIntegrationTestWithSMBScrapingProducer,ScrapeOnFlush)457 TEST_F(TracingIntegrationTestWithSMBScrapingProducer, ScrapeOnFlush) {
458   // Start tracing.
459   TraceConfig trace_config;
460   trace_config.add_buffers()->set_size_kb(4096 * 10);
461   auto* ds_config = trace_config.add_data_sources()->mutable_config();
462   ds_config->set_name("perfetto.test");
463   ds_config->set_target_buffer(0);
464   consumer_endpoint_->EnableTracing(trace_config);
465 
466   // At this point, the Producer should be asked to turn its data source on.
467 
468   BufferID global_buf_id = 0;
469   auto on_create_ds_instance =
470       task_runner_->CreateCheckpoint("on_create_ds_instance");
471   EXPECT_CALL(producer_, OnTracingSetup());
472 
473   EXPECT_CALL(producer_, SetupDataSource(_, _));
474   EXPECT_CALL(producer_, StartDataSource(_, _))
475       .WillOnce(Invoke([on_create_ds_instance, &global_buf_id](
476                            DataSourceInstanceID, const DataSourceConfig& cfg) {
477         global_buf_id = static_cast<BufferID>(cfg.target_buffer());
478         on_create_ds_instance();
479       }));
480   task_runner_->RunUntilCheckpoint("on_create_ds_instance");
481 
482   // Create writer, which will post a task to register the writer with the
483   // service.
484   std::unique_ptr<TraceWriter> writer =
485       producer_endpoint_->CreateTraceWriter(global_buf_id);
486   ASSERT_TRUE(writer);
487 
488   // Wait for the writer to be registered.
489   task_runner_->RunUntilIdle();
490 
491   // Write a few trace packets.
492   writer->NewTracePacket()->set_for_testing()->set_str("payload1");
493   writer->NewTracePacket()->set_for_testing()->set_str("payload2");
494   writer->NewTracePacket()->set_for_testing()->set_str("payload3");
495 
496   // Ask the service to flush, but don't flush our trace writer. This should
497   // cause our uncommitted SMB chunk to be scraped.
498   auto on_flush_complete = task_runner_->CreateCheckpoint("on_flush_complete");
499   FlushFlags flush_flags(FlushFlags::Initiator::kConsumerSdk,
500                          FlushFlags::Reason::kExplicit);
501   consumer_endpoint_->Flush(
502       5000,
503       [on_flush_complete](bool success) {
504         EXPECT_TRUE(success);
505         on_flush_complete();
506       },
507       flush_flags);
508   EXPECT_CALL(producer_, Flush(_, _, _, flush_flags))
509       .WillOnce(Invoke([this](FlushRequestID flush_req_id,
510                               const DataSourceInstanceID*, size_t, FlushFlags) {
511         producer_endpoint_->NotifyFlushComplete(flush_req_id);
512       }));
513   task_runner_->RunUntilCheckpoint("on_flush_complete");
514 
515   // Read the log buffer. We should see all the packets.
516   consumer_endpoint_->ReadBuffers();
517 
518   size_t num_test_pack_rx = 0;
519   auto all_packets_rx = task_runner_->CreateCheckpoint("all_packets_rx");
520   EXPECT_CALL(consumer_, OnTracePackets(_, _))
521       .WillRepeatedly(
522           Invoke([&num_test_pack_rx, all_packets_rx](
523                      std::vector<TracePacket>* packets, bool has_more) {
524             for (auto& encoded_packet : *packets) {
525               protos::gen::TracePacket packet;
526               ASSERT_TRUE(packet.ParseFromString(
527                   encoded_packet.GetRawBytesForTesting()));
528               if (packet.has_for_testing()) {
529                 num_test_pack_rx++;
530               }
531             }
532             if (!has_more)
533               all_packets_rx();
534           }));
535   task_runner_->RunUntilCheckpoint("all_packets_rx");
536   ASSERT_EQ(3u, num_test_pack_rx);
537 
538   // Disable tracing.
539   consumer_endpoint_->DisableTracing();
540 
541   auto on_tracing_disabled =
542       task_runner_->CreateCheckpoint("on_tracing_disabled");
543   auto on_stop_ds = task_runner_->CreateCheckpoint("on_stop_ds");
544   EXPECT_CALL(producer_, StopDataSource(_))
545       .WillOnce(InvokeWithoutArgs(on_stop_ds));
546   EXPECT_CALL(consumer_, OnTracingDisabled(_))
547       .WillOnce(InvokeWithoutArgs(on_tracing_disabled));
548   task_runner_->RunUntilCheckpoint("on_stop_ds");
549   task_runner_->RunUntilCheckpoint("on_tracing_disabled");
550 }
551 
552 // TODO(primiano): add tests to cover:
553 // - unknown fields preserved end-to-end.
554 // - >1 data source.
555 // - >1 data consumer sharing the same data source, with different TraceBuffers.
556 // - >1 consumer with > 1 buffer each.
557 // - Consumer disconnecting in the middle of a ReadBuffers() call.
558 // - Multiple calls to DisableTracing.
559 // - Out of order Enable/Disable/FreeBuffers calls.
560 // - DisableTracing does actually freeze the buffers.
561 
562 }  // namespace perfetto
563