xref: /aosp_15_r20/external/perfetto/src/traced/probes/statsd_client/statsd_binder_data_source.cc (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "statsd_binder_data_source.h"
18 
19 #include <unistd.h>
20 
21 #include <map>
22 #include <mutex>
23 #include <optional>
24 
25 #include "perfetto/base/time.h"
26 #include "perfetto/ext/base/no_destructor.h"
27 #include "perfetto/ext/base/string_utils.h"
28 #include "perfetto/protozero/scattered_heap_buffer.h"
29 #include "perfetto/tracing/core/data_source_config.h"
30 #include "src/android_internal/lazy_library_loader.h"
31 #include "src/android_internal/statsd.h"
32 #include "src/traced/probes/statsd_client/common.h"
33 
34 #include "protos/perfetto/config/statsd/statsd_tracing_config.pbzero.h"
35 #include "protos/perfetto/trace/statsd/statsd_atom.pbzero.h"
36 #include "protos/perfetto/trace/trace_packet.pbzero.h"
37 #include "protos/third_party/statsd/shell_config.pbzero.h"
38 #include "protos/third_party/statsd/shell_data.pbzero.h"
39 
40 using ::perfetto::protos::pbzero::StatsdPullAtomConfig;
41 using ::perfetto::protos::pbzero::StatsdShellSubscription;
42 using ::perfetto::protos::pbzero::StatsdTracingConfig;
43 
44 using ShellDataDecoder = ::perfetto::proto::pbzero::ShellData_Decoder;
45 
46 namespace perfetto {
47 namespace {
48 
AddAtomSubscription(const uint8_t * subscription_config,size_t num_bytes,android_internal::AtomCallback callback,void * cookie)49 int32_t AddAtomSubscription(const uint8_t* subscription_config,
50                             size_t num_bytes,
51                             android_internal::AtomCallback callback,
52                             void* cookie) {
53   PERFETTO_LAZY_LOAD(android_internal::AddAtomSubscription, fn);
54   if (fn) {
55     return fn(subscription_config, num_bytes, callback, cookie);
56   }
57   return -1;
58 }
59 
RemoveAtomSubscription(int32_t subscription_id)60 bool RemoveAtomSubscription(int32_t subscription_id) {
61   PERFETTO_LAZY_LOAD(android_internal::RemoveAtomSubscription, fn);
62   if (fn) {
63     fn(subscription_id);
64     return true;
65   }
66   return false;
67 }
68 
FlushAtomSubscription(int32_t subscription_id)69 bool FlushAtomSubscription(int32_t subscription_id) {
70   PERFETTO_LAZY_LOAD(android_internal::FlushAtomSubscription, fn);
71   if (fn) {
72     fn(subscription_id);
73     return true;
74   }
75   return false;
76 }
77 
78 // This is a singleton for mapping Statsd subscriptions to their data source.
79 // It is needed to deal with all the threading weirdness binder introduces. The
80 // AtomCallback from AddAtomSubscription can happen on any of a pool of binder
81 // threads while StatsdBinderDatasource runs on the single main thread.
82 // This means that StatsdBinderDatasource could be destroyed while a
83 // AtomCallback is in progress. To guard against this all the mapping
84 // to/from subscription_id/StatsdBinderDatasource happens under the lock
85 // of SubscriptionTracker.
86 class SubscriptionTracker {
87  public:
88   struct Entry {
89     base::TaskRunner* task_runner;
90     base::WeakPtr<StatsdBinderDataSource> data_source;
91   };
92 
93   static SubscriptionTracker* Get();
94   void OnData(int32_t subscription_id,
95               uint32_t reason,
96               uint8_t* data,
97               size_t sz);
98   int32_t Register(base::TaskRunner* task_runner,
99                    base::WeakPtr<StatsdBinderDataSource> data_source,
100                    const std::string& config);
101   void Unregister(int32_t subscription_id);
102 
103  private:
104   friend base::NoDestructor<SubscriptionTracker>;
105 
106   SubscriptionTracker();
107   virtual ~SubscriptionTracker();
108   SubscriptionTracker(const SubscriptionTracker&) = delete;
109   SubscriptionTracker& operator=(const SubscriptionTracker&) = delete;
110 
111   // lock_ guards access to subscriptions_
112   std::mutex lock_;
113   std::map<int32_t, Entry> subscriptions_;
114 };
115 
116 // static
Get()117 SubscriptionTracker* SubscriptionTracker::Get() {
118   static base::NoDestructor<SubscriptionTracker> instance;
119   return &(instance.ref());
120 }
121 
SubscriptionTracker()122 SubscriptionTracker::SubscriptionTracker() {}
123 SubscriptionTracker::~SubscriptionTracker() = default;
124 
OnData(int32_t subscription_id,uint32_t reason,uint8_t * data,size_t sz)125 void SubscriptionTracker::OnData(int32_t subscription_id,
126                                  uint32_t reason,
127                                  uint8_t* data,
128                                  size_t sz) {
129   // Allocate and copy before we take the lock:
130   std::shared_ptr<uint8_t> copy(new uint8_t[sz],
131                                 std::default_delete<uint8_t[]>());
132   memcpy(copy.get(), data, sz);
133 
134   std::lock_guard<std::mutex> scoped_lock(lock_);
135 
136   auto it = subscriptions_.find(subscription_id);
137   if (it == subscriptions_.end()) {
138     // This is very paranoid and should not be required (since
139     // ~StatsdBinderDataSource will call this) however it would be awful to get
140     // stuck in a situation where statsd is sending us data forever and we're
141     // immediately dropping it on the floor - so if nothing wants the data we
142     // end the subscription. In the case the subscription is already gone this
143     // is a noop in libstatspull.
144     RemoveAtomSubscription(subscription_id);
145     return;
146   }
147 
148   base::TaskRunner* task_runner = it->second.task_runner;
149   base::WeakPtr<StatsdBinderDataSource> data_source = it->second.data_source;
150 
151   task_runner->PostTask([data_source, reason, copy = std::move(copy), sz]() {
152     if (data_source) {
153       data_source->OnData(reason, copy.get(), sz);
154     }
155   });
156 }
157 
Register(base::TaskRunner * task_runner,base::WeakPtr<StatsdBinderDataSource> data_source,const std::string & config)158 int32_t SubscriptionTracker::Register(
159     base::TaskRunner* task_runner,
160     base::WeakPtr<StatsdBinderDataSource> data_source,
161     const std::string& config) {
162   std::lock_guard<std::mutex> scoped_lock(lock_);
163 
164   // We do this here (as opposed to in StatsdBinderDataSource) so that
165   // we can hold the lock while we do and avoid the tiny race window between
166   // getting the subscription id and putting that id in the subscriptions_ map
167   auto* begin = reinterpret_cast<const uint8_t*>(config.data());
168   size_t size = config.size();
169   int32_t id = AddAtomSubscription(
170       begin, size,
171       [](int32_t subscription_id, uint32_t reason, uint8_t* payload,
172          size_t num_bytes, void*) {
173         SubscriptionTracker::Get()->OnData(subscription_id, reason, payload,
174                                            num_bytes);
175       },
176       nullptr);
177 
178   if (id >= 0) {
179     subscriptions_[id] = Entry{task_runner, data_source};
180   }
181 
182   return id;
183 }
184 
Unregister(int32_t subscription_id)185 void SubscriptionTracker::Unregister(int32_t subscription_id) {
186   std::lock_guard<std::mutex> scoped_lock(lock_);
187 
188   auto it = subscriptions_.find(subscription_id);
189   if (it != subscriptions_.end()) {
190     subscriptions_.erase(it);
191   }
192 
193   // Unregister is called both when the data source is finishing
194   // (~StatsdBinderDataSource) but also when we observe a
195   // kAtomCallbackReasonSubscriptionEnded message. In the latter
196   // case this call is unnecessary (the statsd subscription is already
197   // gone) but it doesn't hurt.
198   RemoveAtomSubscription(subscription_id);
199 }
200 
201 }  // namespace
202 
203 // static
204 const ProbesDataSource::Descriptor StatsdBinderDataSource::descriptor = {
205     /*name*/ "android.statsd",
206     /*flags*/ Descriptor::kFlagsNone,
207     /*fill_descriptor_func*/ nullptr,
208 };
209 
StatsdBinderDataSource(base::TaskRunner * task_runner,TracingSessionID session_id,std::unique_ptr<TraceWriter> writer,const DataSourceConfig & ds_config)210 StatsdBinderDataSource::StatsdBinderDataSource(
211     base::TaskRunner* task_runner,
212     TracingSessionID session_id,
213     std::unique_ptr<TraceWriter> writer,
214     const DataSourceConfig& ds_config)
215     : ProbesDataSource(session_id, &descriptor),
216       task_runner_(task_runner),
217       writer_(std::move(writer)),
218       shell_subscription_(CreateStatsdShellConfig(ds_config)),
219       weak_factory_(this) {}
220 
~StatsdBinderDataSource()221 StatsdBinderDataSource::~StatsdBinderDataSource() {
222   if (subscription_id_ >= 0) {
223     SubscriptionTracker::Get()->Unregister(subscription_id_);
224     subscription_id_ = -1;
225   }
226 }
227 
Start()228 void StatsdBinderDataSource::Start() {
229   // Don't bother actually connecting to statsd if no pull/push atoms
230   // were configured:
231   if (shell_subscription_.empty()) {
232     PERFETTO_LOG("Empty statsd config. Not connecting to statsd.");
233     return;
234   }
235 
236   auto weak_this = weak_factory_.GetWeakPtr();
237   subscription_id_ = SubscriptionTracker::Get()->Register(
238       task_runner_, weak_this, shell_subscription_);
239 }
240 
OnData(uint32_t reason,const uint8_t * data,size_t sz)241 void StatsdBinderDataSource::OnData(uint32_t reason,
242                                     const uint8_t* data,
243                                     size_t sz) {
244   ShellDataDecoder message(data, sz);
245   if (message.has_atom()) {
246     TraceWriter::TracePacketHandle packet = writer_->NewTracePacket();
247 
248     // The root packet gets the timestamp of *now* to aid in
249     // a) Packet sorting in trace_processor
250     // b) So we have some useful record of timestamp in case the statsd
251     //    one gets broken in some exciting way.
252     packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
253 
254     // Now put all the data. We rely on ShellData and StatsdAtom
255     // matching format exactly.
256     packet->AppendBytes(protos::pbzero::TracePacket::kStatsdAtomFieldNumber,
257                         message.begin(),
258                         static_cast<size_t>(message.end() - message.begin()));
259   }
260 
261   // If we have the pending flush in progress resolve that:
262   if (reason == android_internal::kAtomCallbackReasonFlushRequested &&
263       pending_flush_callback_) {
264     writer_->Flush(pending_flush_callback_);
265     pending_flush_callback_ = nullptr;
266   }
267 
268   if (reason == android_internal::kAtomCallbackReasonSubscriptionEnded) {
269     // This is the last packet so unregister self. It's not required to do this
270     // since we clean up in the destructor but it doesn't hurt.
271     SubscriptionTracker::Get()->Unregister(subscription_id_);
272     subscription_id_ = -1;
273   }
274 }
275 
Flush(FlushRequestID,std::function<void ()> callback)276 void StatsdBinderDataSource::Flush(FlushRequestID,
277                                    std::function<void()> callback) {
278   if (subscription_id_ < 0) {
279     writer_->Flush(callback);
280   } else {
281     // We don't want to queue up pending flushes to avoid a situation where
282     // we end up will giant queue of unresolved flushes if statsd never replies.
283     // To avoid this if there is already a flush in flight finish that one now:
284     if (pending_flush_callback_) {
285       writer_->Flush(pending_flush_callback_);
286     }
287 
288     // Remember the callback for later.
289     pending_flush_callback_ = callback;
290 
291     // Start the flush
292     if (!FlushAtomSubscription(subscription_id_)) {
293       // If it fails immediately we're done:
294       writer_->Flush(pending_flush_callback_);
295       pending_flush_callback_ = nullptr;
296     }
297   }
298 }
299 
ClearIncrementalState()300 void StatsdBinderDataSource::ClearIncrementalState() {}
301 
302 }  // namespace perfetto
303