xref: /aosp_15_r20/external/perfetto/src/protozero/proto_ring_buffer.cc (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "perfetto/ext/protozero/proto_ring_buffer.h"
18 
19 #include "perfetto/base/logging.h"
20 #include "perfetto/ext/base/paged_memory.h"
21 #include "perfetto/protozero/proto_utils.h"
22 
23 namespace protozero {
24 
25 namespace {
26 constexpr size_t kGrowBytes = 128 * 1024;
27 
FramingError()28 inline ProtoRingBuffer::Message FramingError() {
29   ProtoRingBuffer::Message msg{};
30   msg.fatal_framing_error = true;
31   return msg;
32 }
33 
34 // Tries to decode a length-delimited proto field from |start|.
35 // Returns a valid boundary if the preamble is valid and the length is within
36 // |end|, or an invalid message otherwise.
TryReadProtoMessage(const uint8_t * start,const uint8_t * end)37 ProtoRingBuffer::Message TryReadProtoMessage(const uint8_t* start,
38                                              const uint8_t* end) {
39   namespace proto_utils = protozero::proto_utils;
40   uint64_t field_tag = 0;
41   auto* start_of_len = proto_utils::ParseVarInt(start, end, &field_tag);
42   if (start_of_len == start)
43     return ProtoRingBuffer::Message{};  // Not enough data.
44 
45   const uint32_t tag = field_tag & 0x07;
46   if (tag !=
47       static_cast<uint32_t>(proto_utils::ProtoWireType::kLengthDelimited)) {
48     PERFETTO_ELOG("RPC framing error, unexpected msg tag 0x%xu", tag);
49     return FramingError();
50   }
51 
52   uint64_t msg_len = 0;
53   auto* start_of_msg = proto_utils::ParseVarInt(start_of_len, end, &msg_len);
54   if (start_of_msg == start_of_len)
55     return ProtoRingBuffer::Message{};  // Not enough data.
56 
57   if (msg_len > ProtoRingBuffer::kMaxMsgSize) {
58     PERFETTO_ELOG("RPC framing error, message too large (%" PRIu64 " > %zu)",
59                   msg_len, ProtoRingBuffer::kMaxMsgSize);
60     return FramingError();
61   }
62 
63   if (start_of_msg + msg_len > end)
64     return ProtoRingBuffer::Message{};  // Not enough data.
65 
66   ProtoRingBuffer::Message msg{};
67   msg.start = start_of_msg;
68   msg.len = static_cast<uint32_t>(msg_len);
69   msg.field_id = static_cast<uint32_t>(field_tag >> 3);
70   return msg;
71 }
72 
73 }  // namespace
74 
RingBufferMessageReader()75 RingBufferMessageReader::RingBufferMessageReader()
76     : buf_(perfetto::base::PagedMemory::Allocate(kGrowBytes)) {}
77 RingBufferMessageReader::~RingBufferMessageReader() = default;
78 
Append(const void * data_void,size_t data_len)79 void RingBufferMessageReader::Append(const void* data_void, size_t data_len) {
80   if (failed_)
81     return;
82   const uint8_t* data = static_cast<const uint8_t*>(data_void);
83   PERFETTO_DCHECK(wr_ <= buf_.size());
84   PERFETTO_DCHECK(wr_ >= rd_);
85 
86   // If the last call to ReadMessage() consumed all the data in the buffer and
87   // there are no incomplete messages pending, restart from the beginning rather
88   // than keep ringing. This is the most common case.
89   if (rd_ == wr_)
90     rd_ = wr_ = 0;
91 
92   // The caller is expected to always issue a ReadMessage() after each Append().
93   PERFETTO_CHECK(!fastpath_.valid());
94   if (rd_ == wr_) {
95     auto msg = TryReadMessage(data, data + data_len);
96     if (msg.valid() && msg.end() == (data + data_len)) {
97       // Fastpath: in many cases, the underlying stream will effectively
98       // preserve the atomicity of messages for most small messages.
99       // In this case we can avoid the extra buf_ roundtrip and just pass a
100       // pointer to |data| + (proto preamble len).
101       // The next call to ReadMessage)= will return |fastpath_|.
102       fastpath_ = std::move(msg);
103       return;
104     }
105   }
106 
107   size_t avail = buf_.size() - wr_;
108   if (data_len > avail) {
109     // This whole section should be hit extremely rarely.
110 
111     // Try first just recompacting the buffer by moving everything to the left.
112     // This can happen if we received "a message and a bit" on each Append call
113     // so we ended pup in a situation like:
114     // buf_: [unused space] [msg1 incomplete]
115     //                      ^rd_             ^wr_
116     //
117     // After recompaction:
118     // buf_: [msg1 incomplete]
119     //       ^rd_             ^wr_
120     uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
121     memmove(&buf[0], &buf[rd_], wr_ - rd_);
122     avail += rd_;
123     wr_ -= rd_;
124     rd_ = 0;
125     if (data_len > avail) {
126       // The compaction didn't free up enough space and we need to expand the
127       // ring buffer. Yes, we could have detected this earlier and split the
128       // code paths, rather than first compacting and then realizing it wasn't
129       // sufficient. However, that would make the code harder to reason about,
130       // creating code paths that are nearly never hit, hence making it more
131       // likely to accumulate bugs in future. All this is very rare.
132       size_t new_size = buf_.size();
133       while (data_len > new_size - wr_)
134         new_size += kGrowBytes;
135       if (new_size > kMaxMsgSize * 2) {
136         failed_ = true;
137         return;
138       }
139       auto new_buf = perfetto::base::PagedMemory::Allocate(new_size);
140       memcpy(new_buf.Get(), buf_.Get(), buf_.size());
141       buf_ = std::move(new_buf);
142       avail = new_size - wr_;
143       // No need to touch rd_ / wr_ cursors.
144     }
145   }
146 
147   // Append the received data at the end of the ring buffer.
148   uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
149   memcpy(&buf[wr_], data, data_len);
150   wr_ += data_len;
151 }
152 
ReadMessage()153 RingBufferMessageReader::Message RingBufferMessageReader::ReadMessage() {
154   if (failed_)
155     return FramingError();
156 
157   if (fastpath_.valid()) {
158     // The fastpath can only be hit when the buffer is empty.
159     PERFETTO_CHECK(rd_ == wr_);
160     auto msg = std::move(fastpath_);
161     fastpath_ = Message{};
162     return msg;
163   }
164 
165   uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
166 
167   PERFETTO_DCHECK(rd_ <= wr_);
168   if (rd_ >= wr_)
169     return Message{};  // Completely empty.
170 
171   auto msg = TryReadMessage(&buf[rd_], &buf[wr_]);
172   if (!msg.valid()) {
173     failed_ = failed_ || msg.fatal_framing_error;
174     return msg;  // Return |msg| because it could be a framing error.
175   }
176 
177   const uint8_t* msg_end = msg.start + msg.len;
178   PERFETTO_CHECK(msg_end > &buf[rd_] && msg_end <= &buf[wr_]);
179   auto msg_outer_len = static_cast<size_t>(msg_end - &buf[rd_]);
180   rd_ += msg_outer_len;
181   return msg;
182 }
183 
184 ProtoRingBuffer::ProtoRingBuffer() = default;
185 ProtoRingBuffer::~ProtoRingBuffer() = default;
186 
TryReadMessage(const uint8_t * start,const uint8_t * end)187 ProtoRingBuffer::Message ProtoRingBuffer::TryReadMessage(const uint8_t* start,
188                                                          const uint8_t* end) {
189   return TryReadProtoMessage(start, end);
190 }
191 
192 }  // namespace protozero
193