1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17 #define STATSD_DEBUG false  // STOPSHIP if true
18 #include "Log.h"
19 
20 #include "GaugeMetricProducer.h"
21 
22 #include "guardrail/StatsdStats.h"
23 #include "metrics/parsing_utils/metrics_manager_util.h"
24 #include "stats_log_util.h"
25 
26 using android::util::FIELD_COUNT_REPEATED;
27 using android::util::FIELD_TYPE_BOOL;
28 using android::util::FIELD_TYPE_FLOAT;
29 using android::util::FIELD_TYPE_INT32;
30 using android::util::FIELD_TYPE_INT64;
31 using android::util::FIELD_TYPE_MESSAGE;
32 using android::util::FIELD_TYPE_STRING;
33 using android::util::ProtoOutputStream;
34 using std::map;
35 using std::string;
36 using std::unordered_map;
37 using std::vector;
38 using std::make_shared;
39 using std::shared_ptr;
40 
41 namespace android {
42 namespace os {
43 namespace statsd {
44 
// Proto field numbers used by this file when serializing a gauge metric report.

// Fields of StatsLogReport.
constexpr int FIELD_ID_ID = 1;
constexpr int FIELD_ID_GAUGE_METRICS = 8;
constexpr int FIELD_ID_TIME_BASE = 9;
constexpr int FIELD_ID_BUCKET_SIZE = 10;
constexpr int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11;
constexpr int FIELD_ID_IS_ACTIVE = 14;
constexpr int FIELD_ID_DIMENSION_GUARDRAIL_HIT = 17;
constexpr int FIELD_ID_ESTIMATED_MEMORY_BYTES = 18;
constexpr int FIELD_ID_DATA_CORRUPTED_REASON = 19;

// Fields of GaugeMetricDataWrapper.
constexpr int FIELD_ID_DATA = 1;
constexpr int FIELD_ID_SKIPPED = 2;

// Fields of SkippedBuckets.
constexpr int FIELD_ID_SKIPPED_START_MILLIS = 3;
constexpr int FIELD_ID_SKIPPED_END_MILLIS = 4;
constexpr int FIELD_ID_SKIPPED_DROP_EVENT = 5;

// Fields of the drop event message nested inside a skipped bucket.
constexpr int FIELD_ID_BUCKET_DROP_REASON = 1;
constexpr int FIELD_ID_DROP_TIME = 2;

// Fields of GaugeMetricData.
constexpr int FIELD_ID_DIMENSION_IN_WHAT = 1;
constexpr int FIELD_ID_BUCKET_INFO = 3;
constexpr int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;

// Fields of GaugeBucketInfo.
constexpr int FIELD_ID_BUCKET_NUM = 6;
constexpr int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 7;
constexpr int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 8;
constexpr int FIELD_ID_AGGREGATED_ATOM = 9;

// Fields of AggregatedAtomInfo.
constexpr int FIELD_ID_ATOM_VALUE = 1;
constexpr int FIELD_ID_ATOM_TIMESTAMPS = 2;

77 
GaugeMetricProducer(const ConfigKey & key,const GaugeMetric & metric,const int conditionIndex,const vector<ConditionState> & initialConditionCache,const sp<ConditionWizard> & wizard,const uint64_t protoHash,const int whatMatcherIndex,const sp<EventMatcherWizard> & matcherWizard,const int pullTagId,const int triggerAtomId,const int atomId,const int64_t timeBaseNs,const int64_t startTimeNs,const sp<StatsPullerManager> & pullerManager,const wp<ConfigMetadataProvider> configMetadataProvider,const unordered_map<int,shared_ptr<Activation>> & eventActivationMap,const unordered_map<int,vector<shared_ptr<Activation>>> & eventDeactivationMap,const size_t dimensionSoftLimit,const size_t dimensionHardLimit)78 GaugeMetricProducer::GaugeMetricProducer(
79         const ConfigKey& key, const GaugeMetric& metric, const int conditionIndex,
80         const vector<ConditionState>& initialConditionCache, const sp<ConditionWizard>& wizard,
81         const uint64_t protoHash, const int whatMatcherIndex,
82         const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int triggerAtomId,
83         const int atomId, const int64_t timeBaseNs, const int64_t startTimeNs,
84         const sp<StatsPullerManager>& pullerManager,
85         const wp<ConfigMetadataProvider> configMetadataProvider,
86         const unordered_map<int, shared_ptr<Activation>>& eventActivationMap,
87         const unordered_map<int, vector<shared_ptr<Activation>>>& eventDeactivationMap,
88         const size_t dimensionSoftLimit, const size_t dimensionHardLimit)
89     : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, initialConditionCache, wizard,
90                      protoHash, eventActivationMap, eventDeactivationMap, /*slicedStateAtoms=*/{},
91                      /*stateGroupMap=*/{}, getAppUpgradeBucketSplit(metric),
92                      configMetadataProvider),
93       mWhatMatcherIndex(whatMatcherIndex),
94       mEventMatcherWizard(matcherWizard),
95       mPullerManager(pullerManager),
96       mPullTagId(pullTagId),
97       mTriggerAtomId(triggerAtomId),
98       mAtomId(atomId),
99       mIsPulled(pullTagId != -1),
100       mMinBucketSizeNs(metric.min_bucket_size_nanos()),
101       mSamplingType(metric.sampling_type()),
102       mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC
103                                                       : StatsdStats::kPullMaxDelayNs),
104       mDimensionSoftLimit(dimensionSoftLimit),
105       mDimensionHardLimit(dimensionHardLimit),
106       mGaugeAtomsPerDimensionLimit(metric.max_num_gauge_atoms_per_bucket()),
107       mDimensionGuardrailHit(false),
108       mSamplingPercentage(metric.sampling_percentage()),
109       mPullProbability(metric.pull_probability()) {
110     mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
111     mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
112     int64_t bucketSizeMills = 0;
113     if (metric.has_bucket()) {
114         bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
115     } else {
116         bucketSizeMills = TimeUnitToBucketSizeInMillis(ONE_HOUR);
117     }
118     mBucketSizeNs = bucketSizeMills * 1000000;
119 
120     if (!metric.gauge_fields_filter().include_all()) {
121         translateFieldMatcher(metric.gauge_fields_filter().fields(), &mFieldMatchers);
122     }
123 
124     if (metric.has_dimensions_in_what()) {
125         translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
126         mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
127     }
128 
129     if (metric.links().size() > 0) {
130         for (const auto& link : metric.links()) {
131             Metric2Condition mc;
132             mc.conditionId = link.condition();
133             translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
134             translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
135             mMetric2ConditionLinks.push_back(mc);
136         }
137         mConditionSliced = true;
138     }
139     mShouldUseNestedDimensions = ShouldUseNestedDimensions(metric.dimensions_in_what());
140 
141     flushIfNeededLocked(startTimeNs);
142     // Kicks off the puller immediately.
143     if (mIsPulled && isRandomNSamples()) {
144         mPullerManager->RegisterReceiver(mPullTagId, mConfigKey, this, getCurrentBucketEndTimeNs(),
145                                          mBucketSizeNs);
146     }
147 
148     // Adjust start for partial first bucket and then pull if needed
149     mCurrentBucketStartTimeNs = startTimeNs;
150 
151     VLOG("Gauge metric %lld created. bucket size %lld start_time: %lld sliced %d",
152          (long long)mMetricId, (long long)mBucketSizeNs, (long long)mTimeBaseNs, mConditionSliced);
153 }
154 
~GaugeMetricProducer()155 GaugeMetricProducer::~GaugeMetricProducer() {
156     VLOG("~GaugeMetricProducer() called");
157     if (mIsPulled && isRandomNSamples()) {
158         mPullerManager->UnRegisterReceiver(mPullTagId, mConfigKey, this);
159     }
160 }
161 
onConfigUpdatedLocked(const StatsdConfig & config,const int configIndex,const int metricIndex,const vector<sp<AtomMatchingTracker>> & allAtomMatchingTrackers,const unordered_map<int64_t,int> & oldAtomMatchingTrackerMap,const unordered_map<int64_t,int> & newAtomMatchingTrackerMap,const sp<EventMatcherWizard> & matcherWizard,const vector<sp<ConditionTracker>> & allConditionTrackers,const unordered_map<int64_t,int> & conditionTrackerMap,const sp<ConditionWizard> & wizard,const unordered_map<int64_t,int> & metricToActivationMap,unordered_map<int,vector<int>> & trackerToMetricMap,unordered_map<int,vector<int>> & conditionToMetricMap,unordered_map<int,vector<int>> & activationAtomTrackerToMetricMap,unordered_map<int,vector<int>> & deactivationAtomTrackerToMetricMap,vector<int> & metricsWithActivation)162 optional<InvalidConfigReason> GaugeMetricProducer::onConfigUpdatedLocked(
163         const StatsdConfig& config, const int configIndex, const int metricIndex,
164         const vector<sp<AtomMatchingTracker>>& allAtomMatchingTrackers,
165         const unordered_map<int64_t, int>& oldAtomMatchingTrackerMap,
166         const unordered_map<int64_t, int>& newAtomMatchingTrackerMap,
167         const sp<EventMatcherWizard>& matcherWizard,
168         const vector<sp<ConditionTracker>>& allConditionTrackers,
169         const unordered_map<int64_t, int>& conditionTrackerMap, const sp<ConditionWizard>& wizard,
170         const unordered_map<int64_t, int>& metricToActivationMap,
171         unordered_map<int, vector<int>>& trackerToMetricMap,
172         unordered_map<int, vector<int>>& conditionToMetricMap,
173         unordered_map<int, vector<int>>& activationAtomTrackerToMetricMap,
174         unordered_map<int, vector<int>>& deactivationAtomTrackerToMetricMap,
175         vector<int>& metricsWithActivation) {
176     optional<InvalidConfigReason> invalidConfigReason = MetricProducer::onConfigUpdatedLocked(
177             config, configIndex, metricIndex, allAtomMatchingTrackers, oldAtomMatchingTrackerMap,
178             newAtomMatchingTrackerMap, matcherWizard, allConditionTrackers, conditionTrackerMap,
179             wizard, metricToActivationMap, trackerToMetricMap, conditionToMetricMap,
180             activationAtomTrackerToMetricMap, deactivationAtomTrackerToMetricMap,
181             metricsWithActivation);
182     if (invalidConfigReason.has_value()) {
183         return invalidConfigReason;
184     }
185 
186     const GaugeMetric& metric = config.gauge_metric(configIndex);
187     // Update appropriate indices: mWhatMatcherIndex, mConditionIndex and MetricsManager maps.
188     invalidConfigReason = handleMetricWithAtomMatchingTrackers(
189             metric.what(), mMetricId, metricIndex, /*enforceOneAtom=*/false,
190             allAtomMatchingTrackers, newAtomMatchingTrackerMap, trackerToMetricMap,
191             mWhatMatcherIndex);
192     if (invalidConfigReason.has_value()) {
193         return invalidConfigReason;
194     }
195 
196     // Need to update maps since the index changed, but mTriggerAtomId will not change.
197     int triggerTrackerIndex;
198     if (metric.has_trigger_event()) {
199         invalidConfigReason = handleMetricWithAtomMatchingTrackers(
200                 metric.trigger_event(), mMetricId, metricIndex,
201                 /*enforceOneAtom=*/true, allAtomMatchingTrackers, newAtomMatchingTrackerMap,
202                 trackerToMetricMap, triggerTrackerIndex);
203         if (invalidConfigReason.has_value()) {
204             return invalidConfigReason;
205         }
206     }
207 
208     if (metric.has_condition()) {
209         invalidConfigReason = handleMetricWithConditions(
210                 metric.condition(), mMetricId, metricIndex, conditionTrackerMap, metric.links(),
211                 allConditionTrackers, mConditionTrackerIndex, conditionToMetricMap);
212         if (invalidConfigReason.has_value()) {
213             return invalidConfigReason;
214         }
215     }
216     sp<EventMatcherWizard> tmpEventWizard = mEventMatcherWizard;
217     mEventMatcherWizard = matcherWizard;
218 
219     // If this is a config update, we must have just forced a partial bucket. Pull if needed to get
220     // data for the new bucket.
221     if (mCondition == ConditionState::kTrue && mIsActive && mIsPulled && isRandomNSamples()) {
222         pullAndMatchEventsLocked(mCurrentBucketStartTimeNs);
223     }
224     return nullopt;
225 }
226 
dumpStatesLocked(int out,bool verbose) const227 void GaugeMetricProducer::dumpStatesLocked(int out, bool verbose) const {
228     if (mCurrentSlicedBucket == nullptr ||
229         mCurrentSlicedBucket->size() == 0) {
230         return;
231     }
232 
233     dprintf(out, "GaugeMetric %lld dimension size %lu\n", (long long)mMetricId,
234             (unsigned long)mCurrentSlicedBucket->size());
235     if (verbose) {
236         for (const auto& it : *mCurrentSlicedBucket) {
237             dprintf(out, "\t(what)%s\t(states)%s  %d atoms\n",
238                     it.first.getDimensionKeyInWhat().toString().c_str(),
239                     it.first.getStateValuesKey().toString().c_str(), (int)it.second.size());
240         }
241     }
242 }
243 
clearPastBucketsLocked(const int64_t dumpTimeNs)244 void GaugeMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) {
245     flushIfNeededLocked(dumpTimeNs);
246     mPastBuckets.clear();
247     mSkippedBuckets.clear();
248     resetDataCorruptionFlagsLocked();
249     mTotalDataSize = 0;
250 }
251 
onDumpReportLocked(const int64_t dumpTimeNs,const bool include_current_partial_bucket,const bool erase_data,const DumpLatency dumpLatency,std::set<string> * str_set,std::set<int32_t> & usedUids,ProtoOutputStream * protoOutput)252 void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
253                                              const bool include_current_partial_bucket,
254                                              const bool erase_data, const DumpLatency dumpLatency,
255                                              std::set<string>* str_set, std::set<int32_t>& usedUids,
256                                              ProtoOutputStream* protoOutput) {
257     VLOG("Gauge metric %lld report now...", (long long)mMetricId);
258     if (include_current_partial_bucket) {
259         flushLocked(dumpTimeNs);
260     } else {
261         flushIfNeededLocked(dumpTimeNs);
262     }
263 
264     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
265     protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked());
266 
267     // Data corrupted reason
268     writeDataCorruptedReasons(*protoOutput, FIELD_ID_DATA_CORRUPTED_REASON,
269                               mDataCorruptedDueToQueueOverflow != DataCorruptionSeverity::kNone,
270                               mDataCorruptedDueToSocketLoss != DataCorruptionSeverity::kNone);
271 
272     if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
273         if (erase_data) {
274             resetDataCorruptionFlagsLocked();
275         }
276         return;
277     }
278 
279     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ESTIMATED_MEMORY_BYTES,
280                        (long long)byteSizeLocked());
281 
282     if (mDimensionGuardrailHit) {
283         protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_DIMENSION_GUARDRAIL_HIT,
284                            mDimensionGuardrailHit);
285     }
286 
287     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_TIME_BASE, (long long)mTimeBaseNs);
288     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_SIZE, (long long)mBucketSizeNs);
289 
290     // Fills the dimension path if not slicing by a primitive repeated field or position ALL.
291     if (!mShouldUseNestedDimensions) {
292         if (!mDimensionsInWhat.empty()) {
293             uint64_t dimenPathToken = protoOutput->start(
294                     FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
295             writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
296             protoOutput->end(dimenPathToken);
297         }
298     }
299 
300     uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);
301 
302     for (const auto& skippedBucket : mSkippedBuckets) {
303         uint64_t wrapperToken =
304                 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
305         protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS,
306                            (long long)(NanoToMillis(skippedBucket.bucketStartTimeNs)));
307         protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS,
308                            (long long)(NanoToMillis(skippedBucket.bucketEndTimeNs)));
309 
310         for (const auto& dropEvent : skippedBucket.dropEvents) {
311             uint64_t dropEventToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
312                                                          FIELD_ID_SKIPPED_DROP_EVENT);
313             protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_BUCKET_DROP_REASON, dropEvent.reason);
314             protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DROP_TIME, (long long) (NanoToMillis(dropEvent.dropTimeNs)));
315             protoOutput->end(dropEventToken);
316         }
317         protoOutput->end(wrapperToken);
318     }
319 
320     for (const auto& pair : mPastBuckets) {
321         const MetricDimensionKey& dimensionKey = pair.first;
322 
323         VLOG("Gauge dimension key %s", dimensionKey.toString().c_str());
324         uint64_t wrapperToken =
325                 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
326 
327         // First fill dimension.
328         if (mShouldUseNestedDimensions) {
329             uint64_t dimensionToken = protoOutput->start(
330                     FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
331             writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), mUidFields, str_set,
332                                   usedUids, protoOutput);
333             protoOutput->end(dimensionToken);
334         } else {
335             writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInWhat(),
336                                            FIELD_ID_DIMENSION_LEAF_IN_WHAT, mUidFields, str_set,
337                                            usedUids, protoOutput);
338         }
339 
340         // Then fill bucket_info (GaugeBucketInfo).
341         for (const auto& bucket : pair.second) {
342             uint64_t bucketInfoToken = protoOutput->start(
343                     FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
344 
345             if (bucket.mBucketEndNs - bucket.mBucketStartNs != mBucketSizeNs) {
346                 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
347                                    (long long)NanoToMillis(bucket.mBucketStartNs));
348                 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
349                                    (long long)NanoToMillis(bucket.mBucketEndNs));
350             } else {
351                 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM,
352                                    (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
353             }
354 
355             if (!bucket.mAggregatedAtoms.empty()) {
356                 for (const auto& [atomDimensionKey, elapsedTimestampsNs] :
357                      bucket.mAggregatedAtoms) {
358                     uint64_t aggregatedAtomToken = protoOutput->start(
359                             FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_AGGREGATED_ATOM);
360                     uint64_t atomToken =
361                             protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_ATOM_VALUE);
362                     writeFieldValueTreeToStream(mAtomId,
363                                                 atomDimensionKey.getAtomFieldValues().getValues(),
364                                                 mUidFields, usedUids, protoOutput);
365                     protoOutput->end(atomToken);
366                     for (int64_t timestampNs : elapsedTimestampsNs) {
367                         protoOutput->write(
368                                 FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED | FIELD_ID_ATOM_TIMESTAMPS,
369                                 (long long)timestampNs);
370                     }
371                     protoOutput->end(aggregatedAtomToken);
372                 }
373             }
374 
375             protoOutput->end(bucketInfoToken);
376             VLOG("Gauge \t bucket [%lld - %lld] includes %d atoms.",
377                  (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs,
378                  (int)bucket.mAggregatedAtoms.size());
379         }
380         protoOutput->end(wrapperToken);
381     }
382     protoOutput->end(protoToken);
383 
384 
385     if (erase_data) {
386         mPastBuckets.clear();
387         mSkippedBuckets.clear();
388         mDimensionGuardrailHit = false;
389         resetDataCorruptionFlagsLocked();
390         mTotalDataSize = 0;
391     }
392 }
393 
prepareFirstBucketLocked()394 void GaugeMetricProducer::prepareFirstBucketLocked() {
395     if (mCondition == ConditionState::kTrue && mIsActive && mIsPulled && isRandomNSamples()) {
396         pullAndMatchEventsLocked(mCurrentBucketStartTimeNs);
397     }
398 }
399 
400 // Only call if mCondition == ConditionState::kTrue && metric is active.
pullAndMatchEventsLocked(const int64_t timestampNs)401 void GaugeMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
402     bool triggerPuller = false;
403     switch(mSamplingType) {
404         // When the metric wants to do random sampling and there is already one gauge atom for the
405         // current bucket, do not do it again.
406         case GaugeMetric::RANDOM_ONE_SAMPLE: {
407             triggerPuller = mCurrentSlicedBucket->empty();
408             break;
409         }
410         case GaugeMetric::CONDITION_CHANGE_TO_TRUE:
411         case GaugeMetric::FIRST_N_SAMPLES: {
412             triggerPuller = true;
413             break;
414         }
415         default:
416             break;
417     }
418     if (!triggerPuller || !shouldKeepRandomSample(mPullProbability)) {
419         return;
420     }
421     vector<std::shared_ptr<LogEvent>> allData;
422     if (!mPullerManager->Pull(mPullTagId, mConfigKey, timestampNs, &allData)) {
423         ALOGE("Gauge Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
424         return;
425     }
426     const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs;
427     StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
428     if (pullDelayNs > mMaxPullDelayNs) {
429         ALOGE("Pull finish too late for atom %d", mPullTagId);
430         StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
431         return;
432     }
433     for (const auto& data : allData) {
434         const auto [matchResult, transformedEvent] =
435                 mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex);
436         if (matchResult == MatchingState::kMatched) {
437             LogEvent localCopy = transformedEvent == nullptr ? *data : *transformedEvent;
438             localCopy.setElapsedTimestampNs(timestampNs);
439             onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
440         }
441     }
442 }
443 
onActiveStateChangedLocked(const int64_t eventTimeNs,const bool isActive)444 void GaugeMetricProducer::onActiveStateChangedLocked(const int64_t eventTimeNs,
445                                                      const bool isActive) {
446     MetricProducer::onActiveStateChangedLocked(eventTimeNs, isActive);
447 
448     if (ConditionState::kTrue != mCondition) {
449         return;
450     }
451 
452     if (isActive && mIsPulled && isRandomNSamples()) {
453         pullAndMatchEventsLocked(eventTimeNs);
454     }
455 }
456 
onConditionChangedLocked(const bool conditionMet,const int64_t eventTimeNs)457 void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet,
458                                                    const int64_t eventTimeNs) {
459     VLOG("GaugeMetric %lld onConditionChanged", (long long)mMetricId);
460 
461     mCondition = conditionMet ? ConditionState::kTrue : ConditionState::kFalse;
462     if (!mIsActive) {
463         return;
464     }
465 
466     flushIfNeededLocked(eventTimeNs);
467     if (conditionMet && mIsPulled &&
468         (isRandomNSamples() || mSamplingType == GaugeMetric::CONDITION_CHANGE_TO_TRUE)) {
469         pullAndMatchEventsLocked(eventTimeNs);
470     }  // else: Push mode. No need to proactively pull the gauge data.
471 }
472 
onSlicedConditionMayChangeLocked(bool overallCondition,const int64_t eventTimeNs)473 void GaugeMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition,
474                                                            const int64_t eventTimeNs) {
475     VLOG("GaugeMetric %lld onSlicedConditionMayChange overall condition %d", (long long)mMetricId,
476          overallCondition);
477     mCondition = overallCondition ? ConditionState::kTrue : ConditionState::kFalse;
478     if (!mIsActive) {
479         return;
480     }
481 
482     flushIfNeededLocked(eventTimeNs);
483     // If the condition is sliced, mCondition is true if any of the dimensions is true. And we will
484     // pull for every dimension.
485     if (overallCondition && mIsPulled && mTriggerAtomId == -1) {
486         pullAndMatchEventsLocked(eventTimeNs);
487     }  // else: Push mode. No need to proactively pull the gauge data.
488 }
489 
getGaugeFields(const LogEvent & event)490 std::shared_ptr<vector<FieldValue>> GaugeMetricProducer::getGaugeFields(const LogEvent& event) {
491     std::shared_ptr<vector<FieldValue>> gaugeFields;
492     if (mFieldMatchers.size() > 0) {
493         gaugeFields = std::make_shared<vector<FieldValue>>();
494         filterGaugeValues(mFieldMatchers, event.getValues(), gaugeFields.get());
495     } else {
496         gaugeFields = std::make_shared<vector<FieldValue>>(event.getValues());
497     }
498     // Trim all dimension fields from output. Dimensions will appear in output report and will
499     // benefit from dictionary encoding. For large pulled atoms, this can give the benefit of
500     // optional repeated field.
501     for (const auto& field : mDimensionsInWhat) {
502         for (auto it = gaugeFields->begin(); it != gaugeFields->end();) {
503             if (it->mField.matches(field)) {
504                 it = gaugeFields->erase(it);
505             } else {
506                 it++;
507             }
508         }
509     }
510     return gaugeFields;
511 }
512 
onDataPulled(const std::vector<std::shared_ptr<LogEvent>> & allData,PullResult pullResult,int64_t originalPullTimeNs)513 void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
514                                        PullResult pullResult, int64_t originalPullTimeNs) {
515     std::lock_guard<std::mutex> lock(mMutex);
516     if (pullResult != PullResult::PULL_RESULT_SUCCESS || allData.size() == 0) {
517         return;
518     }
519     const int64_t pullDelayNs = getElapsedRealtimeNs() - originalPullTimeNs;
520     StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
521     if (pullDelayNs > mMaxPullDelayNs) {
522         ALOGE("Pull finish too late for atom %d", mPullTagId);
523         StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
524         return;
525     }
526     for (const auto& data : allData) {
527         const auto [matchResult, transformedEvent] =
528                 mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex);
529         if (matchResult == MatchingState::kMatched) {
530             onMatchedLogEventLocked(mWhatMatcherIndex,
531                                     transformedEvent == nullptr ? *data : *transformedEvent);
532         }
533     }
534 }
535 
hitGuardRailLocked(const MetricDimensionKey & newKey)536 bool GaugeMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
537     if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) {
538         return false;
539     }
540     // 1. Report the tuple count if the tuple count > soft limit
541     if (mCurrentSlicedBucket->size() >= mDimensionSoftLimit) {
542         size_t newTupleCount = mCurrentSlicedBucket->size() + 1;
543         StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
544         // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
545         if (newTupleCount > mDimensionHardLimit) {
546             if (!mHasHitGuardrail) {
547                 ALOGE("GaugeMetric %lld dropping data for dimension key %s", (long long)mMetricId,
548                       newKey.toString().c_str());
549                 mHasHitGuardrail = true;
550             }
551             mDimensionGuardrailHit = true;
552             StatsdStats::getInstance().noteHardDimensionLimitReached(mMetricId);
553             return true;
554         }
555     }
556 
557     return false;
558 }
559 
onMatchedLogEventInternalLocked(const size_t matcherIndex,const MetricDimensionKey & eventKey,const ConditionKey & conditionKey,bool condition,const LogEvent & event,const map<int,HashableDimensionKey> &)560 void GaugeMetricProducer::onMatchedLogEventInternalLocked(
561         const size_t matcherIndex, const MetricDimensionKey& eventKey,
562         const ConditionKey& conditionKey, bool condition, const LogEvent& event,
563         const map<int, HashableDimensionKey>& /*statePrimaryKeys*/) {
564     if (condition == false) {
565         return;
566     }
567 
568     if (mPullTagId == -1 && mSamplingPercentage < 100 &&
569         !shouldKeepRandomSample(mSamplingPercentage)) {
570         return;
571     }
572 
573     int64_t eventTimeNs = event.GetElapsedTimestampNs();
574     if (eventTimeNs < mCurrentBucketStartTimeNs) {
575         VLOG("Gauge Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
576              (long long)mCurrentBucketStartTimeNs);
577         return;
578     }
579     flushIfNeededLocked(eventTimeNs);
580 
581     if (mTriggerAtomId == event.GetTagId()) {
582         // Both Active state and Condition are true here.
583         // Active state being true is checked in onMatchedLogEventLocked.
584         // Condition being true is checked at the start of this method.
585         pullAndMatchEventsLocked(eventTimeNs);
586         return;
587     }
588 
589     // When gauge metric wants to randomly sample the output atom, we just simply use the first
590     // gauge in the given bucket.
591     if (mCurrentSlicedBucket->find(eventKey) != mCurrentSlicedBucket->end() &&
592         mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
593         return;
594     }
595     if (hitGuardRailLocked(eventKey)) {
596         return;
597     }
598     if ((*mCurrentSlicedBucket)[eventKey].size() >= mGaugeAtomsPerDimensionLimit) {
599         return;
600     }
601 
602     const int64_t truncatedElapsedTimestampNs = truncateTimestampIfNecessary(event);
603     GaugeAtom gaugeAtom(getGaugeFields(event), truncatedElapsedTimestampNs);
604     (*mCurrentSlicedBucket)[eventKey].push_back(gaugeAtom);
605     // Anomaly detection on gauge metric only works when there is one numeric
606     // field specified.
607     if (mAnomalyTrackers.size() > 0) {
608         if (gaugeAtom.mFields->size() == 1) {
609             const Value& value = gaugeAtom.mFields->begin()->mValue;
610             long gaugeVal = 0;
611             if (value.getType() == INT) {
612                 gaugeVal = (long)value.int_value;
613             } else if (value.getType() == LONG) {
614                 gaugeVal = value.long_value;
615             }
616             for (auto& tracker : mAnomalyTrackers) {
617                 tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, mMetricId,
618                                                  eventKey, gaugeVal);
619             }
620         }
621     }
622 }
623 
updateCurrentSlicedBucketForAnomaly()624 void GaugeMetricProducer::updateCurrentSlicedBucketForAnomaly() {
625     for (const auto& slice : *mCurrentSlicedBucket) {
626         if (slice.second.empty()) {
627             continue;
628         }
629         const Value& value = slice.second.front().mFields->front().mValue;
630         long gaugeVal = 0;
631         if (value.getType() == INT) {
632             gaugeVal = (long)value.int_value;
633         } else if (value.getType() == LONG) {
634             gaugeVal = value.long_value;
635         }
636         (*mCurrentSlicedBucketForAnomaly)[slice.first] = gaugeVal;
637     }
638 }
639 
dropDataLocked(const int64_t dropTimeNs)640 void GaugeMetricProducer::dropDataLocked(const int64_t dropTimeNs) {
641     flushIfNeededLocked(dropTimeNs);
642     StatsdStats::getInstance().noteBucketDropped(mMetricId);
643     mPastBuckets.clear();
644     resetDataCorruptionFlagsLocked();
645     mTotalDataSize = 0;
646 }
647 
648 // When a new matched event comes in, we check if event falls into the current
649 // bucket. If not, flush the old counter to past buckets and initialize the new
650 // bucket.
651 // if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside
652 // the GaugeMetricProducer while holding the lock.
flushIfNeededLocked(const int64_t eventTimeNs)653 void GaugeMetricProducer::flushIfNeededLocked(const int64_t eventTimeNs) {
654     int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
655 
656     if (eventTimeNs < currentBucketEndTimeNs) {
657         VLOG("Gauge eventTime is %lld, less than next bucket start time %lld",
658              (long long)eventTimeNs, (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs));
659         return;
660     }
661 
662     // Adjusts the bucket start and end times.
663     int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
664     int64_t nextBucketNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
665     flushCurrentBucketLocked(eventTimeNs, nextBucketNs);
666 
667     mCurrentBucketNum += numBucketsForward;
668     VLOG("Gauge metric %lld: new bucket start time: %lld", (long long)mMetricId,
669          (long long)mCurrentBucketStartTimeNs);
670 }
671 
flushCurrentBucketLocked(const int64_t eventTimeNs,const int64_t nextBucketStartTimeNs)672 void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t eventTimeNs,
673                                                    const int64_t nextBucketStartTimeNs) {
674     int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
675     int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs;
676 
677     GaugeBucket info;
678     info.mBucketStartNs = mCurrentBucketStartTimeNs;
679     info.mBucketEndNs = bucketEndTime;
680 
681     // Add bucket to mPastBuckets if bucket is large enough.
682     // Otherwise, drop the bucket data and add bucket metadata to mSkippedBuckets.
683     bool isBucketLargeEnough = info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
684     if (isBucketLargeEnough) {
685         for (const auto& slice : *mCurrentSlicedBucket) {
686             info.mAggregatedAtoms.clear();
687             for (const GaugeAtom& atom : slice.second) {
688                 AtomDimensionKey key(mAtomId, HashableDimensionKey(*atom.mFields));
689                 vector<int64_t>& elapsedTimestampsNs = info.mAggregatedAtoms[key];
690                 elapsedTimestampsNs.push_back(atom.mElapsedTimestampNs);
691             }
692             auto& bucketList = mPastBuckets[slice.first];
693             const bool isFirstBucket = bucketList.empty();
694             bucketList.push_back(info);
695             mTotalDataSize += computeGaugeBucketSizeLocked(eventTimeNs >= fullBucketEndTimeNs,
696                                                            /*dimKey=*/slice.first, isFirstBucket,
697                                                            info.mAggregatedAtoms);
698             VLOG("Gauge gauge metric %lld, dump key value: %s", (long long)mMetricId,
699                  slice.first.toString().c_str());
700         }
701     } else if (mIsActive) {
702         mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs;
703         mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTime;
704         if (!maxDropEventsReached()) {
705             mCurrentSkippedBucket.dropEvents.emplace_back(
706                     buildDropEvent(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL));
707         }
708         mSkippedBuckets.emplace_back(mCurrentSkippedBucket);
709         mTotalDataSize += computeSkippedBucketSizeLocked(mCurrentSkippedBucket);
710     }
711 
712     // If we have anomaly trackers, we need to update the partial bucket values.
713     if (mAnomalyTrackers.size() > 0) {
714         updateCurrentSlicedBucketForAnomaly();
715 
716         if (eventTimeNs > fullBucketEndTimeNs) {
717             // This is known to be a full bucket, so send this data to the anomaly tracker.
718             for (auto& tracker : mAnomalyTrackers) {
719                 tracker->addPastBucket(mCurrentSlicedBucketForAnomaly, mCurrentBucketNum);
720             }
721             mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
722         }
723     }
724 
725     StatsdStats::getInstance().noteBucketCount(mMetricId);
726     mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
727     mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
728     mCurrentSkippedBucket.reset();
729     // Reset mHasHitGuardrail boolean since bucket was reset
730     mHasHitGuardrail = false;
731 }
732 
733 // Estimate for the size of a GaugeBucket.
computeGaugeBucketSizeLocked(const bool isFullBucket,const MetricDimensionKey & dimKey,const bool isFirstBucket,const std::unordered_map<AtomDimensionKey,std::vector<int64_t>> & aggregatedAtoms) const734 size_t GaugeMetricProducer::computeGaugeBucketSizeLocked(
735         const bool isFullBucket, const MetricDimensionKey& dimKey, const bool isFirstBucket,
736         const std::unordered_map<AtomDimensionKey, std::vector<int64_t>>& aggregatedAtoms) const {
737     size_t bucketSize =
738             MetricProducer::computeBucketSizeLocked(isFullBucket, dimKey, isFirstBucket);
739 
740     // Gauge Atoms and timestamps
741     for (const auto& pair : aggregatedAtoms) {
742         bucketSize += getFieldValuesSizeV2(pair.first.getAtomFieldValues().getValues());
743         bucketSize += sizeof(int64_t) * pair.second.size();
744     }
745 
746     return bucketSize;
747 }
748 
byteSizeLocked() const749 size_t GaugeMetricProducer::byteSizeLocked() const {
750     sp<ConfigMetadataProvider> configMetadataProvider = getConfigMetadataProvider();
751     if (configMetadataProvider != nullptr && configMetadataProvider->useV2SoftMemoryCalculation()) {
752         return computeOverheadSizeLocked(!mPastBuckets.empty() || !mSkippedBuckets.empty(),
753                                          mDimensionGuardrailHit) +
754                mTotalDataSize;
755     }
756     size_t totalSize = 0;
757     for (const auto& pair : mPastBuckets) {
758         for (const auto& bucket : pair.second) {
759             for (const auto& [atomDimensionKey, elapsedTimestampsNs] : bucket.mAggregatedAtoms) {
760                 totalSize += sizeof(FieldValue) *
761                              atomDimensionKey.getAtomFieldValues().getValues().size();
762                 totalSize += sizeof(int64_t) * elapsedTimestampsNs.size();
763             }
764         }
765     }
766     return totalSize;
767 }
768 
determineCorruptionSeverity(int32_t atomId,DataCorruptedReason reason,LostAtomType atomType) const769 MetricProducer::DataCorruptionSeverity GaugeMetricProducer::determineCorruptionSeverity(
770         int32_t atomId, DataCorruptedReason reason, LostAtomType atomType) const {
771     switch (atomType) {
772         case LostAtomType::kWhat:
773             return DataCorruptionSeverity::kResetOnDump;
774         case LostAtomType::kCondition:
775             return DataCorruptionSeverity::kUnrecoverable;
776         case LostAtomType::kState:
777             break;
778     };
779     return DataCorruptionSeverity::kNone;
780 };
781 
782 }  // namespace statsd
783 }  // namespace os
784 }  // namespace android
785