1 /*
2 * Copyright (C) 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "statsd_binder_data_source.h"
18
19 #include <unistd.h>
20
21 #include <map>
22 #include <mutex>
23 #include <optional>
24
25 #include "perfetto/base/time.h"
26 #include "perfetto/ext/base/no_destructor.h"
27 #include "perfetto/ext/base/string_utils.h"
28 #include "perfetto/protozero/scattered_heap_buffer.h"
29 #include "perfetto/tracing/core/data_source_config.h"
30 #include "src/android_internal/lazy_library_loader.h"
31 #include "src/android_internal/statsd.h"
32 #include "src/traced/probes/statsd_client/common.h"
33
34 #include "protos/perfetto/config/statsd/statsd_tracing_config.pbzero.h"
35 #include "protos/perfetto/trace/statsd/statsd_atom.pbzero.h"
36 #include "protos/perfetto/trace/trace_packet.pbzero.h"
37 #include "protos/third_party/statsd/shell_config.pbzero.h"
38 #include "protos/third_party/statsd/shell_data.pbzero.h"
39
40 using ::perfetto::protos::pbzero::StatsdPullAtomConfig;
41 using ::perfetto::protos::pbzero::StatsdShellSubscription;
42 using ::perfetto::protos::pbzero::StatsdTracingConfig;
43
44 using ShellDataDecoder = ::perfetto::proto::pbzero::ShellData_Decoder;
45
46 namespace perfetto {
47 namespace {
48
AddAtomSubscription(const uint8_t * subscription_config,size_t num_bytes,android_internal::AtomCallback callback,void * cookie)49 int32_t AddAtomSubscription(const uint8_t* subscription_config,
50 size_t num_bytes,
51 android_internal::AtomCallback callback,
52 void* cookie) {
53 PERFETTO_LAZY_LOAD(android_internal::AddAtomSubscription, fn);
54 if (fn) {
55 return fn(subscription_config, num_bytes, callback, cookie);
56 }
57 return -1;
58 }
59
RemoveAtomSubscription(int32_t subscription_id)60 bool RemoveAtomSubscription(int32_t subscription_id) {
61 PERFETTO_LAZY_LOAD(android_internal::RemoveAtomSubscription, fn);
62 if (fn) {
63 fn(subscription_id);
64 return true;
65 }
66 return false;
67 }
68
FlushAtomSubscription(int32_t subscription_id)69 bool FlushAtomSubscription(int32_t subscription_id) {
70 PERFETTO_LAZY_LOAD(android_internal::FlushAtomSubscription, fn);
71 if (fn) {
72 fn(subscription_id);
73 return true;
74 }
75 return false;
76 }
77
78 // This is a singleton for mapping Statsd subscriptions to their data source.
79 // It is needed to deal with all the threading weirdness binder introduces. The
80 // AtomCallback from AddAtomSubscription can happen on any of a pool of binder
81 // threads while StatsdBinderDatasource runs on the single main thread.
82 // This means that StatsdBinderDatasource could be destroyed while a
83 // AtomCallback is in progress. To guard against this all the mapping
84 // to/from subscription_id/StatsdBinderDatasource happens under the lock
85 // of SubscriptionTracker.
86 class SubscriptionTracker {
87 public:
88 struct Entry {
89 base::TaskRunner* task_runner;
90 base::WeakPtr<StatsdBinderDataSource> data_source;
91 };
92
93 static SubscriptionTracker* Get();
94 void OnData(int32_t subscription_id,
95 uint32_t reason,
96 uint8_t* data,
97 size_t sz);
98 int32_t Register(base::TaskRunner* task_runner,
99 base::WeakPtr<StatsdBinderDataSource> data_source,
100 const std::string& config);
101 void Unregister(int32_t subscription_id);
102
103 private:
104 friend base::NoDestructor<SubscriptionTracker>;
105
106 SubscriptionTracker();
107 virtual ~SubscriptionTracker();
108 SubscriptionTracker(const SubscriptionTracker&) = delete;
109 SubscriptionTracker& operator=(const SubscriptionTracker&) = delete;
110
111 // lock_ guards access to subscriptions_
112 std::mutex lock_;
113 std::map<int32_t, Entry> subscriptions_;
114 };
115
116 // static
Get()117 SubscriptionTracker* SubscriptionTracker::Get() {
118 static base::NoDestructor<SubscriptionTracker> instance;
119 return &(instance.ref());
120 }
121
SubscriptionTracker()122 SubscriptionTracker::SubscriptionTracker() {}
123 SubscriptionTracker::~SubscriptionTracker() = default;
124
OnData(int32_t subscription_id,uint32_t reason,uint8_t * data,size_t sz)125 void SubscriptionTracker::OnData(int32_t subscription_id,
126 uint32_t reason,
127 uint8_t* data,
128 size_t sz) {
129 // Allocate and copy before we take the lock:
130 std::shared_ptr<uint8_t> copy(new uint8_t[sz],
131 std::default_delete<uint8_t[]>());
132 memcpy(copy.get(), data, sz);
133
134 std::lock_guard<std::mutex> scoped_lock(lock_);
135
136 auto it = subscriptions_.find(subscription_id);
137 if (it == subscriptions_.end()) {
138 // This is very paranoid and should not be required (since
139 // ~StatsdBinderDataSource will call this) however it would be awful to get
140 // stuck in a situation where statsd is sending us data forever and we're
141 // immediately dropping it on the floor - so if nothing wants the data we
142 // end the subscription. In the case the subscription is already gone this
143 // is a noop in libstatspull.
144 RemoveAtomSubscription(subscription_id);
145 return;
146 }
147
148 base::TaskRunner* task_runner = it->second.task_runner;
149 base::WeakPtr<StatsdBinderDataSource> data_source = it->second.data_source;
150
151 task_runner->PostTask([data_source, reason, copy = std::move(copy), sz]() {
152 if (data_source) {
153 data_source->OnData(reason, copy.get(), sz);
154 }
155 });
156 }
157
Register(base::TaskRunner * task_runner,base::WeakPtr<StatsdBinderDataSource> data_source,const std::string & config)158 int32_t SubscriptionTracker::Register(
159 base::TaskRunner* task_runner,
160 base::WeakPtr<StatsdBinderDataSource> data_source,
161 const std::string& config) {
162 std::lock_guard<std::mutex> scoped_lock(lock_);
163
164 // We do this here (as opposed to in StatsdBinderDataSource) so that
165 // we can hold the lock while we do and avoid the tiny race window between
166 // getting the subscription id and putting that id in the subscriptions_ map
167 auto* begin = reinterpret_cast<const uint8_t*>(config.data());
168 size_t size = config.size();
169 int32_t id = AddAtomSubscription(
170 begin, size,
171 [](int32_t subscription_id, uint32_t reason, uint8_t* payload,
172 size_t num_bytes, void*) {
173 SubscriptionTracker::Get()->OnData(subscription_id, reason, payload,
174 num_bytes);
175 },
176 nullptr);
177
178 if (id >= 0) {
179 subscriptions_[id] = Entry{task_runner, data_source};
180 }
181
182 return id;
183 }
184
Unregister(int32_t subscription_id)185 void SubscriptionTracker::Unregister(int32_t subscription_id) {
186 std::lock_guard<std::mutex> scoped_lock(lock_);
187
188 auto it = subscriptions_.find(subscription_id);
189 if (it != subscriptions_.end()) {
190 subscriptions_.erase(it);
191 }
192
193 // Unregister is called both when the data source is finishing
194 // (~StatsdBinderDataSource) but also when we observe a
195 // kAtomCallbackReasonSubscriptionEnded message. In the latter
196 // case this call is unnecessary (the statsd subscription is already
197 // gone) but it doesn't hurt.
198 RemoveAtomSubscription(subscription_id);
199 }
200
201 } // namespace
202
203 // static
204 const ProbesDataSource::Descriptor StatsdBinderDataSource::descriptor = {
205 /*name*/ "android.statsd",
206 /*flags*/ Descriptor::kFlagsNone,
207 /*fill_descriptor_func*/ nullptr,
208 };
209
StatsdBinderDataSource(base::TaskRunner * task_runner,TracingSessionID session_id,std::unique_ptr<TraceWriter> writer,const DataSourceConfig & ds_config)210 StatsdBinderDataSource::StatsdBinderDataSource(
211 base::TaskRunner* task_runner,
212 TracingSessionID session_id,
213 std::unique_ptr<TraceWriter> writer,
214 const DataSourceConfig& ds_config)
215 : ProbesDataSource(session_id, &descriptor),
216 task_runner_(task_runner),
217 writer_(std::move(writer)),
218 shell_subscription_(CreateStatsdShellConfig(ds_config)),
219 weak_factory_(this) {}
220
~StatsdBinderDataSource()221 StatsdBinderDataSource::~StatsdBinderDataSource() {
222 if (subscription_id_ >= 0) {
223 SubscriptionTracker::Get()->Unregister(subscription_id_);
224 subscription_id_ = -1;
225 }
226 }
227
Start()228 void StatsdBinderDataSource::Start() {
229 // Don't bother actually connecting to statsd if no pull/push atoms
230 // were configured:
231 if (shell_subscription_.empty()) {
232 PERFETTO_LOG("Empty statsd config. Not connecting to statsd.");
233 return;
234 }
235
236 auto weak_this = weak_factory_.GetWeakPtr();
237 subscription_id_ = SubscriptionTracker::Get()->Register(
238 task_runner_, weak_this, shell_subscription_);
239 }
240
OnData(uint32_t reason,const uint8_t * data,size_t sz)241 void StatsdBinderDataSource::OnData(uint32_t reason,
242 const uint8_t* data,
243 size_t sz) {
244 ShellDataDecoder message(data, sz);
245 if (message.has_atom()) {
246 TraceWriter::TracePacketHandle packet = writer_->NewTracePacket();
247
248 // The root packet gets the timestamp of *now* to aid in
249 // a) Packet sorting in trace_processor
250 // b) So we have some useful record of timestamp in case the statsd
251 // one gets broken in some exciting way.
252 packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
253
254 // Now put all the data. We rely on ShellData and StatsdAtom
255 // matching format exactly.
256 packet->AppendBytes(protos::pbzero::TracePacket::kStatsdAtomFieldNumber,
257 message.begin(),
258 static_cast<size_t>(message.end() - message.begin()));
259 }
260
261 // If we have the pending flush in progress resolve that:
262 if (reason == android_internal::kAtomCallbackReasonFlushRequested &&
263 pending_flush_callback_) {
264 writer_->Flush(pending_flush_callback_);
265 pending_flush_callback_ = nullptr;
266 }
267
268 if (reason == android_internal::kAtomCallbackReasonSubscriptionEnded) {
269 // This is the last packet so unregister self. It's not required to do this
270 // since we clean up in the destructor but it doesn't hurt.
271 SubscriptionTracker::Get()->Unregister(subscription_id_);
272 subscription_id_ = -1;
273 }
274 }
275
Flush(FlushRequestID,std::function<void ()> callback)276 void StatsdBinderDataSource::Flush(FlushRequestID,
277 std::function<void()> callback) {
278 if (subscription_id_ < 0) {
279 writer_->Flush(callback);
280 } else {
281 // We don't want to queue up pending flushes to avoid a situation where
282 // we end up will giant queue of unresolved flushes if statsd never replies.
283 // To avoid this if there is already a flush in flight finish that one now:
284 if (pending_flush_callback_) {
285 writer_->Flush(pending_flush_callback_);
286 }
287
288 // Remember the callback for later.
289 pending_flush_callback_ = callback;
290
291 // Start the flush
292 if (!FlushAtomSubscription(subscription_id_)) {
293 // If it fails immediately we're done:
294 writer_->Flush(pending_flush_callback_);
295 pending_flush_callback_ = nullptr;
296 }
297 }
298 }
299
ClearIncrementalState()300 void StatsdBinderDataSource::ClearIncrementalState() {}
301
302 } // namespace perfetto
303