xref: /aosp_15_r20/external/perfetto/src/tracing/core/trace_writer_impl_unittest.cc (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/core/trace_writer_impl.h"
18 
19 #include <vector>
20 
21 #include "perfetto/ext/base/utils.h"
22 #include "perfetto/ext/tracing/core/commit_data_request.h"
23 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
24 #include "perfetto/ext/tracing/core/trace_writer.h"
25 #include "perfetto/ext/tracing/core/tracing_service.h"
26 #include "perfetto/protozero/message.h"
27 #include "perfetto/protozero/proto_utils.h"
28 #include "perfetto/protozero/scattered_stream_writer.h"
29 #include "src/base/test/test_task_runner.h"
30 #include "src/tracing/core/shared_memory_arbiter_impl.h"
31 #include "src/tracing/test/aligned_buffer_test.h"
32 #include "src/tracing/test/mock_producer_endpoint.h"
33 #include "test/gtest_and_gmock.h"
34 
35 #include "protos/perfetto/trace/test_event.gen.h"
36 #include "protos/perfetto/trace/test_event.pbzero.h"
37 #include "protos/perfetto/trace/trace_packet.gen.h"
38 #include "protos/perfetto/trace/trace_packet.pbzero.h"
39 
40 namespace perfetto {
41 namespace {
42 
43 using ChunkHeader = SharedMemoryABI::ChunkHeader;
44 using ShmemMode = SharedMemoryABI::ShmemMode;
45 using ::protozero::ScatteredStreamWriter;
46 using ::testing::AllOf;
47 using ::testing::ElementsAre;
48 using ::testing::IsEmpty;
49 using ::testing::IsNull;
50 using ::testing::MockFunction;
51 using ::testing::Ne;
52 using ::testing::NiceMock;
53 using ::testing::Not;
54 using ::testing::NotNull;
55 using ::testing::Optional;
56 using ::testing::SizeIs;
57 using ::testing::ValuesIn;
58 
59 class TraceWriterImplTest : public AlignedBufferTest {
60  public:
61   struct PatchKey {
62     uint32_t writer_id;
63     uint32_t chunk_id;
operator <perfetto::__anon372643340111::TraceWriterImplTest::PatchKey64     bool operator<(const PatchKey& other) const {
65       return std::tie(writer_id, chunk_id) <
66              std::tie(other.writer_id, other.chunk_id);
67     }
68   };
SetUp()69   void SetUp() override {
70     default_layout_ =
71         SharedMemoryArbiterImpl::default_page_layout_for_testing();
72     SharedMemoryArbiterImpl::set_default_layout_for_testing(
73         SharedMemoryABI::PageLayout::kPageDiv4);
74     AlignedBufferTest::SetUp();
75     task_runner_.reset(new base::TestTaskRunner());
76     arbiter_.reset(new SharedMemoryArbiterImpl(
77         buf(), buf_size(), ShmemMode::kDefault, page_size(),
78         &mock_producer_endpoint_, task_runner_.get()));
79     ON_CALL(mock_producer_endpoint_, CommitData)
80         .WillByDefault([&](const CommitDataRequest& req,
81                            MockProducerEndpoint::CommitDataCallback cb) {
82           last_commit_ = req;
83           last_commit_callback_ = cb;
84           for (const CommitDataRequest::ChunkToPatch& c :
85                req.chunks_to_patch()) {
86             patches_[PatchKey{c.writer_id(), c.chunk_id()}] = c.patches();
87           }
88         });
89   }
90 
TearDown()91   void TearDown() override {
92     arbiter_.reset();
93     task_runner_.reset();
94     SharedMemoryArbiterImpl::set_default_layout_for_testing(default_layout_);
95   }
96 
CopyPayloadAndApplyPatches(SharedMemoryABI::Chunk & chunk) const97   std::vector<uint8_t> CopyPayloadAndApplyPatches(
98       SharedMemoryABI::Chunk& chunk) const {
99     std::vector<uint8_t> copy(chunk.payload_begin(),
100                               chunk.payload_begin() + chunk.payload_size());
101     ChunkHeader::Packets p = chunk.header()->packets.load();
102 
103     auto it = patches_.find(PatchKey{chunk.header()->writer_id.load(),
104                                      chunk.header()->chunk_id.load()});
105     if (it == patches_.end()) {
106       EXPECT_FALSE(p.flags & ChunkHeader::kChunkNeedsPatching);
107       return copy;
108     }
109     EXPECT_TRUE(p.flags & ChunkHeader::kChunkNeedsPatching);
110 
111     for (const CommitDataRequest::ChunkToPatch::Patch& patch : it->second) {
112       if (patch.offset() + patch.data().size() > copy.size()) {
113         ADD_FAILURE() << "Patch out of bounds";
114         continue;
115       }
116       for (size_t i = 0; i < patch.data().size(); i++) {
117         copy[patch.offset() + i] =
118             reinterpret_cast<const uint8_t*>(patch.data().data())[i];
119       }
120     }
121     return copy;
122   }
123 
124   // Extracts trace packets from the shared memory buffer, and returns copies of
125   // them (after applying the patches received). The producer that writes to the
126   // shared memory (i.e. the trace writer) must be destroyed.
GetPacketsFromShmemAndPatches()127   std::vector<std::string> GetPacketsFromShmemAndPatches() {
128     std::vector<std::string> packets;
129     SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
130     bool was_fragmenting = false;
131     for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
132       uint32_t header_bitmap = abi->GetPageHeaderBitmap(page_idx);
133       size_t num_chunks =
134           SharedMemoryABI::GetNumChunksFromHeaderBitmap(header_bitmap);
135       for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
136         SharedMemoryABI::ChunkState chunk_state =
137             abi->GetChunkState(page_idx, chunk_idx);
138         if (chunk_state != SharedMemoryABI::kChunkFree &&
139             chunk_state != SharedMemoryABI::kChunkComplete) {
140           ADD_FAILURE() << "Page " << page_idx << " chunk " << chunk_idx
141                         << " unexpected state: " << chunk_state;
142           continue;
143         }
144         SharedMemoryABI::Chunk chunk =
145             abi->TryAcquireChunkForReading(page_idx, chunk_idx);
146         if (!chunk.is_valid())
147           continue;
148         ChunkHeader::Packets p = chunk.header()->packets.load();
149 
150         EXPECT_EQ(
151             was_fragmenting,
152             static_cast<bool>(p.flags &
153                               ChunkHeader::kFirstPacketContinuesFromPrevChunk));
154 
155         std::vector<uint8_t> payload = CopyPayloadAndApplyPatches(chunk);
156 
157         const uint8_t* read_ptr = payload.data();
158         const uint8_t* const end_read_ptr = payload.data() + payload.size();
159 
160         size_t num_fragments = p.count;
161         for (; num_fragments && read_ptr < end_read_ptr; num_fragments--) {
162           uint64_t len;
163           read_ptr =
164               protozero::proto_utils::ParseVarInt(read_ptr, end_read_ptr, &len);
165           if (!was_fragmenting || packets.empty()) {
166             packets.push_back(std::string());
167           }
168           was_fragmenting = false;
169           if (read_ptr + len > end_read_ptr) {
170             ADD_FAILURE() << "Page " << page_idx << " chunk " << chunk_idx
171                           << " malformed chunk";
172           }
173           packets.back().append(reinterpret_cast<const char*>(read_ptr),
174                                 static_cast<size_t>(len));
175           read_ptr += len;
176         }
177         EXPECT_EQ(num_fragments, 0u);
178         was_fragmenting =
179             p.flags & ChunkHeader::kLastPacketContinuesOnNextChunk;
180       }
181     }
182     // Ignore empty packets (like tracing service does).
183     packets.erase(
184         std::remove_if(packets.begin(), packets.end(),
185                        [](const std::string& p) { return p.empty(); }),
186         packets.end());
187     return packets;
188   }
189 
190   struct ChunkInABI {
191     size_t page_idx;
192     uint32_t header_bitmap;
193     size_t chunk_idx;
194   };
GetFirstChunkBeingWritten()195   std::optional<ChunkInABI> GetFirstChunkBeingWritten() {
196     SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
197     for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
198       uint32_t header_bitmap = abi->GetPageHeaderBitmap(page_idx);
199       size_t num_chunks =
200           SharedMemoryABI::GetNumChunksFromHeaderBitmap(header_bitmap);
201       for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
202         SharedMemoryABI::ChunkState chunk_state =
203             abi->GetChunkState(page_idx, chunk_idx);
204         if (chunk_state != SharedMemoryABI::kChunkBeingWritten) {
205           continue;
206         }
207         return ChunkInABI{page_idx, header_bitmap, chunk_idx};
208       }
209     }
210     return std::nullopt;
211   }
212 
GetChunkFragments(size_t packets_count,const void * chunk_payload,size_t chunk_payload_size)213   static std::optional<std::vector<std::string>> GetChunkFragments(
214       size_t packets_count,
215       const void* chunk_payload,
216       size_t chunk_payload_size) {
217     std::vector<std::string> fragments;
218     const uint8_t* read_ptr = static_cast<const uint8_t*>(chunk_payload);
219     const uint8_t* const end_read_ptr = read_ptr + chunk_payload_size;
220 
221     for (size_t num_fragments = packets_count;
222          num_fragments && read_ptr < end_read_ptr; num_fragments--) {
223       uint64_t len;
224       read_ptr =
225           protozero::proto_utils::ParseVarInt(read_ptr, end_read_ptr, &len);
226       if (read_ptr + len > end_read_ptr) {
227         return std::nullopt;
228       }
229       fragments.push_back(std::string(reinterpret_cast<const char*>(read_ptr),
230                                       static_cast<size_t>(len)));
231       read_ptr += len;
232     }
233     return std::make_optional(std::move(fragments));
234   }
235 
236   SharedMemoryABI::PageLayout default_layout_;
237   CommitDataRequest last_commit_;
238   ProducerEndpoint::CommitDataCallback last_commit_callback_;
239   std::map<PatchKey, std::vector<CommitDataRequest::ChunkToPatch::Patch>>
240       patches_;
241   NiceMock<MockProducerEndpoint> mock_producer_endpoint_;
242 
243   std::unique_ptr<base::TestTaskRunner> task_runner_;
244   std::unique_ptr<SharedMemoryArbiterImpl> arbiter_;
245 };
246 
247 size_t const kPageSizes[] = {4096, 65536};
248 INSTANTIATE_TEST_SUITE_P(PageSize, TraceWriterImplTest, ValuesIn(kPageSizes));
249 
// Writes a batch of small packets through the message handle API and checks
// that all of them can be read back, in order, from the shared memory buffer,
// with only the first one flagged as first_packet_on_sequence.
TEST_P(TraceWriterImplTest, NewTracePacket) {
  const BufferID kBufId = 42;
  const size_t kNumPackets = 32;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
  for (size_t n = 0; n < kNumPackets; ++n) {
    const std::string payload = "foobar " + std::to_string(n);
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str(payload);
  }

  // Destroying the TraceWriterImpl should cause the last packet to be
  // finalized and the chunk to be put back in the kChunkComplete state.
  writer.reset();

  const std::vector<std::string> packets = GetPacketsFromShmemAndPatches();
  ASSERT_THAT(packets, SizeIs(kNumPackets));
  for (size_t n = 0; n < kNumPackets; ++n) {
    protos::gen::TracePacket parsed;
    EXPECT_TRUE(parsed.ParseFromString(packets[n]));
    EXPECT_EQ(parsed.for_testing().str(), "foobar " + std::to_string(n));
    // Only the very first packet written by the writer carries the flag.
    EXPECT_EQ(parsed.first_packet_on_sequence(), n == 0);
  }
}
277 
// Writes two packets whose string payload is longer than a chunk (a quarter of
// a page) and checks that both are read back intact.
TEST_P(TraceWriterImplTest, NewTracePacketLargePackets) {
  const BufferID kBufId = 42;
  const size_t chunk_size = page_size() / 4;
  const std::string filler(chunk_size, 'x');
  const std::string payloads[] = {std::string("PACKET_1") + filler,
                                  std::string("PACKET_2") + filler};

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
  for (const std::string& payload : payloads) {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str(payload);
  }

  // Destroying the TraceWriterImpl should cause the last packet to be
  // finalized and the chunk to be put back in the kChunkComplete state.
  writer.reset();

  const std::vector<std::string> packets = GetPacketsFromShmemAndPatches();
  ASSERT_THAT(packets, SizeIs(2));
  for (size_t n = 0; n < 2; ++n) {
    protos::gen::TracePacket parsed;
    EXPECT_TRUE(parsed.ParseFromString(packets[n]));
    EXPECT_EQ(parsed.for_testing().str(), payloads[n]);
  }
}
312 
// Serialized form of first_packet_on_sequence = true in a TracePacket proto:
// a two-byte varint tag (0xB8 0x05) followed by the value 1. The trailing NUL
// byte lets the array be used as a C string when it is concatenated with a
// std::string.
constexpr char kFirstPacketOnSequenceFlagPrefix[] = {
    static_cast<char>(0xB8), 0x05, 0x01, '\0'};
