1 /*
2 * Copyright (C) 2019, The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <aidl/android/os/BnPullAtomCallback.h>
18 #include <aidl/android/os/IPullAtomResultReceiver.h>
19 #include <aidl/android/os/IStatsd.h>
20 #include <aidl/android/util/StatsEventParcel.h>
21 #include <android/binder_auto_utils.h>
22 #include <android/binder_ibinder.h>
23 #include <android/binder_manager.h>
24 #include <com_android_os_statsd_flags.h>
25 #include <stats_event.h>
26 #include <stats_pull_atom_callback.h>
27
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
32
33 using Status = ::ndk::ScopedAStatus;
34 using aidl::android::os::BnPullAtomCallback;
35 using aidl::android::os::IPullAtomResultReceiver;
36 using aidl::android::os::IStatsd;
37 using aidl::android::util::StatsEventParcel;
38 using ::ndk::SharedRefBase;
39
40 namespace flags = com::android::os::statsd::flags;
41
// Container for the events produced by a single pull. The list owns the
// AStatsEvent objects; they are released in onPullAtom() after parceling.
struct AStatsEventList {
    std::vector<AStatsEvent*> data;
};
45
AStatsEventList_addStatsEvent(AStatsEventList * pull_data)46 AStatsEvent* AStatsEventList_addStatsEvent(AStatsEventList* pull_data) {
47 AStatsEvent* event = AStatsEvent_obtain();
48 pull_data->data.push_back(event);
49 return event;
50 }
51
// Defaults used when the client registers a callback with nullptr metadata.
constexpr int64_t DEFAULT_COOL_DOWN_MILLIS = 1000LL;  // 1 second.
constexpr int64_t DEFAULT_TIMEOUT_MILLIS = 1500LL;    // 1.5 seconds.
54
// Pull-registration metadata configured by the client and forwarded to
// statsd when the callback is registered.
struct AStatsManager_PullAtomMetadata {
    int64_t cool_down_millis;   // cool-down period reported to statsd
    int64_t timeout_millis;     // pull timeout reported to statsd
    std::vector<int32_t> additive_fields;  // field ids reported as additive
};
60
AStatsManager_PullAtomMetadata_obtain()61 AStatsManager_PullAtomMetadata* AStatsManager_PullAtomMetadata_obtain() {
62 AStatsManager_PullAtomMetadata* metadata = new AStatsManager_PullAtomMetadata();
63 metadata->cool_down_millis = DEFAULT_COOL_DOWN_MILLIS;
64 metadata->timeout_millis = DEFAULT_TIMEOUT_MILLIS;
65 metadata->additive_fields = std::vector<int32_t>();
66 return metadata;
67 }
68
/** Frees metadata previously returned by AStatsManager_PullAtomMetadata_obtain(). */
void AStatsManager_PullAtomMetadata_release(AStatsManager_PullAtomMetadata* metadata) {
    delete metadata;
}
72
/** Sets the cool-down (milliseconds) that will be reported to statsd on registration. */
void AStatsManager_PullAtomMetadata_setCoolDownMillis(AStatsManager_PullAtomMetadata* metadata,
                                                      int64_t cool_down_millis) {
    metadata->cool_down_millis = cool_down_millis;
}
77
/** Returns the cool-down (milliseconds) currently stored in the metadata. */
int64_t AStatsManager_PullAtomMetadata_getCoolDownMillis(AStatsManager_PullAtomMetadata* metadata) {
    return metadata->cool_down_millis;
}
81
/** Sets the pull timeout (milliseconds) that will be reported to statsd on registration. */
void AStatsManager_PullAtomMetadata_setTimeoutMillis(AStatsManager_PullAtomMetadata* metadata,
                                                     int64_t timeout_millis) {
    metadata->timeout_millis = timeout_millis;
}
86
/** Returns the pull timeout (milliseconds) currently stored in the metadata. */
int64_t AStatsManager_PullAtomMetadata_getTimeoutMillis(AStatsManager_PullAtomMetadata* metadata) {
    return metadata->timeout_millis;
}
90
AStatsManager_PullAtomMetadata_setAdditiveFields(AStatsManager_PullAtomMetadata * metadata,int32_t * additive_fields,int32_t num_fields)91 void AStatsManager_PullAtomMetadata_setAdditiveFields(AStatsManager_PullAtomMetadata* metadata,
92 int32_t* additive_fields,
93 int32_t num_fields) {
94 metadata->additive_fields.assign(additive_fields, additive_fields + num_fields);
95 }
96
AStatsManager_PullAtomMetadata_getNumAdditiveFields(AStatsManager_PullAtomMetadata * metadata)97 int32_t AStatsManager_PullAtomMetadata_getNumAdditiveFields(
98 AStatsManager_PullAtomMetadata* metadata) {
99 return metadata->additive_fields.size();
100 }
101
AStatsManager_PullAtomMetadata_getAdditiveFields(AStatsManager_PullAtomMetadata * metadata,int32_t * fields)102 void AStatsManager_PullAtomMetadata_getAdditiveFields(AStatsManager_PullAtomMetadata* metadata,
103 int32_t* fields) {
104 std::copy(metadata->additive_fields.begin(), metadata->additive_fields.end(), fields);
105 }
106
107 class StatsPullAtomCallbackInternal : public BnPullAtomCallback {
108 public:
StatsPullAtomCallbackInternal(const AStatsManager_PullAtomCallback callback,void * cookie,const int64_t coolDownMillis,const int64_t timeoutMillis,const std::vector<int32_t> additiveFields)109 StatsPullAtomCallbackInternal(const AStatsManager_PullAtomCallback callback, void* cookie,
110 const int64_t coolDownMillis, const int64_t timeoutMillis,
111 const std::vector<int32_t> additiveFields)
112 : mCallback(callback),
113 mCookie(cookie),
114 mCoolDownMillis(coolDownMillis),
115 mTimeoutMillis(timeoutMillis),
116 mAdditiveFields(additiveFields) {}
117
onPullAtom(int32_t atomTag,const std::shared_ptr<IPullAtomResultReceiver> & resultReceiver)118 Status onPullAtom(int32_t atomTag,
119 const std::shared_ptr<IPullAtomResultReceiver>& resultReceiver) override {
120 AStatsEventList statsEventList;
121 int successInt = mCallback(atomTag, &statsEventList, mCookie);
122 bool success = successInt == AStatsManager_PULL_SUCCESS;
123
124 // Convert stats_events into StatsEventParcels.
125 std::vector<StatsEventParcel> parcels;
126
127 for (int i = 0; i < statsEventList.data.size(); i++) {
128 size_t size;
129 uint8_t* buffer = AStatsEvent_getBuffer(statsEventList.data[i], &size);
130
131 StatsEventParcel p;
132 // vector.assign() creates a copy, but this is inevitable unless
133 // stats_event.h/c uses a vector as opposed to a buffer.
134 p.buffer.assign(buffer, buffer + size);
135 parcels.push_back(std::move(p));
136 }
137
138 Status status = resultReceiver->pullFinished(atomTag, success, parcels);
139 if (!status.isOk()) {
140 std::vector<StatsEventParcel> emptyParcels;
141 resultReceiver->pullFinished(atomTag, /*success=*/false, emptyParcels);
142 }
143 for (int i = 0; i < statsEventList.data.size(); i++) {
144 AStatsEvent_release(statsEventList.data[i]);
145 }
146 return Status::ok();
147 }
148
getCoolDownMillis() const149 int64_t getCoolDownMillis() const { return mCoolDownMillis; }
getTimeoutMillis() const150 int64_t getTimeoutMillis() const { return mTimeoutMillis; }
getAdditiveFields() const151 const std::vector<int32_t>& getAdditiveFields() const { return mAdditiveFields; }
152
153 private:
154 const AStatsManager_PullAtomCallback mCallback;
155 void* mCookie;
156 const int64_t mCoolDownMillis;
157 const int64_t mTimeoutMillis;
158 const std::vector<int32_t> mAdditiveFields;
159 };
160
/**
 * @brief pullersMutex is used to guard simultaneous access to pullers from below threads
 * Main thread
 * - AStatsManager_setPullAtomCallback()
 * - AStatsManager_clearPullAtomCallback()
 * Binder thread:
 * - StatsdProvider::binderDied()
 */
