/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/tracing/core/trace_writer_impl.h"

#include <algorithm>
#include <array>
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

#include "perfetto/ext/base/utils.h"
#include "perfetto/ext/tracing/core/commit_data_request.h"
#include "perfetto/ext/tracing/core/shared_memory_abi.h"
#include "perfetto/ext/tracing/core/trace_writer.h"
#include "perfetto/ext/tracing/core/tracing_service.h"
#include "perfetto/protozero/message.h"
#include "perfetto/protozero/proto_utils.h"
#include "perfetto/protozero/scattered_stream_writer.h"
#include "src/base/test/test_task_runner.h"
#include "src/tracing/core/shared_memory_arbiter_impl.h"
#include "src/tracing/test/aligned_buffer_test.h"
#include "src/tracing/test/mock_producer_endpoint.h"
#include "test/gtest_and_gmock.h"

#include "protos/perfetto/trace/test_event.gen.h"
#include "protos/perfetto/trace/test_event.pbzero.h"
#include "protos/perfetto/trace/trace_packet.gen.h"
#include "protos/perfetto/trace/trace_packet.pbzero.h"

namespace perfetto {
namespace {

using ChunkHeader = SharedMemoryABI::ChunkHeader;
using ShmemMode = SharedMemoryABI::ShmemMode;
using ::protozero::ScatteredStreamWriter;
using ::testing::AllOf;
using ::testing::ElementsAre;
using ::testing::IsEmpty;
using ::testing::IsNull;
using ::testing::MockFunction;
using ::testing::Ne;
using ::testing::NiceMock;
using ::testing::Not;
using ::testing::NotNull;
using ::testing::Optional;
using ::testing::SizeIs;
using ::testing::ValuesIn;

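// Test fixture for TraceWriterImpl. Sets up an in-process
// SharedMemoryArbiterImpl on top of the aligned test buffer and records, via
// the mocked producer endpoint, the CommitDataRequests (and the patches they
// carry) issued by the writer under test.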
class TraceWriterImplTest : public AlignedBufferTest {
 public:
  struct PatchKey {
    uint32_t writer_id;
    uint32_t chunk_id;
    bool operator<(const PatchKey& other) const {
      return std::tie(writer_id, chunk_id) <
             std::tie(other.writer_id, other.chunk_id);
    }
  };

  void SetUp() override {
    default_layout_ =
        SharedMemoryArbiterImpl::default_page_layout_for_testing();
    SharedMemoryArbiterImpl::set_default_layout_for_testing(
        SharedMemoryABI::PageLayout::kPageDiv4);
    AlignedBufferTest::SetUp();
    task_runner_.reset(new base::TestTaskRunner());
    arbiter_.reset(new SharedMemoryArbiterImpl(
        buf(), buf_size(), ShmemMode::kDefault, page_size(),
        &mock_producer_endpoint_, task_runner_.get()));
    ON_CALL(mock_producer_endpoint_, CommitData)
        .WillByDefault([&](const CommitDataRequest& req,
                           MockProducerEndpoint::CommitDataCallback cb) {
          last_commit_ = req;
          last_commit_callback_ = cb;
          for (const CommitDataRequest::ChunkToPatch& c :
               req.chunks_to_patch()) {
            patches_[PatchKey{c.writer_id(), c.chunk_id()}] = c.patches();
          }
        });
  }

  void TearDown() override {
    arbiter_.reset();
    task_runner_.reset();
    SharedMemoryArbiterImpl::set_default_layout_for_testing(default_layout_);
  }

  std::vector<uint8_t> CopyPayloadAndApplyPatches(
      SharedMemoryABI::Chunk& chunk) const {
    std::vector<uint8_t> copy(chunk.payload_begin(),
                              chunk.payload_begin() + chunk.payload_size());
    ChunkHeader::Packets p = chunk.header()->packets.load();

    auto it = patches_.find(PatchKey{chunk.header()->writer_id.load(),
                                     chunk.header()->chunk_id.load()});
    if (it == patches_.end()) {
      EXPECT_FALSE(p.flags & ChunkHeader::kChunkNeedsPatching);
      return copy;
    }
    EXPECT_TRUE(p.flags & ChunkHeader::kChunkNeedsPatching);

    for (const CommitDataRequest::ChunkToPatch::Patch& patch : it->second) {
      if (patch.offset() + patch.data().size() > copy.size()) {
        ADD_FAILURE() << "Patch out of bounds";
        continue;
      }
      for (size_t i = 0; i < patch.data().size(); i++) {
        copy[patch.offset() + i] =
            reinterpret_cast<const uint8_t*>(patch.data().data())[i];
      }
    }
    return copy;
  }

  // Extracts trace packets from the shared memory buffer and returns copies
  // of them (after applying the patches received). The producer that writes
  // to the shared memory (i.e. the trace writer) must have been destroyed
  // before this is called.
  std::vector<std::string> GetPacketsFromShmemAndPatches() {
    std::vector<std::string> packets;
    SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
    bool was_fragmenting = false;
    for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
      uint32_t header_bitmap = abi->GetPageHeaderBitmap(page_idx);
      size_t num_chunks =
          SharedMemoryABI::GetNumChunksFromHeaderBitmap(header_bitmap);
      for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
        SharedMemoryABI::ChunkState chunk_state =
            abi->GetChunkState(page_idx, chunk_idx);
        if (chunk_state != SharedMemoryABI::kChunkFree &&
            chunk_state != SharedMemoryABI::kChunkComplete) {
          ADD_FAILURE() << "Page " << page_idx << " chunk " << chunk_idx
                        << " unexpected state: " << chunk_state;
          continue;
        }
        SharedMemoryABI::Chunk chunk =
            abi->TryAcquireChunkForReading(page_idx, chunk_idx);
        if (!chunk.is_valid())
          continue;
        ChunkHeader::Packets p = chunk.header()->packets.load();

        EXPECT_EQ(
            was_fragmenting,
            static_cast<bool>(p.flags &
                              ChunkHeader::kFirstPacketContinuesFromPrevChunk));

        std::vector<uint8_t> payload = CopyPayloadAndApplyPatches(chunk);

        const uint8_t* read_ptr = payload.data();
        const uint8_t* const end_read_ptr = payload.data() + payload.size();

        size_t num_fragments = p.count;
        for (; num_fragments && read_ptr < end_read_ptr; num_fragments--) {
          uint64_t len;
          read_ptr = protozero::proto_utils::ParseVarInt(read_ptr,
                                                         end_read_ptr, &len);
          if (!was_fragmenting || packets.empty()) {
            packets.push_back(std::string());
          }
          was_fragmenting = false;
          if (read_ptr + len > end_read_ptr) {
            ADD_FAILURE() << "Page " << page_idx << " chunk " << chunk_idx
                          << " malformed chunk";
          }
          packets.back().append(reinterpret_cast<const char*>(read_ptr),
                                static_cast<size_t>(len));
          read_ptr += len;
        }
        EXPECT_EQ(num_fragments, 0u);
        was_fragmenting =
            p.flags & ChunkHeader::kLastPacketContinuesOnNextChunk;
      }
    }
    // Ignore empty packets (like the tracing service does).
    packets.erase(
        std::remove_if(packets.begin(), packets.end(),
                       [](const std::string& p) { return p.empty(); }),
        packets.end());
    return packets;
  }

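  // Identifies a chunk in the shared memory ABI by page index, page header
  // bitmap and chunk index.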
  struct ChunkInABI {
    size_t page_idx;
    uint32_t header_bitmap;
    size_t chunk_idx;
  };
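  // Returns the first chunk found in the kChunkBeingWritten state, or
  // std::nullopt if no chunk is currently being written to.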
  std::optional<ChunkInABI> GetFirstChunkBeingWritten() {
    SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
    for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
      uint32_t header_bitmap = abi->GetPageHeaderBitmap(page_idx);
      size_t num_chunks =
          SharedMemoryABI::GetNumChunksFromHeaderBitmap(header_bitmap);
      for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
        SharedMemoryABI::ChunkState chunk_state =
            abi->GetChunkState(page_idx, chunk_idx);
        if (chunk_state != SharedMemoryABI::kChunkBeingWritten) {
          continue;
        }
        return ChunkInABI{page_idx, header_bitmap, chunk_idx};
      }
    }
    return std::nullopt;
  }

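  // Parses up to |packets_count| length-prefixed packet fragments out of a
  // chunk payload. Returns std::nullopt if a fragment length points past the
  // end of the payload.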
  static std::optional<std::vector<std::string>> GetChunkFragments(
      size_t packets_count,
      const void* chunk_payload,
      size_t chunk_payload_size) {
    std::vector<std::string> fragments;
    const uint8_t* read_ptr = static_cast<const uint8_t*>(chunk_payload);
    const uint8_t* const end_read_ptr = read_ptr + chunk_payload_size;

    for (size_t num_fragments = packets_count;
         num_fragments && read_ptr < end_read_ptr; num_fragments--) {
      uint64_t len;
      read_ptr =
          protozero::proto_utils::ParseVarInt(read_ptr, end_read_ptr, &len);
      if (read_ptr + len > end_read_ptr) {
        return std::nullopt;
      }
      fragments.push_back(std::string(reinterpret_cast<const char*>(read_ptr),
                                      static_cast<size_t>(len)));
      read_ptr += len;
    }
    return std::make_optional(std::move(fragments));
  }

  SharedMemoryABI::PageLayout default_layout_;
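  // State captured by the mocked CommitData calls: the last request seen, its
  // completion callback and all patches received so far, keyed by
  // (writer_id, chunk_id).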
  CommitDataRequest last_commit_;
  ProducerEndpoint::CommitDataCallback last_commit_callback_;
  std::map<PatchKey, std::vector<CommitDataRequest::ChunkToPatch::Patch>>
      patches_;
  NiceMock<MockProducerEndpoint> mock_producer_endpoint_;

  std::unique_ptr<base::TestTaskRunner> task_runner_;
  std::unique_ptr<SharedMemoryArbiterImpl> arbiter_;
};

size_t const kPageSizes[] = {4096, 65536};
INSTANTIATE_TEST_SUITE_P(PageSize, TraceWriterImplTest, ValuesIn(kPageSizes));

TEST_P(TraceWriterImplTest, NewTracePacket) {
  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
  const size_t kNumPackets = 32;
  for (size_t i = 0; i < kNumPackets; i++) {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str(
        std::string("foobar " + std::to_string(i)));
  }

  // Destroying the TraceWriterImpl should cause the last packet to be
  // finalized and the chunk to be put back in the kChunkComplete state.
  writer.reset();

  std::vector<std::string> packets = GetPacketsFromShmemAndPatches();
  ASSERT_THAT(packets, SizeIs(kNumPackets));
  for (size_t i = 0; i < kNumPackets; i++) {
    protos::gen::TracePacket packet;
    EXPECT_TRUE(packet.ParseFromString(packets[i]));
    EXPECT_EQ(packet.for_testing().str(), "foobar " + std::to_string(i));
    if (i == 0) {
      EXPECT_TRUE(packet.first_packet_on_sequence());
    } else {
      EXPECT_FALSE(packet.first_packet_on_sequence());
    }
  }
}

TEST_P(TraceWriterImplTest, NewTracePacketLargePackets) {
  const BufferID kBufId = 42;
  const size_t chunk_size = page_size() / 4;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str(std::string("PACKET_1") +
                                       std::string(chunk_size, 'x'));
  }
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str(std::string("PACKET_2") +
                                       std::string(chunk_size, 'x'));
  }

  // Destroying the TraceWriterImpl should cause the last packet to be
  // finalized and the chunk to be put back in the kChunkComplete state.
  writer.reset();

  std::vector<std::string> packets = GetPacketsFromShmemAndPatches();
  ASSERT_THAT(packets, SizeIs(2));
  {
    protos::gen::TracePacket packet;
    EXPECT_TRUE(packet.ParseFromString(packets[0]));
    EXPECT_EQ(packet.for_testing().str(),
              std::string("PACKET_1") + std::string(chunk_size, 'x'));
  }
  {
    protos::gen::TracePacket packet;
    EXPECT_TRUE(packet.ParseFromString(packets[1]));
    EXPECT_EQ(packet.for_testing().str(),
              std::string("PACKET_2") + std::string(chunk_size, 'x'));
  }
}

// A prefix corresponding to first_packet_on_sequence = true in a serialized
// TracePacket proto.
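// The bytes (0xB8, 0x05) decode to the varint 696 == (87 << 3) | 0, i.e. the
// tag of varint field 87 (first_packet_on_sequence); 0x1 is the value and the
// trailing 0x0 null-terminates the array so that it can be concatenated with
// std::string below.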
constexpr char kFirstPacketOnSequenceFlagPrefix[] = {static_cast<char>(0xB8),
                                                     0x5, 0x1, 0x0};

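// Writes raw bytes through the ScatteredStreamWriter obtained via
// TakeStreamWriter(), bypassing the protozero Message layer. In this mode the
// writer must be told explicitly where each packet ends, via
// FinishTracePacket().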
TEST_P(TraceWriterImplTest, NewTracePacketTakeWriter) {
  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
  const size_t kNumPackets = 32;
  for (size_t i = 0; i < kNumPackets; i++) {
    ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
    std::string raw_proto_bytes =
        std::string("RAW_PROTO_BYTES_") + std::to_string(i);
    sw->WriteBytes(reinterpret_cast<const uint8_t*>(raw_proto_bytes.data()),
                   raw_proto_bytes.size());
    writer->FinishTracePacket();
  }

  // Destroying the TraceWriterImpl should cause the last packet to be
  // finalized and the chunk to be put back in the kChunkComplete state.
  writer.reset();

  std::vector<std::string> packets = GetPacketsFromShmemAndPatches();
  ASSERT_THAT(packets, SizeIs(kNumPackets));
  for (size_t i = 0; i < kNumPackets; i++) {
    std::string expected = "RAW_PROTO_BYTES_" + std::to_string(i);
    if (i == 0) {
      expected = kFirstPacketOnSequenceFlagPrefix + expected;
    }
    EXPECT_EQ(packets[i], expected);
  }
}

#if defined(GTEST_HAS_DEATH_TEST)
using TraceWriterImplDeathTest = TraceWriterImplTest;
INSTANTIATE_TEST_SUITE_P(PageSize,
                         TraceWriterImplDeathTest,
                         ValuesIn(kPageSizes));

TEST_P(TraceWriterImplDeathTest, NewTracePacketTakeWriterNoFinish) {
  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  TraceWriterImpl::TracePacketHandle handle = writer->NewTracePacket();

  // Avoid a secondary DCHECK failure from ~TraceWriterImpl() =>
  // Message::Finalize() due to the stream writer being modified behind the
  // Message's back. This turns the Finalize() call into a no-op.
  handle->set_size_field(nullptr);

  ScatteredStreamWriter* sw = handle.TakeStreamWriter();
  std::string raw_proto_bytes = std::string("RAW_PROTO_BYTES");
  sw->WriteBytes(reinterpret_cast<const uint8_t*>(raw_proto_bytes.data()),
                 raw_proto_bytes.size());

  EXPECT_DEATH({ writer->NewTracePacket(); }, "");
}
#endif  // defined(GTEST_HAS_DEATH_TEST)

TEST_P(TraceWriterImplTest, AnnotatePatch) {
  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
  ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
  std::string raw_proto_bytes = std::string("RAW_PROTO_BYTES");
  sw->WriteBytes(reinterpret_cast<const uint8_t*>(raw_proto_bytes.data()),
                 raw_proto_bytes.size());

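  // Reserve 4 bytes in the chunk, then ask the writer to annotate them as a
  // patch. The returned pointer is expected to differ from the in-chunk one:
  // it refers to a patch entry that is later delivered to the service through
  // a CommitDataRequest (and applied by CopyPayloadAndApplyPatches() here).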
  uint8_t* patch1 =
      sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
  ASSERT_THAT(patch1, NotNull());
  patch1[0] = 0;
  patch1[1] = 0;
  patch1[2] = 0;
  patch1[3] = 0;
  const uint8_t* old_chunk_pointer = patch1;
  patch1 = sw->AnnotatePatch(patch1);
  EXPECT_NE(patch1, old_chunk_pointer);
  ASSERT_THAT(patch1, NotNull());

  sw->WriteByte('X');

  uint8_t* patch2 =
      sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
  ASSERT_THAT(patch2, NotNull());
  patch2[0] = 0;
  patch2[1] = 0;
  patch2[2] = 0;
  patch2[3] = 0;
  old_chunk_pointer = patch2;
  patch2 = sw->AnnotatePatch(patch2);
  EXPECT_NE(patch2, old_chunk_pointer);
  ASSERT_THAT(patch2, NotNull());

  const size_t chunk_size = page_size() / 4;
  std::string large_string(chunk_size, 'x');

  sw->WriteBytes(reinterpret_cast<const uint8_t*>(large_string.data()),
                 large_string.size());

  uint8_t* patch3 =
      sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
  ASSERT_THAT(patch3, NotNull());
  patch3[0] = 0;
  patch3[1] = 0;
  patch3[2] = 0;
  patch3[3] = 0;
  old_chunk_pointer = patch3;
  patch3 = sw->AnnotatePatch(patch3);
  EXPECT_NE(patch3, old_chunk_pointer);
  ASSERT_THAT(patch3, NotNull());

  sw->WriteBytes(reinterpret_cast<const uint8_t*>(large_string.data()),
                 large_string.size());

  patch1[0] = 0x11;
  patch1[1] = 0x11;
  patch1[2] = 0x11;
  patch1[3] = 0x11;

  patch2[0] = 0x22;
  patch2[1] = 0x22;
  patch2[2] = 0x22;
  patch2[3] = 0x22;

  patch3[0] = 0x33;
  patch3[1] = 0x33;
  patch3[2] = 0x33;
  patch3[3] = 0x33;

  writer->FinishTracePacket();

  // Destroying the TraceWriterImpl should cause the last packet to be
  // finalized and the chunk to be put back in the kChunkComplete state.
  writer.reset();

  std::vector<std::string> packets = GetPacketsFromShmemAndPatches();
  EXPECT_THAT(
      packets,
      ElementsAre(
          kFirstPacketOnSequenceFlagPrefix + std::string("RAW_PROTO_BYTES") +
          std::string("\x11\x11\x11\x11") + std::string("X") +
          std::string("\x22\x22\x22\x22") + std::string(chunk_size, 'x') +
          std::string("\x33\x33\x33\x33") + std::string(chunk_size, 'x')));
}

TEST_P(TraceWriterImplTest, MixManualTakeAndMessage) {
  const BufferID kBufId = 42;
  const size_t chunk_size = page_size() / 4;
  const std::string large_string(chunk_size, 'x');

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  {
    ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
    std::string packet1 = std::string("PACKET_1_");
    sw->WriteBytes(reinterpret_cast<const uint8_t*>(packet1.data()),
                   packet1.size());
    uint8_t* patch =
        sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
    ASSERT_THAT(patch, NotNull());
    patch[0] = 0;
    patch[1] = 0;
    patch[2] = 0;
    patch[3] = 0;
    const uint8_t* old_chunk_pointer = patch;
    patch = sw->AnnotatePatch(patch);
    EXPECT_NE(patch, old_chunk_pointer);
    ASSERT_THAT(patch, NotNull());
    sw->WriteBytes(reinterpret_cast<const uint8_t*>(large_string.data()),
                   large_string.size());
    patch[0] = 0xFF;
    patch[1] = 0xFF;
    patch[2] = 0xFF;
    patch[3] = 0xFF;
    writer->FinishTracePacket();
  }

  {
    auto msg = writer->NewTracePacket();
    std::string packet2 = std::string("PACKET_2_");
    msg->AppendRawProtoBytes(packet2.data(), packet2.size());
    auto* nested = msg->BeginNestedMessage<protozero::Message>(1);
    nested->AppendRawProtoBytes(large_string.data(), large_string.size());
  }

  {
    ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
    std::string packet3 = std::string("PACKET_3_");
    sw->WriteBytes(reinterpret_cast<const uint8_t*>(packet3.data()),
                   packet3.size());
    uint8_t* patch =
        sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
    ASSERT_THAT(patch, NotNull());
    patch[0] = 0;
    patch[1] = 0;
    patch[2] = 0;
    patch[3] = 0;
    const uint8_t* old_chunk_pointer = patch;
    patch = sw->AnnotatePatch(patch);
    EXPECT_NE(patch, old_chunk_pointer);
    ASSERT_THAT(patch, NotNull());
    sw->WriteBytes(reinterpret_cast<const uint8_t*>(large_string.data()),
                   large_string.size());
    patch[0] = 0xFF;
    patch[1] = 0xFF;
    patch[2] = 0xFF;
    patch[3] = 0xFF;
    writer->FinishTracePacket();
  }

  // Destroying the TraceWriterImpl should cause the last packet to be
  // finalized and the chunk to be put back in the kChunkComplete state.
  writer.reset();

  uint8_t buf[protozero::proto_utils::kMessageLengthFieldSize];
  protozero::proto_utils::WriteRedundantVarInt(
      static_cast<uint32_t>(large_string.size()), buf,
      protozero::proto_utils::kMessageLengthFieldSize);
  std::string encoded_size(reinterpret_cast<char*>(buf), sizeof(buf));

  std::vector<std::string> packets = GetPacketsFromShmemAndPatches();
  EXPECT_THAT(
      packets,
      ElementsAre(kFirstPacketOnSequenceFlagPrefix + std::string("PACKET_1_") +
                      std::string("\xFF\xFF\xFF\xFF") +
                      std::string(chunk_size, 'x'),
                  std::string("PACKET_2_") + std::string("\x0A") +
                      encoded_size + std::string(chunk_size, 'x'),
                  std::string("PACKET_3_") + std::string("\xFF\xFF\xFF\xFF") +
                      std::string(chunk_size, 'x')));
}

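// When a TracePacket message handle is destroyed, the writer inflates the
// packet count in the chunk header so that the service can scrape the chunk
// while it is still in the kChunkBeingWritten state.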
TEST_P(TraceWriterImplTest, MessageHandleDestroyedPacketScrapable) {
  const BufferID kBufId = 42;

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  auto packet = writer->NewTracePacket();
  packet->set_for_testing()->set_str("packet1");

  std::optional<ChunkInABI> chunk_in_abi = GetFirstChunkBeingWritten();
  ASSERT_TRUE(chunk_in_abi.has_value());

  auto* abi = arbiter_->shmem_abi_for_testing();
  SharedMemoryABI::Chunk chunk = abi->GetChunkUnchecked(
      chunk_in_abi->page_idx, chunk_in_abi->header_bitmap,
      chunk_in_abi->chunk_idx);
  ASSERT_TRUE(chunk.is_valid());

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkBeingWritten);

  packet = protozero::MessageHandle<protos::pbzero::TracePacket>();

  // After destroying the message handle, the chunk header should have an
  // inflated packet count.
  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkBeingWritten);

  writer.reset();

  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkComplete);
  EXPECT_THAT(GetChunkFragments(1, chunk.payload_begin(), chunk.payload_size()),
              Optional(ElementsAre(Not(IsEmpty()))));
}

TEST_P(TraceWriterImplTest, FinishTracePacketScrapable) {
  const BufferID kBufId = 42;

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  {
    protos::pbzero::TestEvent test_event;
    protozero::MessageArena arena;
    ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
    uint8_t data[protozero::proto_utils::kMaxTagEncodedSize];
    uint8_t* data_end = protozero::proto_utils::WriteVarInt(
        protozero::proto_utils::MakeTagLengthDelimited(
            protos::pbzero::TracePacket::kForTestingFieldNumber),
        data);
    sw->WriteBytes(data, static_cast<size_t>(data_end - data));
    test_event.Reset(sw, &arena);
    test_event.set_size_field(
        sw->ReserveBytes(protozero::proto_utils::kMessageLengthFieldSize));
    test_event.set_str("payload1");
  }

  std::optional<ChunkInABI> chunk_in_abi = GetFirstChunkBeingWritten();
  ASSERT_TRUE(chunk_in_abi.has_value());

  auto* abi = arbiter_->shmem_abi_for_testing();
  SharedMemoryABI::Chunk chunk = abi->GetChunkUnchecked(
      chunk_in_abi->page_idx, chunk_in_abi->header_bitmap,
      chunk_in_abi->chunk_idx);
  ASSERT_TRUE(chunk.is_valid());

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkBeingWritten);

  writer->FinishTracePacket();

  // After a call to FinishTracePacket, the chunk header should have an
  // inflated packet count.
  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkBeingWritten);
  EXPECT_THAT(GetChunkFragments(1, chunk.payload_begin(), chunk.payload_size()),
              Optional(ElementsAre(Not(IsEmpty()))));

  // An extra call to FinishTracePacket should have no effect.
  writer->FinishTracePacket();

  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkBeingWritten);
  EXPECT_THAT(GetChunkFragments(1, chunk.payload_begin(), chunk.payload_size()),
              Optional(ElementsAre(Not(IsEmpty()))));

  writer.reset();

  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkComplete);
  EXPECT_THAT(GetChunkFragments(2, chunk.payload_begin(), chunk.payload_size()),
              Optional(ElementsAre(Not(IsEmpty()), IsEmpty())));
}

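// Destroying the message handle already inflates the packet count; a
// subsequent explicit FinishTracePacket() must not inflate it a second time.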
TEST_P(TraceWriterImplTest,
       MessageHandleDestroyedAndFinishTracePacketScrapable) {
  const BufferID kBufId = 42;

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  auto packet = writer->NewTracePacket();
  packet->set_for_testing()->set_str("packet1");

  std::optional<ChunkInABI> chunk_in_abi = GetFirstChunkBeingWritten();
  ASSERT_TRUE(chunk_in_abi.has_value());

  auto* abi = arbiter_->shmem_abi_for_testing();
  SharedMemoryABI::Chunk chunk = abi->GetChunkUnchecked(
      chunk_in_abi->page_idx, chunk_in_abi->header_bitmap,
      chunk_in_abi->chunk_idx);
  ASSERT_TRUE(chunk.is_valid());

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkBeingWritten);
  packet = protozero::MessageHandle<protos::pbzero::TracePacket>();

  // After destroying the message handle, the chunk header should have an
  // inflated packet count.
  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkBeingWritten);

  writer->FinishTracePacket();

  // An extra call to FinishTracePacket should have no effect.
  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkBeingWritten);
  EXPECT_THAT(GetChunkFragments(1, chunk.payload_begin(), chunk.payload_size()),
              Optional(ElementsAre(Not(IsEmpty()))));

  writer.reset();

  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkComplete);
  EXPECT_THAT(GetChunkFragments(2, chunk.payload_begin(), chunk.payload_size()),
              Optional(ElementsAre(Not(IsEmpty()), IsEmpty())));
}

TEST_P(TraceWriterImplTest, MessageHandleDestroyedPacketFullChunk) {
  const BufferID kBufId = 42;

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  auto packet = writer->NewTracePacket();
  protos::pbzero::TestEvent* test_event = packet->set_for_testing();
  std::string chunk_filler(test_event->stream_writer()->bytes_available(),
                           '\0');
  test_event->AppendRawProtoBytes(chunk_filler.data(), chunk_filler.size());

  std::optional<ChunkInABI> chunk_in_abi = GetFirstChunkBeingWritten();
  ASSERT_TRUE(chunk_in_abi.has_value());

  auto* abi = arbiter_->shmem_abi_for_testing();
  SharedMemoryABI::Chunk chunk = abi->GetChunkUnchecked(
      chunk_in_abi->page_idx, chunk_in_abi->header_bitmap,
      chunk_in_abi->chunk_idx);
  ASSERT_TRUE(chunk.is_valid());

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkBeingWritten);
  // Finish the TracePacket: since there's no space for an empty packet, the
  // trace writer should immediately mark the chunk as completed.
  packet = protozero::MessageHandle<protos::pbzero::TracePacket>();

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkComplete);

  writer.reset();

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkComplete);
}

TEST_P(TraceWriterImplTest, FinishTracePacketFullChunk) {
  const BufferID kBufId = 42;

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  {
    protos::pbzero::TestEvent test_event;
    protozero::MessageArena arena;
    ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
    uint8_t data[protozero::proto_utils::kMaxTagEncodedSize];
    uint8_t* data_end = protozero::proto_utils::WriteVarInt(
        protozero::proto_utils::MakeTagLengthDelimited(
            protos::pbzero::TracePacket::kForTestingFieldNumber),
        data);
    sw->WriteBytes(data, static_cast<size_t>(data_end - data));
    test_event.Reset(sw, &arena);
    test_event.set_size_field(
        sw->ReserveBytes(protozero::proto_utils::kMessageLengthFieldSize));
    std::string chunk_filler(sw->bytes_available(), '\0');
    test_event.AppendRawProtoBytes(chunk_filler.data(), chunk_filler.size());
  }

  std::optional<ChunkInABI> chunk_in_abi = GetFirstChunkBeingWritten();
  ASSERT_TRUE(chunk_in_abi.has_value());

  auto* abi = arbiter_->shmem_abi_for_testing();
  SharedMemoryABI::Chunk chunk = abi->GetChunkUnchecked(
      chunk_in_abi->page_idx, chunk_in_abi->header_bitmap,
      chunk_in_abi->chunk_idx);
  ASSERT_TRUE(chunk.is_valid());

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkBeingWritten);

  // Finish the TracePacket: since there's no space for an empty packet, the
  // trace writer should immediately mark the chunk as completed, instead of
  // inflating the count.
  writer->FinishTracePacket();

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkComplete);

  writer.reset();

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkComplete);
}

TEST_P(TraceWriterImplTest, FragmentingPacketWithProducerAndServicePatching) {
  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  // Write a packet that's guaranteed to span more than a single chunk, but
  // less than two chunks.
  auto packet = writer->NewTracePacket();
  size_t chunk_size = page_size() / 4;
  std::string large_string(chunk_size, 'x');
  packet->set_for_testing()->set_str(large_string);

  // First chunk should be committed.
  arbiter_->FlushPendingCommitDataRequests();
  ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
  EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
  EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
  EXPECT_EQ(last_commit_.chunks_to_move()[0].target_buffer(), kBufId);
  EXPECT_THAT(last_commit_.chunks_to_patch(), SizeIs(0));

  // We will simulate a batching cycle by first setting the batching period to
  // a very large value and then force-flushing when we are done writing data.
  arbiter_->SetDirectSMBPatchingSupportedByService();
  ASSERT_TRUE(arbiter_->EnableDirectSMBPatching());
  arbiter_->SetBatchCommitsDuration(UINT32_MAX);

  // Write a second packet that's guaranteed to span more than a single chunk.
  // Starting a new trace packet should cause the patches for the first packet
  // (i.e. for the first chunk) to be queued for sending to the service. They
  // cannot be applied locally because the first chunk was already committed.
  packet->Finalize();
  auto packet2 = writer->NewTracePacket();
  packet2->set_for_testing()->set_str(large_string);

  // Starting a new packet yet again should cause the patches for the second
  // packet (i.e. for the second chunk) to be applied in the producer, because
  // the second chunk has not been committed yet.
  packet2->Finalize();
  auto packet3 = writer->NewTracePacket();

  // Simulate the end of the batching period, which should trigger a commit to
  // the service.
  arbiter_->FlushPendingCommitDataRequests();

  SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();

  // The first allocated chunk should be complete but need patching, since the
  // packet extended past the chunk and no patches for the packet size or
  // string field size were applied yet.
  ASSERT_EQ(abi->GetChunkState(0u, 0u), SharedMemoryABI::kChunkComplete);
  auto chunk = abi->TryAcquireChunkForReading(0u, 0u);
  ASSERT_TRUE(chunk.is_valid());
  EXPECT_EQ(chunk.header()->packets.load().count, 1u);
  EXPECT_TRUE(chunk.header()->packets.load().flags &
              SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
  EXPECT_TRUE(chunk.header()->packets.load().flags &
              SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);

  // Verify that a patch for the first chunk was sent to the service.
  ASSERT_THAT(last_commit_.chunks_to_patch(), SizeIs(1));
  EXPECT_EQ(last_commit_.chunks_to_patch()[0].writer_id(), writer->writer_id());
  EXPECT_EQ(last_commit_.chunks_to_patch()[0].target_buffer(), kBufId);
  EXPECT_EQ(last_commit_.chunks_to_patch()[0].chunk_id(),
            chunk.header()->chunk_id.load());
  EXPECT_FALSE(last_commit_.chunks_to_patch()[0].has_more_patches());
  EXPECT_THAT(last_commit_.chunks_to_patch()[0].patches(), SizeIs(1));

  // Verify that the second chunk was committed.
  ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
  EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
  EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 1u);
  EXPECT_EQ(last_commit_.chunks_to_move()[0].target_buffer(), kBufId);

  // The second chunk should be in a complete state and should not need
  // patching, as the patches to it should have been applied in the producer.
  ASSERT_EQ(abi->GetChunkState(0u, 1u), SharedMemoryABI::kChunkComplete);
  auto chunk2 = abi->TryAcquireChunkForReading(0u, 1u);
  ASSERT_TRUE(chunk2.is_valid());
  EXPECT_EQ(chunk2.header()->packets.load().count, 2);
  EXPECT_TRUE(chunk2.header()->packets.load().flags &
              SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);
  EXPECT_FALSE(chunk2.header()->packets.load().flags &
               SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
}

TEST_P(TraceWriterImplTest, FragmentingPacketWithoutEnablingProducerPatching) {
  // We simulate a batching cycle by first setting the batching period to a
  // very large value and then force-flushing at the point where a flush
  // should happen - in this case when a patch is encountered.
  //
  // Note: direct producer-side patching should be disabled by default.
  arbiter_->SetBatchCommitsDuration(UINT32_MAX);

  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  // Write a packet that's guaranteed to span more than a single chunk.
  auto packet = writer->NewTracePacket();
  size_t chunk_size = page_size() / 4;
  std::string large_string(chunk_size, 'x');
  packet->set_for_testing()->set_str(large_string);

  // Starting a new packet should cause the first chunk and its patches to be
  // committed to the service.
  packet->Finalize();
  auto packet2 = writer->NewTracePacket();
  arbiter_->FlushPendingCommitDataRequests();

  // The first allocated chunk should be complete but need patching, since the
  // packet extended past the chunk and no patches for the packet size or
  // string field size were applied in the producer.
  SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
  ASSERT_EQ(abi->GetChunkState(0u, 0u), SharedMemoryABI::kChunkComplete);
  auto chunk = abi->TryAcquireChunkForReading(0u, 0u);
  ASSERT_TRUE(chunk.is_valid());
  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_TRUE(chunk.header()->packets.load().flags &
              SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
  EXPECT_TRUE(chunk.header()->packets.load().flags &
              SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);

  // The first chunk was committed.
  ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
  EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
  EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
  EXPECT_EQ(last_commit_.chunks_to_move()[0].target_buffer(), kBufId);

  // The patches for the first chunk were committed.
  ASSERT_THAT(last_commit_.chunks_to_patch(), SizeIs(1));
  EXPECT_EQ(last_commit_.chunks_to_patch()[0].writer_id(), writer->writer_id());
  EXPECT_EQ(last_commit_.chunks_to_patch()[0].target_buffer(), kBufId);
  EXPECT_EQ(last_commit_.chunks_to_patch()[0].chunk_id(),
            chunk.header()->chunk_id.load());
  EXPECT_FALSE(last_commit_.chunks_to_patch()[0].has_more_patches());
  EXPECT_THAT(last_commit_.chunks_to_patch()[0].patches(), SizeIs(1));
}

// Sets up a scenario in which the SMB is exhausted and TraceWriter fails to
// get a new chunk while fragmenting a packet. Verifies that data is dropped
// until the SMB is freed up and TraceWriter can get a new chunk.
TEST_P(TraceWriterImplTest, FragmentingPacketWhileBufferExhausted) {
  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer =
      arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);

  // Write a small first packet, so that |writer| owns a chunk.
  auto packet = writer->NewTracePacket();
  EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                   ->drop_packets_for_testing());
  // 3 bytes for the first_packet_on_sequence flag.
  EXPECT_EQ(packet->Finalize(), 3u);

  // Grab all the remaining chunks in the SMB in new writers.
  std::array<std::unique_ptr<TraceWriter>, kNumPages * 4 - 1> other_writers;
  for (size_t i = 0; i < other_writers.size(); i++) {
    other_writers[i] =
        arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
    auto other_writer_packet = other_writers[i]->NewTracePacket();
    EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(other_writers[i].get())
                     ->drop_packets_for_testing());
  }

  // Write a packet that's guaranteed to span more than a single chunk,
  // causing |writer| to attempt to acquire a new chunk but fail to do so.
  auto packet2 = writer->NewTracePacket();
  size_t chunk_size = page_size() / 4;
  std::string large_string(chunk_size, 'x');
  packet2->set_for_testing()->set_str(large_string);

  EXPECT_TRUE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                  ->drop_packets_for_testing());

  // First chunk should be committed.
  arbiter_->FlushPendingCommitDataRequests();
  ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
  EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
  EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
  EXPECT_EQ(last_commit_.chunks_to_move()[0].target_buffer(), kBufId);
  EXPECT_THAT(last_commit_.chunks_to_patch(), SizeIs(0));

  // It should not need patching and not have the continuation flag set.
  SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
  ASSERT_EQ(SharedMemoryABI::kChunkComplete, abi->GetChunkState(0u, 0u));
  auto chunk = abi->TryAcquireChunkForReading(0u, 0u);
  ASSERT_TRUE(chunk.is_valid());
  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_FALSE(chunk.header()->packets.load().flags &
               SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
  EXPECT_FALSE(chunk.header()->packets.load().flags &
               SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);

  // Writing more data while in garbage mode succeeds. This data is dropped.
  packet2->Finalize();
  auto packet3 = writer->NewTracePacket();
  packet3->set_for_testing()->set_str(large_string);

  // Release the |writer|'s first chunk as free, so that it can grab it again.
  abi->ReleaseChunkAsFree(std::move(chunk));

  // Starting a new packet should cause TraceWriter to attempt to grab a new
  // chunk again, because we wrote enough data to wrap the garbage chunk.
  packet3->Finalize();
  auto packet4 = writer->NewTracePacket();

  // Grabbing the chunk should have succeeded.
  EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                   ->drop_packets_for_testing());

  // The first packet in the chunk should have the previous_packet_dropped
  // flag set, so it shouldn't be empty.
  EXPECT_GT(packet4->Finalize(), 0u);

  // Flushing the writer causes the chunk to be released again.
  writer->Flush();
  EXPECT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
  EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
  EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
  EXPECT_THAT(last_commit_.chunks_to_patch(), SizeIs(0));

  // Chunk should contain only |packet4| and not have any continuation flag
  // set.
  ASSERT_EQ(abi->GetChunkState(0u, 0u), SharedMemoryABI::kChunkComplete);
  chunk = abi->TryAcquireChunkForReading(0u, 0u);
  ASSERT_TRUE(chunk.is_valid());
  ASSERT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_FALSE(chunk.header()->packets.load().flags &
               SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
  EXPECT_FALSE(
      chunk.header()->packets.load().flags &
      SharedMemoryABI::ChunkHeader::kFirstPacketContinuesFromPrevChunk);
  EXPECT_FALSE(chunk.header()->packets.load().flags &
               SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);
}

// Verifies that a TraceWriter that is flushed before the SMB is full and then
// acquires a garbage chunk later recovers and writes a
// previous_packet_dropped marker into the trace.
TEST_P(TraceWriterImplTest, FlushBeforeBufferExhausted) {
  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer =
      arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);

  // Write a small first packet and flush it, so that |writer| no longer owns
  // any chunk.
  auto packet = writer->NewTracePacket();
  EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                   ->drop_packets_for_testing());
  // 3 bytes for the first_packet_on_sequence flag.
  EXPECT_EQ(packet->Finalize(), 3u);

  // Flush the first chunk away.
  writer->Flush();

  // First chunk should be committed. Don't release it as free just yet.
  arbiter_->FlushPendingCommitDataRequests();
  ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
  EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
  EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);

  // Grab all the remaining chunks in the SMB in new writers.
  std::array<std::unique_ptr<TraceWriter>, kNumPages * 4 - 1> other_writers;
  for (size_t i = 0; i < other_writers.size(); i++) {
    other_writers[i] =
        arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
    auto other_writer_packet = other_writers[i]->NewTracePacket();
    EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(other_writers[i].get())
                     ->drop_packets_for_testing());
  }

  // Write another packet, causing |writer| to acquire a garbage chunk.
  auto packet2 = writer->NewTracePacket();
  EXPECT_TRUE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                  ->drop_packets_for_testing());

  // Writing more data while in garbage mode succeeds. This data is dropped.
  // Make sure that we fill the garbage chunk, so that |writer| tries to
  // re-acquire a valid chunk for the next packet.
  size_t chunk_size = page_size() / 4;
  std::string large_string(chunk_size, 'x');
  packet2->set_for_testing()->set_str(large_string);
  packet2->Finalize();

  // Next packet should still be in the garbage chunk.
  auto packet3 = writer->NewTracePacket();
  EXPECT_TRUE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                  ->drop_packets_for_testing());

  // Release the first chunk as free, so |writer| can acquire it again.
  SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
  ASSERT_EQ(SharedMemoryABI::kChunkComplete, abi->GetChunkState(0u, 0u));
  auto chunk = abi->TryAcquireChunkForReading(0u, 0u);
  ASSERT_TRUE(chunk.is_valid());
  abi->ReleaseChunkAsFree(std::move(chunk));

  // Fill the garbage chunk, so that the writer attempts to grab another chunk
  // for |packet4|.
  packet3->set_for_testing()->set_str(large_string);
  packet3->Finalize();

  // Next packet should go into the reacquired chunk we just released.
  auto packet4 = writer->NewTracePacket();
  EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                   ->drop_packets_for_testing());

  // The first packet in the chunk should have the previous_packet_dropped
  // flag set, so it shouldn't be empty.
  EXPECT_GT(packet4->Finalize(), 0u);

  // Flushing the writer causes the chunk to be released again.
  writer->Flush();
  ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
  EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
  EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
  EXPECT_THAT(last_commit_.chunks_to_patch(), SizeIs(0));

  // Chunk should contain only |packet4| and not have any continuation flag
  // set.
  ASSERT_EQ(SharedMemoryABI::kChunkComplete, abi->GetChunkState(0u, 0u));
  chunk = abi->TryAcquireChunkForReading(0u, 0u);
  ASSERT_TRUE(chunk.is_valid());
  ASSERT_EQ(chunk.header()->packets.load().count, 1);
  ASSERT_FALSE(chunk.header()->packets.load().flags &
               SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
  ASSERT_FALSE(
      chunk.header()->packets.load().flags &
      SharedMemoryABI::ChunkHeader::kFirstPacketContinuesFromPrevChunk);
  ASSERT_FALSE(chunk.header()->packets.load().flags &
               SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);
}

// Regression test that verifies that flushing a TraceWriter while a
// fragmented packet still has uncommitted patches doesn't hit a DCHECK /
// crash the writer thread.
TEST_P(TraceWriterImplTest, FlushAfterFragmentingPacketWhileBufferExhausted) {
  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer =
      arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);

  // Write a small first packet, so that |writer| owns a chunk.
  auto packet = writer->NewTracePacket();
  EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                   ->drop_packets_for_testing());
  // 3 bytes for the first_packet_on_sequence flag.
  EXPECT_EQ(packet->Finalize(), 3u);

  // Grab all but one of the remaining chunks in the SMB in new writers.
  std::array<std::unique_ptr<TraceWriter>, kNumPages * 4 - 2> other_writers;
  for (size_t i = 0; i < other_writers.size(); i++) {
    other_writers[i] =
        arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
    auto other_writer_packet = other_writers[i]->NewTracePacket();
    EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(other_writers[i].get())
                     ->drop_packets_for_testing());
  }

  // Write a packet that's guaranteed to span more than two chunks, causing
  // |writer| to attempt to acquire two new chunks, but fail to acquire the
  // second.
  auto packet2 = writer->NewTracePacket();
  size_t chunk_size = page_size() / 4;
  std::string large_string(chunk_size * 2, 'x');
  packet2->set_for_testing()->set_str(large_string);

  EXPECT_TRUE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                  ->drop_packets_for_testing());

  // First two chunks should be committed.
  arbiter_->FlushPendingCommitDataRequests();
  ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(2));

  // Flushing should succeed, even though some patches are still in the
  // writer's patch list.
  packet2->Finalize();
  writer->Flush();
}

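// Exercises wrapping within the local garbage chunk used once the SMB is
// exhausted: writing past the end of the garbage chunk must restart cleanly
// from its beginning.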
TEST_P(TraceWriterImplTest, GarbageChunkWrap) {
  const BufferID kBufId = 42;

  // Grab all chunks in the SMB in new writers.
  std::array<std::unique_ptr<TraceWriter>, kNumPages * 4> other_writers;
  for (size_t i = 0; i < other_writers.size(); i++) {
    other_writers[i] =
        arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
    auto other_writer_packet = other_writers[i]->NewTracePacket();
    EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(other_writers[i].get())
                     ->drop_packets_for_testing());
  }

  // `writer` will only get garbage chunks, since the SMB is exhausted.
  std::unique_ptr<TraceWriter> writer =
      arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);

  const size_t chunk_size = page_size() / 4;
  std::string half_chunk_string(chunk_size / 2, 'x');

  // Fill the first half of the garbage chunk.
  {
    auto packet = writer->NewTracePacket();
    EXPECT_TRUE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                    ->drop_packets_for_testing());
    packet->set_for_testing()->set_str(half_chunk_string);
  }

  // Fill the second half of the garbage chunk and more. This will call
  // GetNewBuffer() and restart from the beginning of the garbage chunk.
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str(half_chunk_string);
  }

  // Check that TraceWriterImpl can write at the beginning of the garbage
  // chunk without any problems.
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str("str");
  }
}

TEST_P(TraceWriterImplTest, AnnotatePatchWhileBufferExhausted) {
  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer =
      arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);

  // Write a small first packet, so that |writer| owns a chunk.
  ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
  sw->WriteBytes(reinterpret_cast<const uint8_t*>("X"), 1);
  writer->FinishTracePacket();
  EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                   ->drop_packets_for_testing());

  // Grab all but one of the remaining chunks in the SMB in new writers.
  std::array<std::unique_ptr<TraceWriter>, kNumPages * 4 - 2> other_writers;
  for (size_t i = 0; i < other_writers.size(); i++) {
    other_writers[i] =
        arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
    auto other_writer_packet = other_writers[i]->NewTracePacket();
    EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(other_writers[i].get())
                     ->drop_packets_for_testing());
  }

  // Write a packet that's guaranteed to span more than two chunks, causing
  // |writer| to attempt to acquire two new chunks, but fail to acquire the
  // second.
  sw = writer->NewTracePacket().TakeStreamWriter();
  size_t chunk_size = page_size() / 4;
  std::string large_string(chunk_size * 2, 'x');
  sw->WriteBytes(reinterpret_cast<const uint8_t*>(large_string.data()),
                 large_string.size());

  EXPECT_TRUE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                  ->drop_packets_for_testing());

  uint8_t* patch1 =
      sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
  ASSERT_THAT(patch1, NotNull());
  patch1[0] = 0;
  patch1[1] = 0;
  patch1[2] = 0;
  patch1[3] = 0;
  patch1 = sw->AnnotatePatch(patch1);
  EXPECT_THAT(patch1, IsNull());

  // First two chunks should be committed.
  arbiter_->FlushPendingCommitDataRequests();
  ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(2));

  // Flushing should succeed, even though some patches are still in the
  // writer's patch list.
  writer->FinishTracePacket();
  writer->Flush();
}

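// Verifies that the callback passed to Flush() is invoked only once the
// service acknowledges the CommitDataRequest.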
TEST_P(TraceWriterImplTest, Flush) {
  MockFunction<void()> flush_cb;

  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str("foobar");
  }

  EXPECT_CALL(flush_cb, Call).Times(0);
  ASSERT_FALSE(last_commit_callback_);
  writer->Flush(flush_cb.AsStdFunction());
  ASSERT_TRUE(last_commit_callback_);
  EXPECT_CALL(flush_cb, Call).Times(1);
  last_commit_callback_();
}

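// When a chunk boundary is crossed while nested messages are open, each open
// message's length field has to be moved to a patch; all three patches should
// then be sent to the service in a single ChunkToPatch entry.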
TEST_P(TraceWriterImplTest, NestedMsgsPatches) {
  const BufferID kBufId = 42;
  const uint32_t kNestedFieldId = 1;
  const uint32_t kStringFieldId = 2;
  const uint32_t kIntFieldId = 3;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  size_t chunk_size = page_size() / 4;
  std::string large_string(chunk_size, 'x');

  auto packet = writer->NewTracePacket();
  auto* nested1 =
      packet->BeginNestedMessage<protozero::Message>(kNestedFieldId);
  auto* nested2 =
      nested1->BeginNestedMessage<protozero::Message>(kNestedFieldId);
  auto* nested3 =
      nested2->BeginNestedMessage<protozero::Message>(kNestedFieldId);
  uint8_t* const old_nested_1_size_field = nested1->size_field();
  uint8_t* const old_nested_2_size_field = nested2->size_field();
  uint8_t* const old_nested_3_size_field = nested3->size_field();
  EXPECT_THAT(old_nested_1_size_field, NotNull());
  EXPECT_THAT(old_nested_2_size_field, NotNull());
  EXPECT_THAT(old_nested_3_size_field, NotNull());

  // Append a small field, which will fit in the current chunk.
  nested3->AppendVarInt<uint64_t>(kIntFieldId, 1);

  // The `size_field`s still point to the same old location, inside the chunk.
  EXPECT_EQ(nested1->size_field(), old_nested_1_size_field);
  EXPECT_EQ(nested2->size_field(), old_nested_2_size_field);
  EXPECT_EQ(nested3->size_field(), old_nested_3_size_field);

  // Append a large string, which will not fit in the current chunk.
  nested3->AppendString(kStringFieldId, large_string);

  // The `size_field`s will now point to different locations (patches).
  EXPECT_THAT(nested1->size_field(),
              AllOf(Ne(old_nested_1_size_field), NotNull()));
  EXPECT_THAT(nested2->size_field(),
              AllOf(Ne(old_nested_2_size_field), NotNull()));
  EXPECT_THAT(nested3->size_field(),
              AllOf(Ne(old_nested_3_size_field), NotNull()));

  packet->Finalize();
  writer->Flush();

  arbiter_->FlushPendingCommitDataRequests();

  ASSERT_THAT(last_commit_.chunks_to_patch(), SizeIs(1));
  EXPECT_EQ(last_commit_.chunks_to_patch()[0].writer_id(), writer->writer_id());
  EXPECT_EQ(last_commit_.chunks_to_patch()[0].target_buffer(), kBufId);
  EXPECT_FALSE(last_commit_.chunks_to_patch()[0].has_more_patches());
  EXPECT_THAT(last_commit_.chunks_to_patch()[0].patches(), SizeIs(3));
}

// TODO(primiano): add multi-writer test.

}  // namespace
}  // namespace perfetto