1 /*
2 * Copyright (C) 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "perfetto/ext/protozero/proto_ring_buffer.h"
18
19 #include "perfetto/base/logging.h"
20 #include "perfetto/ext/base/paged_memory.h"
21 #include "perfetto/protozero/proto_utils.h"
22
23 namespace protozero {
24
25 namespace {
// Size (in bytes) of the buffer allocated by the RingBufferMessageReader
// constructor, and also the increment by which Append() enlarges that buffer
// when compaction alone cannot make room for the incoming data.
constexpr size_t kGrowBytes = 128 * 1024;
27
FramingError()28 inline ProtoRingBuffer::Message FramingError() {
29 ProtoRingBuffer::Message msg{};
30 msg.fatal_framing_error = true;
31 return msg;
32 }
33
34 // Tries to decode a length-delimited proto field from |start|.
35 // Returns a valid boundary if the preamble is valid and the length is within
36 // |end|, or an invalid message otherwise.
TryReadProtoMessage(const uint8_t * start,const uint8_t * end)37 ProtoRingBuffer::Message TryReadProtoMessage(const uint8_t* start,
38 const uint8_t* end) {
39 namespace proto_utils = protozero::proto_utils;
40 uint64_t field_tag = 0;
41 auto* start_of_len = proto_utils::ParseVarInt(start, end, &field_tag);
42 if (start_of_len == start)
43 return ProtoRingBuffer::Message{}; // Not enough data.
44
45 const uint32_t tag = field_tag & 0x07;
46 if (tag !=
47 static_cast<uint32_t>(proto_utils::ProtoWireType::kLengthDelimited)) {
48 PERFETTO_ELOG("RPC framing error, unexpected msg tag 0x%xu", tag);
49 return FramingError();
50 }
51
52 uint64_t msg_len = 0;
53 auto* start_of_msg = proto_utils::ParseVarInt(start_of_len, end, &msg_len);
54 if (start_of_msg == start_of_len)
55 return ProtoRingBuffer::Message{}; // Not enough data.
56
57 if (msg_len > ProtoRingBuffer::kMaxMsgSize) {
58 PERFETTO_ELOG("RPC framing error, message too large (%" PRIu64 " > %zu)",
59 msg_len, ProtoRingBuffer::kMaxMsgSize);
60 return FramingError();
61 }
62
63 if (start_of_msg + msg_len > end)
64 return ProtoRingBuffer::Message{}; // Not enough data.
65
66 ProtoRingBuffer::Message msg{};
67 msg.start = start_of_msg;
68 msg.len = static_cast<uint32_t>(msg_len);
69 msg.field_id = static_cast<uint32_t>(field_tag >> 3);
70 return msg;
71 }
72
73 } // namespace
74
RingBufferMessageReader()75 RingBufferMessageReader::RingBufferMessageReader()
76 : buf_(perfetto::base::PagedMemory::Allocate(kGrowBytes)) {}
77 RingBufferMessageReader::~RingBufferMessageReader() = default;
78
Append(const void * data_void,size_t data_len)79 void RingBufferMessageReader::Append(const void* data_void, size_t data_len) {
80 if (failed_)
81 return;
82 const uint8_t* data = static_cast<const uint8_t*>(data_void);
83 PERFETTO_DCHECK(wr_ <= buf_.size());
84 PERFETTO_DCHECK(wr_ >= rd_);
85
86 // If the last call to ReadMessage() consumed all the data in the buffer and
87 // there are no incomplete messages pending, restart from the beginning rather
88 // than keep ringing. This is the most common case.
89 if (rd_ == wr_)
90 rd_ = wr_ = 0;
91
92 // The caller is expected to always issue a ReadMessage() after each Append().
93 PERFETTO_CHECK(!fastpath_.valid());
94 if (rd_ == wr_) {
95 auto msg = TryReadMessage(data, data + data_len);
96 if (msg.valid() && msg.end() == (data + data_len)) {
97 // Fastpath: in many cases, the underlying stream will effectively
98 // preserve the atomicity of messages for most small messages.
99 // In this case we can avoid the extra buf_ roundtrip and just pass a
100 // pointer to |data| + (proto preamble len).
101 // The next call to ReadMessage)= will return |fastpath_|.
102 fastpath_ = std::move(msg);
103 return;
104 }
105 }
106
107 size_t avail = buf_.size() - wr_;
108 if (data_len > avail) {
109 // This whole section should be hit extremely rarely.
110
111 // Try first just recompacting the buffer by moving everything to the left.
112 // This can happen if we received "a message and a bit" on each Append call
113 // so we ended pup in a situation like:
114 // buf_: [unused space] [msg1 incomplete]
115 // ^rd_ ^wr_
116 //
117 // After recompaction:
118 // buf_: [msg1 incomplete]
119 // ^rd_ ^wr_
120 uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
121 memmove(&buf[0], &buf[rd_], wr_ - rd_);
122 avail += rd_;
123 wr_ -= rd_;
124 rd_ = 0;
125 if (data_len > avail) {
126 // The compaction didn't free up enough space and we need to expand the
127 // ring buffer. Yes, we could have detected this earlier and split the
128 // code paths, rather than first compacting and then realizing it wasn't
129 // sufficient. However, that would make the code harder to reason about,
130 // creating code paths that are nearly never hit, hence making it more
131 // likely to accumulate bugs in future. All this is very rare.
132 size_t new_size = buf_.size();
133 while (data_len > new_size - wr_)
134 new_size += kGrowBytes;
135 if (new_size > kMaxMsgSize * 2) {
136 failed_ = true;
137 return;
138 }
139 auto new_buf = perfetto::base::PagedMemory::Allocate(new_size);
140 memcpy(new_buf.Get(), buf_.Get(), buf_.size());
141 buf_ = std::move(new_buf);
142 avail = new_size - wr_;
143 // No need to touch rd_ / wr_ cursors.
144 }
145 }
146
147 // Append the received data at the end of the ring buffer.
148 uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
149 memcpy(&buf[wr_], data, data_len);
150 wr_ += data_len;
151 }
152
ReadMessage()153 RingBufferMessageReader::Message RingBufferMessageReader::ReadMessage() {
154 if (failed_)
155 return FramingError();
156
157 if (fastpath_.valid()) {
158 // The fastpath can only be hit when the buffer is empty.
159 PERFETTO_CHECK(rd_ == wr_);
160 auto msg = std::move(fastpath_);
161 fastpath_ = Message{};
162 return msg;
163 }
164
165 uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
166
167 PERFETTO_DCHECK(rd_ <= wr_);
168 if (rd_ >= wr_)
169 return Message{}; // Completely empty.
170
171 auto msg = TryReadMessage(&buf[rd_], &buf[wr_]);
172 if (!msg.valid()) {
173 failed_ = failed_ || msg.fatal_framing_error;
174 return msg; // Return |msg| because it could be a framing error.
175 }
176
177 const uint8_t* msg_end = msg.start + msg.len;
178 PERFETTO_CHECK(msg_end > &buf[rd_] && msg_end <= &buf[wr_]);
179 auto msg_outer_len = static_cast<size_t>(msg_end - &buf[rd_]);
180 rd_ += msg_outer_len;
181 return msg;
182 }
183
184 ProtoRingBuffer::ProtoRingBuffer() = default;
185 ProtoRingBuffer::~ProtoRingBuffer() = default;
186
TryReadMessage(const uint8_t * start,const uint8_t * end)187 ProtoRingBuffer::Message ProtoRingBuffer::TryReadMessage(const uint8_t* start,
188 const uint8_t* end) {
189 return TryReadProtoMessage(start, end);
190 }
191
192 } // namespace protozero
193