static std::mutex pullersMutex;

// All pullers registered by this process, keyed by atom tag. Kept so every
// callback can be re-registered after statsd restarts (see binderDied()).
static std::map<int32_t, std::shared_ptr<StatsPullAtomCallbackInternal>> pullers;
172
/**
 * Lazily fetches and caches the IStatsd binder proxy.
 *
 * Also owns the binder death recipient: when statsd dies, binderDied() drops
 * the stale proxy, re-fetches the service, and re-registers every puller
 * recorded in the global 'pullers' map.
 */
class StatsdProvider {
public:
    StatsdProvider() : mDeathRecipient(AIBinder_DeathRecipient_new(binderDied)) {
    }

    ~StatsdProvider() {
        resetStatsService();
    }

    /**
     * @brief Returns the cached IStatsd proxy, fetching it on first use and
     * linking this object to the service's death notification.
     * May block waiting for the service; returns nullptr if statsd cannot be
     * obtained (always nullptr on host builds).
     */
    std::shared_ptr<IStatsd> getStatsService() {
        // There are host unit tests which are using libstatspull
        // Since we do not have statsd on host - the getStatsService() is no-op and
        // should return nullptr
#ifdef __ANDROID__
        std::lock_guard<std::mutex> lock(mStatsdMutex);
        if (!mStatsd) {
            // Fetch statsd

            ::ndk::SpAIBinder binder;
            // below ifs cannot be combined into single statement due to the way the
            // macro __builtin_available is handled by the compiler:
            // - it should be used explicitly & independently to guard the corresponding API call
            // once use_wait_for_service_api flag will be finalized, external if/else pair will be
            // removed
            if (flags::use_wait_for_service_api()) {
                if (__builtin_available(android __ANDROID_API_S__, *)) {
                    binder.set(AServiceManager_waitForService("stats"));
                } else {
                    binder.set(AServiceManager_getService("stats"));
                }
            } else {
                binder.set(AServiceManager_getService("stats"));
            }
            mStatsd = IStatsd::fromBinder(binder);
            if (mStatsd) {
                // Get notified when statsd dies so the stale proxy can be
                // dropped and the pullers re-registered.
                AIBinder_linkToDeath(binder.get(), mDeathRecipient.get(), this);
            }
        }
#endif  // __ANDROID__
        return mStatsd;
    }