317 
// Writes packets as raw bytes through the stream writer taken from the packet
// handle, finishing each one explicitly with FinishTracePacket(), and checks
// the bytes that end up in the shared memory buffer.
TEST_P(TraceWriterImplTest, NewTracePacketTakeWriter) {
  const BufferID kBufId = 42;
  const size_t kNumPackets = 32;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
  for (size_t n = 0; n < kNumPackets; ++n) {
    const std::string raw_proto_bytes =
        std::string("RAW_PROTO_BYTES_") + std::to_string(n);
    ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
    sw->WriteBytes(reinterpret_cast<const uint8_t*>(raw_proto_bytes.data()),
                   raw_proto_bytes.size());
    writer->FinishTracePacket();
  }

  // Destroying the TraceWriterImpl should cause the last packet to be
  // finalized and the chunk to be put back in the kChunkComplete state.
  writer.reset();

  const std::vector<std::string> packets = GetPacketsFromShmemAndPatches();
  ASSERT_THAT(packets, SizeIs(kNumPackets));
  for (size_t n = 0; n < kNumPackets; ++n) {
    // The first packet is expected to start with the first_packet_on_sequence
    // prefix.
    std::string expected;
    if (n == 0) {
      expected = kFirstPacketOnSequenceFlagPrefix;
    }
    expected += "RAW_PROTO_BYTES_" + std::to_string(n);
    EXPECT_EQ(packets[n], expected);
  }
}
345 
#if defined(GTEST_HAS_DEATH_TEST)
// Death tests reuse the regular fixture under a separate suite name and are
// instantiated with the same page sizes.
using TraceWriterImplDeathTest = TraceWriterImplTest;
INSTANTIATE_TEST_SUITE_P(PageSize,
                         TraceWriterImplDeathTest,
                         ValuesIn(kPageSizes));

