1 /*
2 * Copyright (C) 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/service/zlib_compressor.h"
18
19 #if !PERFETTO_BUILDFLAG(PERFETTO_ZLIB)
20 #error "Zlib must be enabled to compile this file."
21 #endif
22
23 #include <zlib.h>
24
25 #include "protos/perfetto/trace/trace.pbzero.h"
26 #include "protos/perfetto/trace/trace_packet.pbzero.h"
27
28 namespace perfetto {
29
30 namespace {
31
// Holds an encoded proto field header (tag + length).
// Only the first `size` bytes of `buf` are meaningful.
struct Preamble {
  // Number of valid bytes at the start of `buf`.
  uint32_t size = 0;
  // Storage for the encoded header. Zero-initialized so that a
  // default-constructed Preamble never exposes indeterminate bytes.
  std::array<uint8_t, 16> buf{};
};
36
37 template <uint32_t id>
GetPreamble(size_t sz)38 Preamble GetPreamble(size_t sz) {
39 Preamble preamble;
40 uint8_t* ptr = preamble.buf.data();
41 constexpr uint32_t tag = protozero::proto_utils::MakeTagLengthDelimited(id);
42 ptr = protozero::proto_utils::WriteVarInt(tag, ptr);
43 ptr = protozero::proto_utils::WriteVarInt(sz, ptr);
44 preamble.size =
45 static_cast<uint32_t>(reinterpret_cast<uintptr_t>(ptr) -
46 reinterpret_cast<uintptr_t>(preamble.buf.data()));
47 PERFETTO_DCHECK(preamble.size < preamble.buf.size());
48 return preamble;
49 }
50
PreambleToSlice(const Preamble & preamble)51 Slice PreambleToSlice(const Preamble& preamble) {
52 Slice slice = Slice::Allocate(preamble.size);
53 memcpy(slice.own_data(), preamble.buf.data(), preamble.size);
54 return slice;
55 }
56
57 // A compressor for `TracePacket`s that uses zlib. The class is exposed for
58 // testing.
59 class ZlibPacketCompressor {
60 public:
61 ZlibPacketCompressor();
62 ~ZlibPacketCompressor();
63
64 // Can be called multiple times, before Finish() is called.
65 void PushPacket(const TracePacket& packet);
66
67 // Returned the compressed data. Can be called at most once. After this call,
68 // the object is unusable (PushPacket should not be called) and must be
69 // destroyed.
70 TracePacket Finish();
71
72 private:
73 void PushData(const void* data, uint32_t size);
74 void NewOutputSlice();
75 void PushCurSlice();
76
77 z_stream stream_;
78 size_t total_new_slices_size_ = 0;
79 std::vector<Slice> new_slices_;
80 std::unique_ptr<uint8_t[]> cur_slice_;
81 };
82
ZlibPacketCompressor()83 ZlibPacketCompressor::ZlibPacketCompressor() {
84 memset(&stream_, 0, sizeof(stream_));
85 int status = deflateInit(&stream_, 6);
86 PERFETTO_CHECK(status == Z_OK);
87 }
88
~ZlibPacketCompressor()89 ZlibPacketCompressor::~ZlibPacketCompressor() {
90 int status = deflateEnd(&stream_);
91 PERFETTO_CHECK(status == Z_OK);
92 }
93
PushPacket(const TracePacket & packet)94 void ZlibPacketCompressor::PushPacket(const TracePacket& packet) {
95 // We need to be able to tokenize packets in the compressed stream, so we
96 // prefix a proto preamble to each packet. The compressed stream looks like a
97 // valid Trace proto.
98 Preamble preamble =
99 GetPreamble<protos::pbzero::Trace::kPacketFieldNumber>(packet.size());
100 PushData(preamble.buf.data(), preamble.size);
101 for (const Slice& slice : packet.slices()) {
102 PushData(slice.start, static_cast<uint32_t>(slice.size));
103 }
104 }
105
PushData(const void * data,uint32_t size)106 void ZlibPacketCompressor::PushData(const void* data, uint32_t size) {
107 stream_.next_in = const_cast<Bytef*>(static_cast<const Bytef*>(data));
108 stream_.avail_in = static_cast<uInt>(size);
109 while (stream_.avail_in != 0) {
110 if (stream_.avail_out == 0) {
111 NewOutputSlice();
112 }
113 int status = deflate(&stream_, Z_NO_FLUSH);
114 PERFETTO_CHECK(status == Z_OK);
115 }
116 }
117
Finish()118 TracePacket ZlibPacketCompressor::Finish() {
119 for (;;) {
120 int status = deflate(&stream_, Z_FINISH);
121 if (status == Z_STREAM_END)
122 break;
123 PERFETTO_CHECK(status == Z_OK || status == Z_BUF_ERROR);
124 NewOutputSlice();
125 }
126
127 PushCurSlice();
128
129 TracePacket packet;
130 packet.AddSlice(PreambleToSlice(
131 GetPreamble<protos::pbzero::TracePacket::kCompressedPacketsFieldNumber>(
132 total_new_slices_size_)));
133 for (auto& slice : new_slices_) {
134 packet.AddSlice(std::move(slice));
135 }
136 return packet;
137 }
138
NewOutputSlice()139 void ZlibPacketCompressor::NewOutputSlice() {
140 PushCurSlice();
141 cur_slice_ = std::make_unique<uint8_t[]>(kZlibCompressSliceSize);
142 stream_.next_out = reinterpret_cast<Bytef*>(cur_slice_.get());
143 stream_.avail_out = kZlibCompressSliceSize;
144 }
145
PushCurSlice()146 void ZlibPacketCompressor::PushCurSlice() {
147 if (cur_slice_) {
148 total_new_slices_size_ += kZlibCompressSliceSize - stream_.avail_out;
149 new_slices_.push_back(Slice::TakeOwnership(
150 std::move(cur_slice_), kZlibCompressSliceSize - stream_.avail_out));
151 }
152 }
153
154 } // namespace
155
ZlibCompressFn(std::vector<TracePacket> * packets)156 void ZlibCompressFn(std::vector<TracePacket>* packets) {
157 if (packets->empty()) {
158 return;
159 }
160
161 ZlibPacketCompressor stream;
162
163 for (const TracePacket& packet : *packets) {
164 stream.PushPacket(packet);
165 }
166
167 TracePacket packet = stream.Finish();
168
169 packets->clear();
170 packets->push_back(std::move(packet));
171 }
172
173 } // namespace perfetto
174