    /** Drops the cached proxy so the next getStatsService() re-fetches it. */
    void resetStatsService() {
        std::lock_guard<std::mutex> lock(mStatsdMutex);
        mStatsd = nullptr;
    }

    /**
     * @brief Death-recipient callback, invoked on a binder thread when statsd
     * dies. Re-fetches the service and re-registers every known puller.
     * @param cookie the StatsdProvider* passed to AIBinder_linkToDeath().
     */
    static void binderDied(void* cookie) {
        StatsdProvider* statsProvider = static_cast<StatsdProvider*>(cookie);
        statsProvider->resetStatsService();

        std::shared_ptr<IStatsd> statsService = statsProvider->getStatsService();
        if (statsService == nullptr) {
            return;
        }

        // Since we do not want to make an IPC with the lock held, we first create a
        // copy of the data with the lock held before iterating through the map.
        std::map<int32_t, std::shared_ptr<StatsPullAtomCallbackInternal>> pullersCopy;
        {
            std::lock_guard<std::mutex> lock(pullersMutex);
            pullersCopy = pullers;
        }
        for (const auto& it : pullersCopy) {
            statsService->registerNativePullAtomCallback(it.first, it.second->getCoolDownMillis(),
                                                         it.second->getTimeoutMillis(),
                                                         it.second->getAdditiveFields(), it.second);
        }
    }

private:
    /**
     * @brief mStatsdMutex is used to guard simultaneous access to mStatsd from below threads:
     * Work thread
     * - registerStatsPullAtomCallbackBlocking()
     * - unregisterStatsPullAtomCallbackBlocking()
     * Binder thread:
     * - StatsdProvider::binderDied()
     */
    std::mutex mStatsdMutex;
    std::shared_ptr<IStatsd> mStatsd;
    ::ndk::ScopedAIBinder_DeathRecipient mDeathRecipient;
};
256
// Process-wide provider shared with worker threads; keeps the IStatsd proxy
// and its death recipient alive for the lifetime of the process.
static std::shared_ptr<StatsdProvider> statsProvider = std::make_shared<StatsdProvider>();
258
registerStatsPullAtomCallbackBlocking(int32_t atomTag,std::shared_ptr<StatsdProvider> statsProvider,std::shared_ptr<StatsPullAtomCallbackInternal> cb)259 void registerStatsPullAtomCallbackBlocking(int32_t atomTag,
260 std::shared_ptr<StatsdProvider> statsProvider,
261 std::shared_ptr<StatsPullAtomCallbackInternal> cb) {
262 const std::shared_ptr<IStatsd> statsService = statsProvider->getStatsService();
263 if (statsService == nullptr) {
264 // Statsd not available
265 return;
266 }
267
268 statsService->registerNativePullAtomCallback(
269 atomTag, cb->getCoolDownMillis(), cb->getTimeoutMillis(), cb->getAdditiveFields(), cb);
270 }
271
unregisterStatsPullAtomCallbackBlocking(int32_t atomTag,std::shared_ptr<StatsdProvider> statsProvider)272 void unregisterStatsPullAtomCallbackBlocking(int32_t atomTag,
273 std::shared_ptr<StatsdProvider> statsProvider) {
274 const std::shared_ptr<IStatsd> statsService = statsProvider->getStatsService();
275 if (statsService == nullptr) {
276 // Statsd not available
277 return;
278 }
279
280 statsService->unregisterNativePullAtomCallback(atomTag);
281 }
282
283 class CallbackOperationsHandler {
284 struct Cmd {
285 enum Type { CMD_REGISTER, CMD_UNREGISTER };
286
287 Type type;
288 int atomTag;
289 std::shared_ptr<StatsPullAtomCallbackInternal> callback;
290 };
291
292 public:
~CallbackOperationsHandler()293 ~CallbackOperationsHandler() {
294 if (mWorkThread.joinable()) {
295 mWorkThread.join();
296 }
297 }
298
getInstance()299 static CallbackOperationsHandler& getInstance() {
300 static CallbackOperationsHandler handler;
301 return handler;
302 }
303
registerCallback(int atomTag,std::shared_ptr<StatsPullAtomCallbackInternal> callback)304 void registerCallback(int atomTag, std::shared_ptr<StatsPullAtomCallbackInternal> callback) {
305 auto registerCmd = std::make_unique<Cmd>();
306 registerCmd->type = Cmd::CMD_REGISTER;
307 registerCmd->atomTag = atomTag;
308 registerCmd->callback = std::move(callback);
309 pushToQueue(std::move(registerCmd));
310
311 startWorkerThread();
312 }
313
unregisterCallback(int atomTag)314 void unregisterCallback(int atomTag) {
315 auto unregisterCmd = std::make_unique<Cmd>();
316 unregisterCmd->type = Cmd::CMD_UNREGISTER;
317 unregisterCmd->atomTag = atomTag;
318 pushToQueue(std::move(unregisterCmd));
319
320 startWorkerThread();
321 }
322
323 private:
324 std::atomic_bool mThreadAlive = false;
325 std::thread mWorkThread;
326
327 std::mutex mMutex;
328 std::queue<std::unique_ptr<Cmd>> mCmdQueue;
329
CallbackOperationsHandler()330 CallbackOperationsHandler() {
331 }
332
pushToQueue(std::unique_ptr<Cmd> cmd)333 void pushToQueue(std::unique_ptr<Cmd> cmd) {
334 std::unique_lock<std::mutex> lock(mMutex);
335 mCmdQueue.push(std::move(cmd));
336 }
337
startWorkerThread()338 void startWorkerThread() {
339 // Only spawn one thread to manage requests
340 if (mThreadAlive) {
341 return;
342 }
343 mThreadAlive = true;
344 if (mWorkThread.joinable()) {
345 mWorkThread.join();
346 }
347 mWorkThread = std::thread(&CallbackOperationsHandler::processCommands, this, statsProvider);
348 }
349
processCommands(std::shared_ptr<StatsdProvider> statsProvider)350 void processCommands(std::shared_ptr<StatsdProvider> statsProvider) {
351 /**
352 * First trying to obtain stats service instance
353 * This is a blocking call, which waits on service readiness
354 */
355 const std::shared_ptr<IStatsd> statsService = statsProvider->getStatsService();
356
357 if (!statsService) {
358 // Statsd not available - dropping all submitted command requests
359 std::queue<std::unique_ptr<Cmd>> emptyQueue;
360 std::unique_lock<std::mutex> lock(mMutex);
361 mCmdQueue.swap(emptyQueue);
362 mThreadAlive = false;
363 return;
364 }
365
366 while (true) {
367 std::unique_ptr<Cmd> cmd = nullptr;
368 {
369 /**
370 * To guarantee sequential commands processing we need to lock mutex queue
371 */
372 std::unique_lock<std::mutex> lock(mMutex);
373 if (mCmdQueue.empty()) {
374 mThreadAlive = false;
375 return;
376 }
377
378 cmd = std::move(mCmdQueue.front());
379 mCmdQueue.pop();
380 }
381
382 switch (cmd->type) {
383 case Cmd::CMD_REGISTER: {
384 registerStatsPullAtomCallbackBlocking(cmd->atomTag, statsProvider,
385 cmd->callback);
386 break;
387 }
388 case Cmd::CMD_UNREGISTER: {
389 unregisterStatsPullAtomCallbackBlocking(cmd->atomTag, statsProvider);
390 break;
391 }
392 }
393 }
394 }
395 };
396
AStatsManager_setPullAtomCallback(int32_t atom_tag,AStatsManager_PullAtomMetadata * metadata,AStatsManager_PullAtomCallback callback,void * cookie)397 void AStatsManager_setPullAtomCallback(int32_t atom_tag, AStatsManager_PullAtomMetadata* metadata,
398 AStatsManager_PullAtomCallback callback, void* cookie) {
399 int64_t coolDownMillis =
400 metadata == nullptr ? DEFAULT_COOL_DOWN_MILLIS : metadata->cool_down_millis;
401 int64_t timeoutMillis = metadata == nullptr ? DEFAULT_TIMEOUT_MILLIS : metadata->timeout_millis;
402
403 std::vector<int32_t> additiveFields;
404 if (metadata != nullptr) {
405 additiveFields = metadata->additive_fields;
406 }
407
408 std::shared_ptr<StatsPullAtomCallbackInternal> callbackBinder =
409 SharedRefBase::make<StatsPullAtomCallbackInternal>(callback, cookie, coolDownMillis,
410 timeoutMillis, additiveFields);
411
412 {
413 std::lock_guard<std::mutex> lock(pullersMutex);
414 // Always add to the map. If statsd is dead, we will add them when it comes back.
415 pullers[atom_tag] = callbackBinder;
416 }
417
418 CallbackOperationsHandler::getInstance().registerCallback(atom_tag, callbackBinder);
419 }
420
/**
 * Clears the pull callback for atom_tag: removes it from the local 'pullers'
 * map (so it will not be re-registered after a statsd restart) and queues an
 * unregister command for the worker thread to deliver to statsd.
 */
void AStatsManager_clearPullAtomCallback(int32_t atom_tag) {
    {
        std::lock_guard<std::mutex> lock(pullersMutex);
        // Always remove the puller from our map.
        // If statsd is down, we will not register it when it comes back.
        pullers.erase(atom_tag);
    }

    CallbackOperationsHandler::getInstance().unregisterCallback(atom_tag);
}
431