// Starting a new packet after the stream writer was taken, without calling
// FinishTracePacket() in between, is expected to crash.
TEST_P(TraceWriterImplDeathTest, NewTracePacketTakeWriterNoFinish) {
  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  TraceWriterImpl::TracePacketHandle handle = writer->NewTracePacket();

  // Avoid a secondary DCHECK failure from ~TraceWriterImpl() =>
  // Message::Finalize() due to the stream writer being modified behind the
  // Message's back. This turns the Finalize() call into a no-op.
  handle->set_size_field(nullptr);

  ScatteredStreamWriter* sw = handle.TakeStreamWriter();
  const std::string raw_proto_bytes("RAW_PROTO_BYTES");
  sw->WriteBytes(reinterpret_cast<const uint8_t*>(raw_proto_bytes.data()),
                 raw_proto_bytes.size());

  EXPECT_DEATH({ writer->NewTracePacket(); }, "");
}
#endif  // defined(GTEST_HAS_DEATH_TEST)
371 
TEST_P(TraceWriterImplTest,AnnotatePatch)372 TEST_P(TraceWriterImplTest, AnnotatePatch) {
373   const BufferID kBufId = 42;
374   std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
375   ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
376   std::string raw_proto_bytes = std::string("RAW_PROTO_BYTES");
377   sw->WriteBytes(reinterpret_cast<const uint8_t*>(raw_proto_bytes.data()),
378                  raw_proto_bytes.size());
379 
380   uint8_t* patch1 =
381       sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
382   ASSERT_THAT(patch1, NotNull());
383   patch1[0] = 0;
384   patch1[1] = 0;
385   patch1[2] = 0;
386   patch1[3] = 0;
387   const uint8_t* old_chunk_pointer = patch1;
388   patch1 = sw->AnnotatePatch(patch1);
389   EXPECT_NE(patch1, old_chunk_pointer);
390   ASSERT_THAT(patch1, NotNull());
391 
392   sw->WriteByte('X');
393 
394   uint8_t* patch2 =
395       sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
396   ASSERT_THAT(patch2, NotNull());
397   patch2[0] = 0;
398   patch2[1] = 0;
399   patch2[2] = 0;
400   patch2[3] = 0;
401   old_chunk_pointer = patch2;
402   patch2 = sw->AnnotatePatch(patch2);
403   EXPECT_NE(patch2, old_chunk_pointer);
404   ASSERT_THAT(patch2, NotNull());
405 
406   const size_t chunk_size = page_size() / 4;
407   std::string large_string(chunk_size, 'x');
408 
409   sw->WriteBytes(reinterpret_cast<const uint8_t*>(large_string.data()),
410                  large_string.size());
411 
412   uint8_t* patch3 =
413       sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
414   ASSERT_THAT(patch3, NotNull());
415   patch3[0] = 0;
416   patch3[1] = 0;
417   patch3[2] = 0;
418   patch3[3] = 0;
419   old_chunk_pointer = patch3;
420   patch3 = sw->AnnotatePatch(patch3);
421   EXPECT_NE(patch3, old_chunk_pointer);
422   ASSERT_THAT(patch3, NotNull());
423 
424   sw->WriteBytes(reinterpret_cast<const uint8_t*>(large_string.data()),
425                  large_string.size());
426 
427   patch1[0] = 0x11;
428   patch1[1] = 0x11;
429   patch1[2] = 0x11;
430   patch1[3] = 0x11;
431 
432   patch2[0] = 0x22;
433   patch2[1] = 0x22;
434   patch2[2] = 0x22;
435   patch2[3] = 0x22;
436 
437   patch3[0] = 0x33;
438   patch3[1] = 0x33;
439   patch3[2] = 0x33;
440   patch3[3] = 0x33;
441 
442   writer->FinishTracePacket();
443 
444   // Destroying the TraceWriteImpl should cause the last packet to be finalized
445   // and the chunk to be put back in the kChunkComplete state.
446   writer.reset();
447 
448   std::vector<std::string> packets = GetPacketsFromShmemAndPatches();
449   EXPECT_THAT(
450       packets,
451       ElementsAre(
452           kFirstPacketOnSequenceFlagPrefix + std::string("RAW_PROTO_BYTES") +
453           std::string("\x11\x11\x11\x11") + std::string("X") +
454           std::string("\x22\x22\x22\x22") + std::string(chunk_size, 'x') +
455           std::string("\x33\x33\x33\x33") + std::string(chunk_size, 'x')));
456 }
457 
TEST_P(TraceWriterImplTest,MixManualTakeAndMessage)458 TEST_P(TraceWriterImplTest, MixManualTakeAndMessage) {
459   const BufferID kBufId = 42;
460   const size_t chunk_size = page_size() / 4;
461   const std::string large_string(chunk_size, 'x');
462 
463   std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
464 
465   {
466     ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
467     std::string packet1 = std::string("PACKET_1_");
468     sw->WriteBytes(reinterpret_cast<const uint8_t*>(packet1.data()),
469                    packet1.size());
470     uint8_t* patch =
471         sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
472     ASSERT_THAT(patch, NotNull());
473     patch[0] = 0;
474     patch[1] = 0;
475     patch[2] = 0;
476     patch[3] = 0;
477     const uint8_t* old_chunk_pointer = patch;
478     patch = sw->AnnotatePatch(patch);
479     EXPECT_NE(patch, old_chunk_pointer);
480     ASSERT_THAT(patch, NotNull());
481     sw->WriteBytes(reinterpret_cast<const uint8_t*>(large_string.data()),
482                    large_string.size());
483     patch[0] = 0xFF;
484     patch[1] = 0xFF;
485     patch[2] = 0xFF;
486     patch[3] = 0xFF;
487     writer->FinishTracePacket();
488   }
489 
490   {
491     auto msg = writer->NewTracePacket();
492     std::string packet2 = std::string("PACKET_2_");
493     msg->AppendRawProtoBytes(packet2.data(), packet2.size());
494     auto* nested = msg->BeginNestedMessage<protozero::Message>(1);
495     nested->AppendRawProtoBytes(large_string.data(), large_string.size());
496   }
497 
498   {
499     ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
500     std::string packet3 = std::string("PACKET_3_");
501     sw->WriteBytes(reinterpret_cast<const uint8_t*>(packet3.data()),
502                    packet3.size());
503     uint8_t* patch =
504         sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
505     ASSERT_THAT(patch, NotNull());
506     patch[0] = 0;
507     patch[1] = 0;
508     patch[2] = 0;
509     patch[3] = 0;
510     const uint8_t* old_chunk_pointer = patch;
511     patch = sw->AnnotatePatch(patch);
512     EXPECT_NE(patch, old_chunk_pointer);
513     ASSERT_THAT(patch, NotNull());
514     sw->WriteBytes(reinterpret_cast<const uint8_t*>(large_string.data()),
515                    large_string.size());
516     patch[0] = 0xFF;
517     patch[1] = 0xFF;
518     patch[2] = 0xFF;
519     patch[3] = 0xFF;
520     writer->FinishTracePacket();
521   }
522 
523   // Destroying the TraceWriteImpl should cause the last packet to be finalized
524   // and the chunk to be put back in the kChunkComplete state.
525   writer.reset();
526 
527   uint8_t buf[protozero::proto_utils::kMessageLengthFieldSize];
528   protozero::proto_utils::WriteRedundantVarInt(
529       static_cast<uint32_t>(large_string.size()), buf,
530       protozero::proto_utils::kMessageLengthFieldSize);
531   std::string encoded_size(reinterpret_cast<char*>(buf), sizeof(buf));
532 
533   std::vector<std::string> packets = GetPacketsFromShmemAndPatches();
534   EXPECT_THAT(
535       packets,
536       ElementsAre(kFirstPacketOnSequenceFlagPrefix + std::string("PACKET_1_") +
537                       std::string("\xFF\xFF\xFF\xFF") +
538                       std::string(chunk_size, 'x'),
539                   std::string("PACKET_2_") + std::string("\x0A") +
540                       encoded_size + std::string(chunk_size, 'x'),
541                   std::string("PACKET_3_") + std::string("\xFF\xFF\xFF\xFF") +
542                       std::string(chunk_size, 'x')));
543 }
544 
// Checks the chunk header while a packet is being written, after its message
// handle is destroyed, and after the writer itself is destroyed.
TEST_P(TraceWriterImplTest, MessageHandleDestroyedPacketScrapable) {
  const BufferID kBufId = 42;

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  auto packet = writer->NewTracePacket();
  packet->set_for_testing()->set_str("packet1");

  std::optional<ChunkInABI> chunk_in_abi = GetFirstChunkBeingWritten();
  ASSERT_TRUE(chunk_in_abi.has_value());
  const size_t page_idx = chunk_in_abi->page_idx;
  const size_t chunk_idx = chunk_in_abi->chunk_idx;

  auto* abi = arbiter_->shmem_abi_for_testing();
  SharedMemoryABI::Chunk chunk =
      abi->GetChunkUnchecked(page_idx, chunk_in_abi->header_bitmap, chunk_idx);
  ASSERT_TRUE(chunk.is_valid());

  // Current state of the chunk under observation.
  const auto chunk_state = [&] {
    return abi->GetChunkState(page_idx, chunk_idx);
  };

  // While the packet is open, the chunk holds a single packet.
  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkBeingWritten);

  packet = protozero::MessageHandle<protos::pbzero::TracePacket>();

  // After destroying the message handle, the chunk header should have an
  // inflated packet count.
  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkBeingWritten);

  writer.reset();

  // Destroying the writer completes the chunk and leaves the count unchanged.
  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkComplete);
  EXPECT_THAT(GetChunkFragments(1, chunk.payload_begin(), chunk.payload_size()),
              Optional(ElementsAre(Not(IsEmpty()))));
}
582 
TEST_P(TraceWriterImplTest,FinishTracePacketScrapable)583 TEST_P(TraceWriterImplTest, FinishTracePacketScrapable) {
584   const BufferID kBufId = 42;
585 
586   std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
587 
588   {
589     protos::pbzero::TestEvent test_event;
590     protozero::MessageArena arena;
591     ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
592     uint8_t data[protozero::proto_utils::kMaxTagEncodedSize];
593     uint8_t* data_end = protozero::proto_utils::WriteVarInt(
594         protozero::proto_utils::MakeTagLengthDelimited(
595             protos::pbzero::TracePacket::kForTestingFieldNumber),
596         data);
597     sw->WriteBytes(data, static_cast<size_t>(data_end - data));
598     test_event.Reset(sw, &arena);
599     test_event.set_size_field(
600         sw->ReserveBytes(protozero::proto_utils::kMessageLengthFieldSize));
601     test_event.set_str("payload1");
602   }
603 
604   std::optional<ChunkInABI> chunk_in_abi = GetFirstChunkBeingWritten();
605   ASSERT_TRUE(chunk_in_abi.has_value());
606 
607   auto* abi = arbiter_->shmem_abi_for_testing();
608   SharedMemoryABI::Chunk chunk = abi->GetChunkUnchecked(
609       chunk_in_abi->page_idx, chunk_in_abi->header_bitmap,
610       chunk_in_abi->chunk_idx);
611   ASSERT_TRUE(chunk.is_valid());
612 
613   EXPECT_EQ(chunk.header()->packets.load().count, 1);
614   EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
615             SharedMemoryABI::ChunkState::kChunkBeingWritten);
616 
617   writer->FinishTracePacket();
618 
619   // After a call to FinishTracePacket, the chunk header should have an inflated
620   // packet count.
621   EXPECT_EQ(chunk.header()->packets.load().count, 2);
622   EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
623             SharedMemoryABI::ChunkState::kChunkBeingWritten);
624   EXPECT_THAT(GetChunkFragments(1, chunk.payload_begin(), chunk.payload_size()),
625               Optional(ElementsAre(Not(IsEmpty()))));
626 
627   // An extra call to FinishTracePacket should have no effect.
628   EXPECT_EQ(chunk.header()->packets.load().count, 2);
629   EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
630             SharedMemoryABI::ChunkState::kChunkBeingWritten);
631   EXPECT_THAT(GetChunkFragments(1, chunk.payload_begin(), chunk.payload_size()),
632               Optional(ElementsAre(Not(IsEmpty()))));
633 
634   writer.reset();
635 
636   EXPECT_EQ(chunk.header()->packets.load().count, 2);
637   EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
638             SharedMemoryABI::ChunkState::kChunkComplete);
639   EXPECT_THAT(GetChunkFragments(2, chunk.payload_begin(), chunk.payload_size()),
640               Optional(ElementsAre(Not(IsEmpty()), IsEmpty())));
641 }
642 
// Destroys the message handle of an open packet and then additionally calls
// FinishTracePacket(), checking that the second step changes nothing in the
// chunk header.
TEST_P(TraceWriterImplTest,
       MessageHandleDestroyedAndFinishTracePacketScrapable) {
  const BufferID kBufId = 42;

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  auto packet = writer->NewTracePacket();
  packet->set_for_testing()->set_str("packet1");

  std::optional<ChunkInABI> chunk_in_abi = GetFirstChunkBeingWritten();
  ASSERT_TRUE(chunk_in_abi.has_value());
  const size_t page_idx = chunk_in_abi->page_idx;
  const size_t chunk_idx = chunk_in_abi->chunk_idx;

  auto* abi = arbiter_->shmem_abi_for_testing();
  SharedMemoryABI::Chunk chunk =
      abi->GetChunkUnchecked(page_idx, chunk_in_abi->header_bitmap, chunk_idx);
  ASSERT_TRUE(chunk.is_valid());

  // Current state of the chunk under observation.
  const auto chunk_state = [&] {
    return abi->GetChunkState(page_idx, chunk_idx);
  };

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkBeingWritten);
  packet = protozero::MessageHandle<protos::pbzero::TracePacket>();

  // After destroying the message handle, the chunk header should have an
  // inflated packet count.
  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkBeingWritten);

  writer->FinishTracePacket();

  // An extra call to FinishTracePacket should have no effect.
  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkBeingWritten);
  EXPECT_THAT(GetChunkFragments(1, chunk.payload_begin(), chunk.payload_size()),
              Optional(ElementsAre(Not(IsEmpty()))));

  writer.reset();

  // Destroying the writer completes the chunk; the second (inflated) fragment
  // is empty.
  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkComplete);
  EXPECT_THAT(GetChunkFragments(2, chunk.payload_begin(), chunk.payload_size()),
              Optional(ElementsAre(Not(IsEmpty()), IsEmpty())));
}
689 
// Fills the chunk completely with a single packet and checks that destroying
// the message handle completes the chunk instead of inflating its packet
// count.
TEST_P(TraceWriterImplTest, MessageHandleDestroyedPacketFullChunk) {
  const BufferID kBufId = 42;

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  auto packet = writer->NewTracePacket();
  protos::pbzero::TestEvent* test_event = packet->set_for_testing();
  // Use up every byte the stream writer still reports as available.
  const std::string chunk_filler(
      test_event->stream_writer()->bytes_available(), '\0');
  test_event->AppendRawProtoBytes(chunk_filler.data(), chunk_filler.size());

  std::optional<ChunkInABI> chunk_in_abi = GetFirstChunkBeingWritten();
  ASSERT_TRUE(chunk_in_abi.has_value());
  const size_t page_idx = chunk_in_abi->page_idx;
  const size_t chunk_idx = chunk_in_abi->chunk_idx;

  auto* abi = arbiter_->shmem_abi_for_testing();
  SharedMemoryABI::Chunk chunk =
      abi->GetChunkUnchecked(page_idx, chunk_in_abi->header_bitmap, chunk_idx);
  ASSERT_TRUE(chunk.is_valid());

  // Current state of the chunk under observation.
  const auto chunk_state = [&] {
    return abi->GetChunkState(page_idx, chunk_idx);
  };

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkBeingWritten);
  // Finish the TracePacket: since there's no space for an empty packet, the
  // trace writer should immediately mark the chunk as completed.
  packet = protozero::MessageHandle<protos::pbzero::TracePacket>();

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkComplete);

  writer.reset();

  // Destroying the writer must not change the already completed chunk.
  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkComplete);
}
727 
// Fills the chunk completely with a manually written packet and checks that
// FinishTracePacket() completes the chunk instead of inflating its packet
// count.
TEST_P(TraceWriterImplTest, FinishTracePacketFullChunk) {
  const BufferID kBufId = 42;

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  {
    // Hand-serialize a TracePacket.for_testing submessage: write the tag,
    // then let |test_event| write its body directly into the stream writer,
    // with its size field reserved right after the tag.
    protos::pbzero::TestEvent test_event;
    protozero::MessageArena arena;
    ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
    uint8_t tag[protozero::proto_utils::kMaxTagEncodedSize];
    uint8_t* tag_end = protozero::proto_utils::WriteVarInt(
        protozero::proto_utils::MakeTagLengthDelimited(
            protos::pbzero::TracePacket::kForTestingFieldNumber),
        tag);
    sw->WriteBytes(tag, static_cast<size_t>(tag_end - tag));
    test_event.Reset(sw, &arena);
    test_event.set_size_field(
        sw->ReserveBytes(protozero::proto_utils::kMessageLengthFieldSize));
    // Use up every byte the stream writer still reports as available.
    const std::string chunk_filler(sw->bytes_available(), '\0');
    test_event.AppendRawProtoBytes(chunk_filler.data(), chunk_filler.size());
  }

  std::optional<ChunkInABI> chunk_in_abi = GetFirstChunkBeingWritten();
  ASSERT_TRUE(chunk_in_abi.has_value());
  const size_t page_idx = chunk_in_abi->page_idx;
  const size_t chunk_idx = chunk_in_abi->chunk_idx;

  auto* abi = arbiter_->shmem_abi_for_testing();
  SharedMemoryABI::Chunk chunk =
      abi->GetChunkUnchecked(page_idx, chunk_in_abi->header_bitmap, chunk_idx);
  ASSERT_TRUE(chunk.is_valid());

  // Current state of the chunk under observation.
  const auto chunk_state = [&] {
    return abi->GetChunkState(page_idx, chunk_idx);
  };

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkBeingWritten);

  // Finish the TracePacket: since there's no space for an empty packet, the
  // trace writer should immediately mark the chunk as completed, instead of
  // inflating the count.
  writer->FinishTracePacket();

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkComplete);

  writer.reset();

  // Destroying the writer must not change the already completed chunk.
  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkComplete);
}
778 
TEST_P(TraceWriterImplTest,FragmentingPacketWithProducerAndServicePatching)779 TEST_P(TraceWriterImplTest, FragmentingPacketWithProducerAndServicePatching) {
780   const BufferID kBufId = 42;
781   std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
782 
783   // Write a packet that's guaranteed to span more than a single chunk, but
784   // less than two chunks.
785   auto packet = writer->NewTracePacket();
786   size_t chunk_size = page_size() / 4;
787   std::string large_string(chunk_size, 'x');
788   packet->set_for_testing()->set_str(large_string);
789 
790   // First chunk should be committed.
791   arbiter_->FlushPendingCommitDataRequests();
792   ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
793   EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
794   EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
795   EXPECT_EQ(last_commit_.chunks_to_move()[0].target_buffer(), kBufId);
796   EXPECT_THAT(last_commit_.chunks_to_patch(), SizeIs(0));
797 
798   // We will simulate a batching cycle by first setting the batching period to
799   // a very large value and then force-flushing when we are done writing data.
800   arbiter_->SetDirectSMBPatchingSupportedByService();
801   ASSERT_TRUE(arbiter_->EnableDirectSMBPatching());
802   arbiter_->SetBatchCommitsDuration(UINT32_MAX);
803 
804   // Write a second packet that's guaranteed to span more than a single chunk.
805   // Starting a new trace packet should cause the patches for the first packet
806   // (i.e. for the first chunk) to be queued for sending to the service. They
807   // cannot be applied locally because the first chunk was already committed.
808   packet->Finalize();
809   auto packet2 = writer->NewTracePacket();
810   packet2->set_for_testing()->set_str(large_string);
811 
812   // Starting a new packet yet again should cause the patches for the second
813   // packet (i.e. for the second chunk) to be applied in the producer, because
814   // the second chunk has not been committed yet.
815   packet2->Finalize();
816   auto packet3 = writer->NewTracePacket();
817 
818   // Simulate the end of the batching period, which should trigger a commit to
819   // the service.
820   arbiter_->FlushPendingCommitDataRequests();
821 
822   SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
823 
824   // The first allocated chunk should be complete but need patching, since the
825   // packet extended past the chunk and no patches for the packet size or
826   // string field size were applied yet.
827   ASSERT_EQ(abi->GetChunkState(0u, 0u), SharedMemoryABI::kChunkComplete);
828   auto chunk = abi->TryAcquireChunkForReading(0u, 0u);
829   ASSERT_TRUE(chunk.is_valid());
830   EXPECT_EQ(chunk.header()->packets.load().count, 1u);
831   EXPECT_TRUE(chunk.header()->packets.load().flags &
832               SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
833   EXPECT_TRUE(chunk.header()->packets.load().flags &
834               SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);
835 
836   // Verify that a patch for the first chunk was sent to the service.
837   ASSERT_THAT(last_commit_.chunks_to_patch(), SizeIs(1));
838   EXPECT_EQ(last_commit_.chunks_to_patch()[0].writer_id(), writer->writer_id());
839   EXPECT_EQ(last_commit_.chunks_to_patch()[0].target_buffer(), kBufId);
840   EXPECT_EQ(last_commit_.chunks_to_patch()[0].chunk_id(),
841             chunk.header()->chunk_id.load());
842   EXPECT_FALSE(last_commit_.chunks_to_patch()[0].has_more_patches());
843   EXPECT_THAT(last_commit_.chunks_to_patch()[0].patches(), SizeIs(1));
844 
845   // Verify that the second chunk was committed.
846   ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
847   EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
848   EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 1u);
849   EXPECT_EQ(last_commit_.chunks_to_move()[0].target_buffer(), kBufId);
850 
851   // The second chunk should be in a complete state and should not need
852   // patching, as the patches to it should have been applied in the producer.
853   ASSERT_EQ(abi->GetChunkState(0u, 1u), SharedMemoryABI::kChunkComplete);
854   auto chunk2 = abi->TryAcquireChunkForReading(0u, 1u);
855   ASSERT_TRUE(chunk2.is_valid());
856   EXPECT_EQ(chunk2.header()->packets.load().count, 2);
857   EXPECT_TRUE(chunk2.header()->packets.load().flags &
858               SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);
859   EXPECT_FALSE(chunk2.header()->packets.load().flags &
860                SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
861 }
862 
// Without enabling direct SMB patching, the patch for the first chunk of a
// fragmented packet is expected to be committed to the service together with
// the chunk itself.
TEST_P(TraceWriterImplTest, FragmentingPacketWithoutEnablingProducerPatching) {
  // Simulate a batching cycle: pick a very large batching period and
  // force-flush at the point where a flush is expected to happen, which here
  // is when a patch is encountered.
  //
  // Note: direct producer-side patching should be disabled by default.
  arbiter_->SetBatchCommitsDuration(UINT32_MAX);

  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  // Reads the packets header word of |c|.
  const auto packets_of = [](SharedMemoryABI::Chunk& c) {
    return c.header()->packets.load();
  };

  // A one-chunk-sized string guarantees the packet spans more than one chunk.
  auto fragmented_packet = writer->NewTracePacket();
  const size_t chunk_bytes = page_size() / 4;
  const std::string payload(chunk_bytes, 'x');
  fragmented_packet->set_for_testing()->set_str(payload);

  // Beginning the next packet should commit the first chunk and its patches
  // to the service.
  fragmented_packet->Finalize();
  auto next_packet = writer->NewTracePacket();
  arbiter_->FlushPendingCommitDataRequests();

  // First chunk: complete, but flagged as needing patching because the packet
  // ran past the chunk and neither the packet size nor the string field size
  // was patched in the producer.
  SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
  ASSERT_EQ(abi->GetChunkState(0u, 0u), SharedMemoryABI::kChunkComplete);
  auto first_chunk = abi->TryAcquireChunkForReading(0u, 0u);
  ASSERT_TRUE(first_chunk.is_valid());
  EXPECT_EQ(packets_of(first_chunk).count, 1);
  EXPECT_TRUE(packets_of(first_chunk).flags &
              ChunkHeader::kChunkNeedsPatching);
  EXPECT_TRUE(packets_of(first_chunk).flags &
              ChunkHeader::kLastPacketContinuesOnNextChunk);

  // The commit moved the first chunk...
  const auto& to_move = last_commit_.chunks_to_move();
  ASSERT_THAT(to_move, SizeIs(1));
  EXPECT_EQ(to_move[0].page(), 0u);
  EXPECT_EQ(to_move[0].chunk(), 0u);
  EXPECT_EQ(to_move[0].target_buffer(), kBufId);

  // ...and carried the patches for it.
  const auto& to_patch = last_commit_.chunks_to_patch();
  ASSERT_THAT(to_patch, SizeIs(1));
  EXPECT_EQ(to_patch[0].writer_id(), writer->writer_id());
  EXPECT_EQ(to_patch[0].target_buffer(), kBufId);
  EXPECT_EQ(to_patch[0].chunk_id(), first_chunk.header()->chunk_id.load());
  EXPECT_FALSE(to_patch[0].has_more_patches());
  EXPECT_THAT(to_patch[0].patches(), SizeIs(1));
}
914 
915 // Sets up a scenario in which the SMB is exhausted and TraceWriter fails to
916 // get a new chunk while fragmenting a packet. Verifies that data is dropped
917 // until the SMB is freed up and TraceWriter can get a new chunk.
TEST_P(TraceWriterImplTest,FragmentingPacketWhileBufferExhausted)918 TEST_P(TraceWriterImplTest, FragmentingPacketWhileBufferExhausted) {
919   const BufferID kBufId = 42;
920   std::unique_ptr<TraceWriter> writer =
921       arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
922 
923   // Write a small first packet, so that |writer| owns a chunk.
924   auto packet = writer->NewTracePacket();
925   EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
926                    ->drop_packets_for_testing());
927   // 3 bytes for the first_packet_on_sequence flag.
928   EXPECT_EQ(packet->Finalize(), 3u);
929 
930   // Grab all the remaining chunks in the SMB in new writers.
931   std::array<std::unique_ptr<TraceWriter>, kNumPages * 4 - 1> other_writers;
932   for (size_t i = 0; i < other_writers.size(); i++) {
933     other_writers[i] =
934         arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
935     auto other_writer_packet = other_writers[i]->NewTracePacket();
936     EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(other_writers[i].get())
937                      ->drop_packets_for_testing());
938   }
939 
940   // Write a packet that's guaranteed to span more than a single chunk,
941   // causing |writer| to attempt to acquire a new chunk but fail to do so.
942   auto packet2 = writer->NewTracePacket();
943   size_t chunk_size = page_size() / 4;
944   std::string large_string(chunk_size, 'x');
945   packet2->set_for_testing()->set_str(large_string);
946 
947   EXPECT_TRUE(reinterpret_cast<TraceWriterImpl*>(writer.get())
948                   ->drop_packets_for_testing());
949 
950   // First chunk should be committed.
951   arbiter_->FlushPendingCommitDataRequests();
952   ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
953   EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
954   EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
955   EXPECT_EQ(last_commit_.chunks_to_move()[0].target_buffer(), kBufId);
956   EXPECT_THAT(last_commit_.chunks_to_patch(), SizeIs(0));
957 
958   // It should not need patching and not have the continuation flag set.
959   SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
960   ASSERT_EQ(SharedMemoryABI::kChunkComplete, abi->GetChunkState(0u, 0u));
961   auto chunk = abi->TryAcquireChunkForReading(0u, 0u);
962   ASSERT_TRUE(chunk.is_valid());
963   EXPECT_EQ(chunk.header()->packets.load().count, 2);
964   EXPECT_FALSE(chunk.header()->packets.load().flags &
965                SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
966   EXPECT_FALSE(chunk.header()->packets.load().flags &
967                SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);
968 
969   // Writing more data while in garbage mode succeeds. This data is dropped.
970   packet2->Finalize();
971   auto packet3 = writer->NewTracePacket();
972   packet3->set_for_testing()->set_str(large_string);
973 
974   // Release the |writer|'s first chunk as free, so that it can grab it again.
975   abi->ReleaseChunkAsFree(std::move(chunk));
976 
977   // Starting a new packet should cause TraceWriter to attempt to grab a new
978   // chunk again, because we wrote enough data to wrap the garbage chunk.
979   packet3->Finalize();
980   auto packet4 = writer->NewTracePacket();
981 
982   // Grabbing the chunk should have succeeded.
983   EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
984                    ->drop_packets_for_testing());
985 
986   // The first packet in the chunk should have the previous_packet_dropped
987   // flag set, so shouldn't be empty.
988   EXPECT_GT(packet4->Finalize(), 0u);
989 
990   // Flushing the writer causes the chunk to be released again.
991   writer->Flush();
992   EXPECT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
993   EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
994   EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
995   EXPECT_THAT(last_commit_.chunks_to_patch(), SizeIs(0));
996 
997   // Chunk should contain only |packet4| and not have any continuation flag
998   // set.
999   ASSERT_EQ(abi->GetChunkState(0u, 0u), SharedMemoryABI::kChunkComplete);
1000   chunk = abi->TryAcquireChunkForReading(0u, 0u);
1001   ASSERT_TRUE(chunk.is_valid());
1002   ASSERT_EQ(chunk.header()->packets.load().count, 1);
1003   EXPECT_FALSE(chunk.header()->packets.load().flags &
1004                SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
1005   EXPECT_FALSE(
1006       chunk.header()->packets.load().flags &
1007       SharedMemoryABI::ChunkHeader::kFirstPacketContinuesFromPrevChunk);
1008   EXPECT_FALSE(chunk.header()->packets.load().flags &
1009                SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);
1010 }
1011 
1012 // Verifies that a TraceWriter that is flushed before the SMB is full and then
1013 // acquires a garbage chunk later recovers and writes a
1014 // previous_packet_dropped marker into the trace.
TEST_P(TraceWriterImplTest,FlushBeforeBufferExhausted)1015 TEST_P(TraceWriterImplTest, FlushBeforeBufferExhausted) {
1016   const BufferID kBufId = 42;
1017   std::unique_ptr<TraceWriter> writer =
1018       arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
1019 
1020   // Write a small first packet and flush it, so that |writer| no longer owns
1021   // any chunk.
1022   auto packet = writer->NewTracePacket();
1023   EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
1024                    ->drop_packets_for_testing());
1025   // 3 bytes for the first_packet_on_sequence flag.
1026   EXPECT_EQ(packet->Finalize(), 3u);
1027 
1028   // Flush the first chunk away.
1029   writer->Flush();
1030 
1031   // First chunk should be committed. Don't release it as free just yet.
1032   arbiter_->FlushPendingCommitDataRequests();
1033   ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
1034   EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
1035   EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
1036 
1037   // Grab all the remaining chunks in the SMB in new writers.
1038   std::array<std::unique_ptr<TraceWriter>, kNumPages * 4 - 1> other_writers;
1039   for (size_t i = 0; i < other_writers.size(); i++) {
1040     other_writers[i] =
1041         arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
1042     auto other_writer_packet = other_writers[i]->NewTracePacket();
1043     EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(other_writers[i].get())
1044                      ->drop_packets_for_testing());
1045   }
1046 
1047   // Write another packet, causing |writer| to acquire a garbage chunk.
1048   auto packet2 = writer->NewTracePacket();
1049   EXPECT_TRUE(reinterpret_cast<TraceWriterImpl*>(writer.get())
1050                   ->drop_packets_for_testing());
1051 
1052   // Writing more data while in garbage mode succeeds. This data is dropped.
1053   // Make sure that we fill the garbage chunk, so that |writer| tries to
1054   // re-acquire a valid chunk for the next packet.
1055   size_t chunk_size = page_size() / 4;
1056   std::string large_string(chunk_size, 'x');
1057   packet2->set_for_testing()->set_str(large_string);
1058   packet2->Finalize();
1059 
1060   // Next packet should still be in the garbage chunk.
1061   auto packet3 = writer->NewTracePacket();
1062   EXPECT_TRUE(reinterpret_cast<TraceWriterImpl*>(writer.get())
1063                   ->drop_packets_for_testing());
1064 
1065   // Release the first chunk as free, so |writer| can acquire it again.
1066   SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
1067   ASSERT_EQ(SharedMemoryABI::kChunkComplete, abi->GetChunkState(0u, 0u));
1068   auto chunk = abi->TryAcquireChunkForReading(0u, 0u);
1069   ASSERT_TRUE(chunk.is_valid());
1070   abi->ReleaseChunkAsFree(std::move(chunk));
1071 
1072   // Fill the garbage chunk, so that the writer attempts to grab another chunk
1073   // for |packet4|.
1074   packet3->set_for_testing()->set_str(large_string);
1075   packet3->Finalize();
1076 
1077   // Next packet should go into the reacquired chunk we just released.
1078   auto packet4 = writer->NewTracePacket();
1079   EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
1080                    ->drop_packets_for_testing());
1081 
1082   // The first packet in the chunk should have the previous_packet_dropped
1083   // flag set, so shouldn't be empty.
1084   EXPECT_GT(packet4->Finalize(), 0u);
1085 
1086   // Flushing the writer causes the chunk to be released again.
1087   writer->Flush();
1088   ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
1089   EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
1090   EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
1091   EXPECT_THAT(last_commit_.chunks_to_patch(), SizeIs(0));
1092 
1093   // Chunk should contain only |packet4| and not have any continuation flag
1094   // set.
1095   ASSERT_EQ(SharedMemoryABI::kChunkComplete, abi->GetChunkState(0u, 0u));
1096   chunk = abi->TryAcquireChunkForReading(0u, 0u);
1097   ASSERT_TRUE(chunk.is_valid());
1098   ASSERT_EQ(chunk.header()->packets.load().count, 1);
1099   ASSERT_FALSE(chunk.header()->packets.load().flags &
1100                SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
1101   ASSERT_FALSE(
1102       chunk.header()->packets.load().flags &
1103       SharedMemoryABI::ChunkHeader::kFirstPacketContinuesFromPrevChunk);
1104   ASSERT_FALSE(chunk.header()->packets.load().flags &
1105                SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);
1106 }
1107 
1108 // Regression test that verifies that flushing a TraceWriter while a
1109 // fragmented packet still has uncommitted patches doesn't hit a DCHECK /
1110 // crash the writer thread.
TEST_P(TraceWriterImplTest,FlushAfterFragmentingPacketWhileBufferExhausted)1111 TEST_P(TraceWriterImplTest, FlushAfterFragmentingPacketWhileBufferExhausted) {
1112   const BufferID kBufId = 42;
1113   std::unique_ptr<TraceWriter> writer =
1114       arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
1115 
1116   // Write a small first packet, so that |writer| owns a chunk.
1117   auto packet = writer->NewTracePacket();
1118   EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
1119                    ->drop_packets_for_testing());
1120   // 3 bytes for the first_packet_on_sequence flag.
1121   EXPECT_EQ(packet->Finalize(), 3u);
1122 
1123   // Grab all but one of the remaining chunks in the SMB in new writers.
1124   std::array<std::unique_ptr<TraceWriter>, kNumPages * 4 - 2> other_writers;
1125   for (size_t i = 0; i < other_writers.size(); i++) {
1126     other_writers[i] =
1127         arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
1128     auto other_writer_packet = other_writers[i]->NewTracePacket();
1129     EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(other_writers[i].get())
1130                      ->drop_packets_for_testing());
1131   }
1132 
1133   // Write a packet that's guaranteed to span more than a two chunks, causing
1134   // |writer| to attempt to acquire two new chunks, but fail to acquire the
1135   // second.
1136   auto packet2 = writer->NewTracePacket();
1137   size_t chunk_size = page_size() / 4;
1138   std::string large_string(chunk_size * 2, 'x');
1139   packet2->set_for_testing()->set_str(large_string);
1140 
1141   EXPECT_TRUE(reinterpret_cast<TraceWriterImpl*>(writer.get())
1142                   ->drop_packets_for_testing());
1143 
1144   // First two chunks should be committed.
1145   arbiter_->FlushPendingCommitDataRequests();
1146   ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(2));
1147 
1148   // Flushing should succeed, even though some patches are still in the
1149   // writer's patch list.
1150   packet2->Finalize();
1151   writer->Flush();
1152 }
1153 
// Checks that a writer which only ever gets the garbage chunk can keep
// writing after wrapping around to the beginning of that chunk.
TEST_P(TraceWriterImplTest, GarbageChunkWrap) {
  const BufferID kBufId = 42;

  // Reports whether |w| is currently dropping packets. The writers handed out
  // by the arbiter are TraceWriterImpl instances, so a static_cast is the
  // appropriate cast for this hierarchy downcast.
  const auto is_dropping = [](TraceWriter* w) {
    return static_cast<TraceWriterImpl*>(w)->drop_packets_for_testing();
  };

  // Grab all chunks in the SMB in new writers.
  std::array<std::unique_ptr<TraceWriter>, kNumPages * 4> other_writers;
  for (size_t i = 0; i < other_writers.size(); i++) {
    other_writers[i] =
        arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
    auto other_writer_packet = other_writers[i]->NewTracePacket();
    EXPECT_FALSE(is_dropping(other_writers[i].get()));
  }

  // `writer` will only get garbage chunks, since the SMB is exhausted.
  std::unique_ptr<TraceWriter> writer =
      arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);

  const size_t chunk_size = page_size() / 4;
  std::string half_chunk_string(chunk_size / 2, 'x');

  // Fill the first half of the garbage chunk.
  {
    auto packet = writer->NewTracePacket();
    EXPECT_TRUE(is_dropping(writer.get()));
    packet->set_for_testing()->set_str(half_chunk_string);
  }

  // Fill the second half of the garbage chunk and more. This will call
  // GetNewBuffer() and restart from the beginning of the garbage chunk.
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str(half_chunk_string);
  }

  // Check that TraceWriterImpl can write at the beginning of the garbage chunk
  // without any problems.
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str("str");
  }
}
1196 
// Checks that AnnotatePatch() returns null while the writer is dropping
// packets because the SMB is exhausted, and that finishing the packet and
// flushing afterwards still succeeds.
TEST_P(TraceWriterImplTest, AnnotatePatchWhileBufferExhausted) {
  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer =
      arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);

  // Reports whether |w| is currently dropping packets. The writers handed out
  // by the arbiter are TraceWriterImpl instances, so a static_cast is the
  // appropriate cast for this hierarchy downcast.
  const auto is_dropping = [](TraceWriter* w) {
    return static_cast<TraceWriterImpl*>(w)->drop_packets_for_testing();
  };

  // Write a small first packet, so that |writer| owns a chunk.
  ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
  sw->WriteBytes(reinterpret_cast<const uint8_t*>("X"), 1);
  writer->FinishTracePacket();
  EXPECT_FALSE(is_dropping(writer.get()));

  // Grab all but one of the remaining chunks in the SMB in new writers.
  std::array<std::unique_ptr<TraceWriter>, kNumPages * 4 - 2> other_writers;
  for (size_t i = 0; i < other_writers.size(); i++) {
    other_writers[i] =
        arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
    auto other_writer_packet = other_writers[i]->NewTracePacket();
    EXPECT_FALSE(is_dropping(other_writers[i].get()));
  }

  // Write a packet that's guaranteed to span more than two chunks, causing
  // |writer| to attempt to acquire two new chunks, but fail to acquire the
  // second.
  sw = writer->NewTracePacket().TakeStreamWriter();
  size_t chunk_size = page_size() / 4;
  std::string large_string(chunk_size * 2, 'x');
  sw->WriteBytes(reinterpret_cast<const uint8_t*>(large_string.data()),
                 large_string.size());

  EXPECT_TRUE(is_dropping(writer.get()));

  // Reserve a patch-sized region, zero it and ask the writer to annotate it.
  // While packets are being dropped, no patch location is handed back.
  uint8_t* patch1 =
      sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
  ASSERT_THAT(patch1, NotNull());
  patch1[0] = 0;
  patch1[1] = 0;
  patch1[2] = 0;
  patch1[3] = 0;
  patch1 = sw->AnnotatePatch(patch1);
  EXPECT_THAT(patch1, IsNull());

  // First two chunks should be committed.
  arbiter_->FlushPendingCommitDataRequests();
  ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(2));

  // Flushing should succeed, even though some patches are still in the
  // writer's patch list.
  writer->FinishTracePacket();
  writer->Flush();
}
1250 
// Checks that the callback passed to Flush() is not run by Flush() itself, and
// runs exactly once when the captured commit callback is invoked.
TEST_P(TraceWriterImplTest, Flush) {
  MockFunction<void()> on_flushed;

  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  // Write one small packet; the temporary handle finalizes it at the end of
  // the statement.
  writer->NewTracePacket()->set_for_testing()->set_str("foobar");

  // Nothing may call the flush callback before the commit callback runs.
  EXPECT_CALL(on_flushed, Call).Times(0);
  ASSERT_FALSE(last_commit_callback_);
  writer->Flush(on_flushed.AsStdFunction());
  ASSERT_TRUE(last_commit_callback_);

  // Running the commit callback must invoke the flush callback once.
  EXPECT_CALL(on_flushed, Call).Times(1);
  last_commit_callback_();
}
1268 
// Fragments a packet while three nested messages are open. Checks that all
// three size fields get redirected away from their in-chunk location and that
// the commit carries a single chunk-to-patch entry holding three patches.
TEST_P(TraceWriterImplTest, NestedMsgsPatches) {
  const BufferID kBufId = 42;
  const uint32_t kSubMsgFieldId = 1;
  const uint32_t kStrFieldId = 2;
  const uint32_t kVarIntFieldId = 3;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  const size_t chunk_bytes = page_size() / 4;
  const std::string oversized_str(chunk_bytes, 'x');

  // Open three levels of nesting inside the packet.
  auto packet = writer->NewTracePacket();
  protozero::Message* outer =
      packet->BeginNestedMessage<protozero::Message>(kSubMsgFieldId);
  protozero::Message* middle =
      outer->BeginNestedMessage<protozero::Message>(kSubMsgFieldId);
  protozero::Message* inner =
      middle->BeginNestedMessage<protozero::Message>(kSubMsgFieldId);

  // Remember where each size field lives before any fragmentation happens.
  uint8_t* const outer_size_before = outer->size_field();
  uint8_t* const middle_size_before = middle->size_field();
  uint8_t* const inner_size_before = inner->size_field();
  EXPECT_THAT(outer_size_before, NotNull());
  EXPECT_THAT(middle_size_before, NotNull());
  EXPECT_THAT(inner_size_before, NotNull());

  // A small field fits in the current chunk...
  inner->AppendVarInt<uint64_t>(kVarIntFieldId, 1);

  // ...so every `size_field` still points at its old location in the chunk.
  EXPECT_EQ(outer->size_field(), outer_size_before);
  EXPECT_EQ(middle->size_field(), middle_size_before);
  EXPECT_EQ(inner->size_field(), inner_size_before);

  // A chunk-sized string does not fit in the current chunk.
  inner->AppendString(kStrFieldId, oversized_str);

  // Every `size_field` now points somewhere else (patches), still non-null.
  EXPECT_THAT(outer->size_field(), AllOf(Ne(outer_size_before), NotNull()));
  EXPECT_THAT(middle->size_field(), AllOf(Ne(middle_size_before), NotNull()));
  EXPECT_THAT(inner->size_field(), AllOf(Ne(inner_size_before), NotNull()));

  packet->Finalize();
  writer->Flush();

  arbiter_->FlushPendingCommitDataRequests();

  // One chunk-to-patch entry with one patch per nested message.
  const auto& to_patch = last_commit_.chunks_to_patch();
  ASSERT_THAT(to_patch, SizeIs(1));
  EXPECT_EQ(to_patch[0].writer_id(), writer->writer_id());
  EXPECT_EQ(to_patch[0].target_buffer(), kBufId);
  EXPECT_FALSE(to_patch[0].has_more_patches());
  EXPECT_THAT(to_patch[0].patches(), SizeIs(3));
}
1323 
1324 // TODO(primiano): add multi-writer test.
1325 
1326 }  // namespace
1327 }  // namespace perfetto
1328