1 /*
2  * Copyright (C) 2019, The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <aidl/android/os/BnPullAtomCallback.h>
18 #include <aidl/android/os/IPullAtomResultReceiver.h>
19 #include <aidl/android/os/IStatsd.h>
20 #include <aidl/android/util/StatsEventParcel.h>
21 #include <android/binder_auto_utils.h>
22 #include <android/binder_ibinder.h>
23 #include <android/binder_manager.h>
24 #include <com_android_os_statsd_flags.h>
25 #include <stats_event.h>
26 #include <stats_pull_atom_callback.h>
27 
28 #include <map>
29 #include <queue>
30 #include <thread>
31 #include <vector>
32 
33 using Status = ::ndk::ScopedAStatus;
34 using aidl::android::os::BnPullAtomCallback;
35 using aidl::android::os::IPullAtomResultReceiver;
36 using aidl::android::os::IStatsd;
37 using aidl::android::util::StatsEventParcel;
38 using ::ndk::SharedRefBase;
39 
40 namespace flags = com::android::os::statsd::flags;
41 
42 struct AStatsEventList {
43     std::vector<AStatsEvent*> data;
44 };
45 
AStatsEventList_addStatsEvent(AStatsEventList * pull_data)46 AStatsEvent* AStatsEventList_addStatsEvent(AStatsEventList* pull_data) {
47     AStatsEvent* event = AStatsEvent_obtain();
48     pull_data->data.push_back(event);
49     return event;
50 }
51 
52 constexpr int64_t DEFAULT_COOL_DOWN_MILLIS = 1000LL;  // 1 second.
53 constexpr int64_t DEFAULT_TIMEOUT_MILLIS = 1500LL;    // 1.5 seconds.
54 
55 struct AStatsManager_PullAtomMetadata {
56     int64_t cool_down_millis;
57     int64_t timeout_millis;
58     std::vector<int32_t> additive_fields;
59 };
60 
AStatsManager_PullAtomMetadata_obtain()61 AStatsManager_PullAtomMetadata* AStatsManager_PullAtomMetadata_obtain() {
62     AStatsManager_PullAtomMetadata* metadata = new AStatsManager_PullAtomMetadata();
63     metadata->cool_down_millis = DEFAULT_COOL_DOWN_MILLIS;
64     metadata->timeout_millis = DEFAULT_TIMEOUT_MILLIS;
65     metadata->additive_fields = std::vector<int32_t>();
66     return metadata;
67 }
68 
AStatsManager_PullAtomMetadata_release(AStatsManager_PullAtomMetadata * metadata)69 void AStatsManager_PullAtomMetadata_release(AStatsManager_PullAtomMetadata* metadata) {
70     delete metadata;
71 }
72 
AStatsManager_PullAtomMetadata_setCoolDownMillis(AStatsManager_PullAtomMetadata * metadata,int64_t cool_down_millis)73 void AStatsManager_PullAtomMetadata_setCoolDownMillis(AStatsManager_PullAtomMetadata* metadata,
74                                                       int64_t cool_down_millis) {
75     metadata->cool_down_millis = cool_down_millis;
76 }
77 
AStatsManager_PullAtomMetadata_getCoolDownMillis(AStatsManager_PullAtomMetadata * metadata)78 int64_t AStatsManager_PullAtomMetadata_getCoolDownMillis(AStatsManager_PullAtomMetadata* metadata) {
79     return metadata->cool_down_millis;
80 }
81 
AStatsManager_PullAtomMetadata_setTimeoutMillis(AStatsManager_PullAtomMetadata * metadata,int64_t timeout_millis)82 void AStatsManager_PullAtomMetadata_setTimeoutMillis(AStatsManager_PullAtomMetadata* metadata,
83                                                      int64_t timeout_millis) {
84     metadata->timeout_millis = timeout_millis;
85 }
86 
AStatsManager_PullAtomMetadata_getTimeoutMillis(AStatsManager_PullAtomMetadata * metadata)87 int64_t AStatsManager_PullAtomMetadata_getTimeoutMillis(AStatsManager_PullAtomMetadata* metadata) {
88     return metadata->timeout_millis;
89 }
90 
AStatsManager_PullAtomMetadata_setAdditiveFields(AStatsManager_PullAtomMetadata * metadata,int32_t * additive_fields,int32_t num_fields)91 void AStatsManager_PullAtomMetadata_setAdditiveFields(AStatsManager_PullAtomMetadata* metadata,
92                                                       int32_t* additive_fields,
93                                                       int32_t num_fields) {
94     metadata->additive_fields.assign(additive_fields, additive_fields + num_fields);
95 }
96 
AStatsManager_PullAtomMetadata_getNumAdditiveFields(AStatsManager_PullAtomMetadata * metadata)97 int32_t AStatsManager_PullAtomMetadata_getNumAdditiveFields(
98         AStatsManager_PullAtomMetadata* metadata) {
99     return metadata->additive_fields.size();
100 }
101 
AStatsManager_PullAtomMetadata_getAdditiveFields(AStatsManager_PullAtomMetadata * metadata,int32_t * fields)102 void AStatsManager_PullAtomMetadata_getAdditiveFields(AStatsManager_PullAtomMetadata* metadata,
103                                                       int32_t* fields) {
104     std::copy(metadata->additive_fields.begin(), metadata->additive_fields.end(), fields);
105 }
106 
107 class StatsPullAtomCallbackInternal : public BnPullAtomCallback {
108   public:
StatsPullAtomCallbackInternal(const AStatsManager_PullAtomCallback callback,void * cookie,const int64_t coolDownMillis,const int64_t timeoutMillis,const std::vector<int32_t> additiveFields)109     StatsPullAtomCallbackInternal(const AStatsManager_PullAtomCallback callback, void* cookie,
110                                   const int64_t coolDownMillis, const int64_t timeoutMillis,
111                                   const std::vector<int32_t> additiveFields)
112         : mCallback(callback),
113           mCookie(cookie),
114           mCoolDownMillis(coolDownMillis),
115           mTimeoutMillis(timeoutMillis),
116           mAdditiveFields(additiveFields) {}
117 
onPullAtom(int32_t atomTag,const std::shared_ptr<IPullAtomResultReceiver> & resultReceiver)118     Status onPullAtom(int32_t atomTag,
119                       const std::shared_ptr<IPullAtomResultReceiver>& resultReceiver) override {
120         AStatsEventList statsEventList;
121         int successInt = mCallback(atomTag, &statsEventList, mCookie);
122         bool success = successInt == AStatsManager_PULL_SUCCESS;
123 
124         // Convert stats_events into StatsEventParcels.
125         std::vector<StatsEventParcel> parcels;
126 
127         for (int i = 0; i < statsEventList.data.size(); i++) {
128             size_t size;
129             uint8_t* buffer = AStatsEvent_getBuffer(statsEventList.data[i], &size);
130 
131             StatsEventParcel p;
132             // vector.assign() creates a copy, but this is inevitable unless
133             // stats_event.h/c uses a vector as opposed to a buffer.
134             p.buffer.assign(buffer, buffer + size);
135             parcels.push_back(std::move(p));
136         }
137 
138         Status status = resultReceiver->pullFinished(atomTag, success, parcels);
139         if (!status.isOk()) {
140             std::vector<StatsEventParcel> emptyParcels;
141             resultReceiver->pullFinished(atomTag, /*success=*/false, emptyParcels);
142         }
143         for (int i = 0; i < statsEventList.data.size(); i++) {
144             AStatsEvent_release(statsEventList.data[i]);
145         }
146         return Status::ok();
147     }
148 
getCoolDownMillis() const149     int64_t getCoolDownMillis() const { return mCoolDownMillis; }
getTimeoutMillis() const150     int64_t getTimeoutMillis() const { return mTimeoutMillis; }
getAdditiveFields() const151     const std::vector<int32_t>& getAdditiveFields() const { return mAdditiveFields; }
152 
153   private:
154     const AStatsManager_PullAtomCallback mCallback;
155     void* mCookie;
156     const int64_t mCoolDownMillis;
157     const int64_t mTimeoutMillis;
158     const std::vector<int32_t> mAdditiveFields;
159 };
160 
161 /**
162  * @brief pullersMutex is used to guard simultaneous access to pullers from below threads
163  * Main thread
164  * - AStatsManager_setPullAtomCallback()
165  * - AStatsManager_clearPullAtomCallback()
166  * Binder thread:
167  * - StatsdProvider::binderDied()
168  */
169 static std::mutex pullersMutex;
170 
171 static std::map<int32_t, std::shared_ptr<StatsPullAtomCallbackInternal>> pullers;
172 
173 class StatsdProvider {
174 public:
StatsdProvider()175     StatsdProvider() : mDeathRecipient(AIBinder_DeathRecipient_new(binderDied)) {
176     }
177 
~StatsdProvider()178     ~StatsdProvider() {
179         resetStatsService();
180     }
181 
getStatsService()182     std::shared_ptr<IStatsd> getStatsService() {
183         // There are host unit tests which are using libstatspull
184         // Since we do not have statsd on host - the getStatsService() is no-op and
185         // should return nullptr
186 #ifdef __ANDROID__
187         std::lock_guard<std::mutex> lock(mStatsdMutex);
188         if (!mStatsd) {
189             // Fetch statsd
190 
191             ::ndk::SpAIBinder binder;
192             // below ifs cannot be combined into single statement due to the way how
193             // macro __builtin_available is handler by compiler:
194             // - it should be used explicitly & independently to guard the corresponding API call
195             // once use_wait_for_service_api flag will be finalized, external if/else pair will be
196             // removed
197             if (flags::use_wait_for_service_api()) {
198                 if (__builtin_available(android __ANDROID_API_S__, *)) {
199                     binder.set(AServiceManager_waitForService("stats"));
200                 } else {
201                     binder.set(AServiceManager_getService("stats"));
202                 }
203             } else {
204                 binder.set(AServiceManager_getService("stats"));
205             }
206             mStatsd = IStatsd::fromBinder(binder);
207             if (mStatsd) {
208                 AIBinder_linkToDeath(binder.get(), mDeathRecipient.get(), this);
209             }
210         }
211 #endif  //  __ANDROID__
212         return mStatsd;
213     }
214 
resetStatsService()215     void resetStatsService() {
216         std::lock_guard<std::mutex> lock(mStatsdMutex);
217         mStatsd = nullptr;
218     }
219 
binderDied(void * cookie)220     static void binderDied(void* cookie) {
221         StatsdProvider* statsProvider = static_cast<StatsdProvider*>(cookie);
222         statsProvider->resetStatsService();
223 
224         std::shared_ptr<IStatsd> statsService = statsProvider->getStatsService();
225         if (statsService == nullptr) {
226             return;
227         }
228 
229         // Since we do not want to make an IPC with the lock held, we first create a
230         // copy of the data with the lock held before iterating through the map.
231         std::map<int32_t, std::shared_ptr<StatsPullAtomCallbackInternal>> pullersCopy;
232         {
233             std::lock_guard<std::mutex> lock(pullersMutex);
234             pullersCopy = pullers;
235         }
236         for (const auto& it : pullersCopy) {
237             statsService->registerNativePullAtomCallback(it.first, it.second->getCoolDownMillis(),
238                                                          it.second->getTimeoutMillis(),
239                                                          it.second->getAdditiveFields(), it.second);
240         }
241     }
242 
243 private:
244     /**
245      * @brief mStatsdMutex is used to guard simultaneous access to mStatsd from below threads:
246      * Work thread
247      * - registerStatsPullAtomCallbackBlocking()
248      * - unregisterStatsPullAtomCallbackBlocking()
249      * Binder thread:
250      * - StatsdProvider::binderDied()
251      */
252     std::mutex mStatsdMutex;
253     std::shared_ptr<IStatsd> mStatsd;
254     ::ndk::ScopedAIBinder_DeathRecipient mDeathRecipient;
255 };
256 
257 static std::shared_ptr<StatsdProvider> statsProvider = std::make_shared<StatsdProvider>();
258 
registerStatsPullAtomCallbackBlocking(int32_t atomTag,std::shared_ptr<StatsdProvider> statsProvider,std::shared_ptr<StatsPullAtomCallbackInternal> cb)259 void registerStatsPullAtomCallbackBlocking(int32_t atomTag,
260                                            std::shared_ptr<StatsdProvider> statsProvider,
261                                            std::shared_ptr<StatsPullAtomCallbackInternal> cb) {
262     const std::shared_ptr<IStatsd> statsService = statsProvider->getStatsService();
263     if (statsService == nullptr) {
264         // Statsd not available
265         return;
266     }
267 
268     statsService->registerNativePullAtomCallback(
269             atomTag, cb->getCoolDownMillis(), cb->getTimeoutMillis(), cb->getAdditiveFields(), cb);
270 }
271 
unregisterStatsPullAtomCallbackBlocking(int32_t atomTag,std::shared_ptr<StatsdProvider> statsProvider)272 void unregisterStatsPullAtomCallbackBlocking(int32_t atomTag,
273                                              std::shared_ptr<StatsdProvider> statsProvider) {
274     const std::shared_ptr<IStatsd> statsService = statsProvider->getStatsService();
275     if (statsService == nullptr) {
276         // Statsd not available
277         return;
278     }
279 
280     statsService->unregisterNativePullAtomCallback(atomTag);
281 }
282 
283 class CallbackOperationsHandler {
284     struct Cmd {
285         enum Type { CMD_REGISTER, CMD_UNREGISTER };
286 
287         Type type;
288         int atomTag;
289         std::shared_ptr<StatsPullAtomCallbackInternal> callback;
290     };
291 
292 public:
~CallbackOperationsHandler()293     ~CallbackOperationsHandler() {
294         if (mWorkThread.joinable()) {
295             mWorkThread.join();
296         }
297     }
298 
getInstance()299     static CallbackOperationsHandler& getInstance() {
300         static CallbackOperationsHandler handler;
301         return handler;
302     }
303 
registerCallback(int atomTag,std::shared_ptr<StatsPullAtomCallbackInternal> callback)304     void registerCallback(int atomTag, std::shared_ptr<StatsPullAtomCallbackInternal> callback) {
305         auto registerCmd = std::make_unique<Cmd>();
306         registerCmd->type = Cmd::CMD_REGISTER;
307         registerCmd->atomTag = atomTag;
308         registerCmd->callback = std::move(callback);
309         pushToQueue(std::move(registerCmd));
310 
311         startWorkerThread();
312     }
313 
unregisterCallback(int atomTag)314     void unregisterCallback(int atomTag) {
315         auto unregisterCmd = std::make_unique<Cmd>();
316         unregisterCmd->type = Cmd::CMD_UNREGISTER;
317         unregisterCmd->atomTag = atomTag;
318         pushToQueue(std::move(unregisterCmd));
319 
320         startWorkerThread();
321     }
322 
323 private:
324     std::atomic_bool mThreadAlive = false;
325     std::thread mWorkThread;
326 
327     std::mutex mMutex;
328     std::queue<std::unique_ptr<Cmd>> mCmdQueue;
329 
CallbackOperationsHandler()330     CallbackOperationsHandler() {
331     }
332 
pushToQueue(std::unique_ptr<Cmd> cmd)333     void pushToQueue(std::unique_ptr<Cmd> cmd) {
334         std::unique_lock<std::mutex> lock(mMutex);
335         mCmdQueue.push(std::move(cmd));
336     }
337 
startWorkerThread()338     void startWorkerThread() {
339         // Only spawn one thread to manage requests
340         if (mThreadAlive) {
341             return;
342         }
343         mThreadAlive = true;
344         if (mWorkThread.joinable()) {
345             mWorkThread.join();
346         }
347         mWorkThread = std::thread(&CallbackOperationsHandler::processCommands, this, statsProvider);
348     }
349 
processCommands(std::shared_ptr<StatsdProvider> statsProvider)350     void processCommands(std::shared_ptr<StatsdProvider> statsProvider) {
351         /**
352          * First trying to obtain stats service instance
353          * This is a blocking call, which waits on service readiness
354          */
355         const std::shared_ptr<IStatsd> statsService = statsProvider->getStatsService();
356 
357         if (!statsService) {
358             // Statsd not available - dropping all submitted command requests
359             std::queue<std::unique_ptr<Cmd>> emptyQueue;
360             std::unique_lock<std::mutex> lock(mMutex);
361             mCmdQueue.swap(emptyQueue);
362             mThreadAlive = false;
363             return;
364         }
365 
366         while (true) {
367             std::unique_ptr<Cmd> cmd = nullptr;
368             {
369                 /**
370                  * To guarantee sequential commands processing we need to lock mutex queue
371                  */
372                 std::unique_lock<std::mutex> lock(mMutex);
373                 if (mCmdQueue.empty()) {
374                     mThreadAlive = false;
375                     return;
376                 }
377 
378                 cmd = std::move(mCmdQueue.front());
379                 mCmdQueue.pop();
380             }
381 
382             switch (cmd->type) {
383                 case Cmd::CMD_REGISTER: {
384                     registerStatsPullAtomCallbackBlocking(cmd->atomTag, statsProvider,
385                                                           cmd->callback);
386                     break;
387                 }
388                 case Cmd::CMD_UNREGISTER: {
389                     unregisterStatsPullAtomCallbackBlocking(cmd->atomTag, statsProvider);
390                     break;
391                 }
392             }
393         }
394     }
395 };
396 
AStatsManager_setPullAtomCallback(int32_t atom_tag,AStatsManager_PullAtomMetadata * metadata,AStatsManager_PullAtomCallback callback,void * cookie)397 void AStatsManager_setPullAtomCallback(int32_t atom_tag, AStatsManager_PullAtomMetadata* metadata,
398                                        AStatsManager_PullAtomCallback callback, void* cookie) {
399     int64_t coolDownMillis =
400             metadata == nullptr ? DEFAULT_COOL_DOWN_MILLIS : metadata->cool_down_millis;
401     int64_t timeoutMillis = metadata == nullptr ? DEFAULT_TIMEOUT_MILLIS : metadata->timeout_millis;
402 
403     std::vector<int32_t> additiveFields;
404     if (metadata != nullptr) {
405         additiveFields = metadata->additive_fields;
406     }
407 
408     std::shared_ptr<StatsPullAtomCallbackInternal> callbackBinder =
409             SharedRefBase::make<StatsPullAtomCallbackInternal>(callback, cookie, coolDownMillis,
410                                                                timeoutMillis, additiveFields);
411 
412     {
413         std::lock_guard<std::mutex> lock(pullersMutex);
414         // Always add to the map. If statsd is dead, we will add them when it comes back.
415         pullers[atom_tag] = callbackBinder;
416     }
417 
418     CallbackOperationsHandler::getInstance().registerCallback(atom_tag, callbackBinder);
419 }
420 
AStatsManager_clearPullAtomCallback(int32_t atom_tag)421 void AStatsManager_clearPullAtomCallback(int32_t atom_tag) {
422     {
423         std::lock_guard<std::mutex> lock(pullersMutex);
424         // Always remove the puller from our map.
425         // If statsd is down, we will not register it when it comes back.
426         pullers.erase(atom_tag);
427     }
428 
429     CallbackOperationsHandler::getInstance().unregisterCallback(atom_tag);
430 }
431