/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define STATSD_DEBUG false  // STOPSHIP if true
#include "Log.h"

#include "NumericValueMetricProducer.h"

#include <stdlib.h>

#include <algorithm>

#include "FieldValue.h"
#include "guardrail/StatsdStats.h"
#include "metrics/HistogramValue.h"
#include "metrics/NumericValue.h"
#include "metrics/parsing_utils/metrics_manager_util.h"
#include "stats_log_util.h"

using android::util::FIELD_COUNT_REPEATED;
using android::util::FIELD_TYPE_BOOL;
using android::util::FIELD_TYPE_DOUBLE;
using android::util::FIELD_TYPE_INT32;
using android::util::FIELD_TYPE_INT64;
using android::util::FIELD_TYPE_MESSAGE;
using android::util::FIELD_TYPE_STRING;
using android::util::ProtoOutputStream;
using std::lock_guard;
using std::max;
using std::min;
using std::mutex;
using std::nullopt;
using std::optional;
using std::pair;
using std::shared_ptr;
using std::string;
using std::unordered_map;
using std::vector;

namespace android {
namespace os {
namespace statsd {

namespace {  // anonymous namespace
// for StatsLogReport
const int FIELD_ID_VALUE_METRICS = 7;
// for ValueBucketInfo
const int FIELD_ID_VALUE_INDEX = 1;
const int FIELD_ID_VALUE_LONG = 2;
const int FIELD_ID_VALUE_DOUBLE = 3;
const int FIELD_ID_VALUE_SAMPLESIZE = 4;
const int FIELD_ID_VALUE_HISTOGRAM = 5;
const int FIELD_ID_VALUES = 9;
const int FIELD_ID_BUCKET_NUM = 4;
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
const int FIELD_ID_CONDITION_TRUE_NS = 10;
const int FIELD_ID_CONDITION_CORRECTION_NS = 11;

const NumericValue ZERO_LONG((int64_t)0);
const NumericValue ZERO_DOUBLE((double)0);

double toDouble(const NumericValue& value) {
    return value.is<int64_t>() ? value.getValue<int64_t>() : value.getValueOrDefault<double>(0);
}

}  // anonymous namespace

// ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
NumericValueMetricProducer::NumericValueMetricProducer(
        const ConfigKey& key, const ValueMetric& metric, const uint64_t protoHash,
        const PullOptions& pullOptions, const BucketOptions& bucketOptions,
        const WhatOptions& whatOptions, const ConditionOptions& conditionOptions,
        const StateOptions& stateOptions, const ActivationOptions& activationOptions,
        const GuardrailOptions& guardrailOptions,
        const wp<ConfigMetadataProvider> configMetadataProvider)
    : ValueMetricProducer(metric.id(), key, protoHash, pullOptions, bucketOptions, whatOptions,
                          conditionOptions, stateOptions, activationOptions, guardrailOptions,
                          configMetadataProvider),
      mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()),
      mAggregationTypes(whatOptions.aggregationTypes),
      mIncludeSampleSize(metric.has_include_sample_size()
                                 ? metric.include_sample_size()
                                 : hasAvgAggregationType(whatOptions.aggregationTypes)),
      mUseDiff(metric.has_use_diff() ? metric.use_diff() : isPulled()),
      mValueDirection(metric.value_direction()),
      mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
      mUseZeroDefaultBase(metric.use_zero_default_base()),
      mHasGlobalBase(false),
      mMaxPullDelayNs(metric.has_max_pull_delay_sec() ? metric.max_pull_delay_sec() * NS_PER_SEC
                                                      : StatsdStats::kPullMaxDelayNs),
      mDedupedFieldMatchers(dedupFieldMatchers(whatOptions.fieldMatchers)),
      mBinStartsList(whatOptions.binStartsList) {
    // TODO(b/186677791): Use initializer list to initialize mUploadThreshold.
    if (metric.has_threshold()) {
        mUploadThreshold = metric.threshold();
    }
}

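// These drop reasons invalidate the stored diff bases: after any of them, the per-key bases may
// no longer line up with the pulled data stream, so diffing against them could produce bogus
// values. Resetting forces the next successful pull to re-seed the bases.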
void NumericValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs,
                                                         const BucketDropReason reason) {
    ValueMetricProducer::invalidateCurrentBucket(dropTimeNs, reason);

    switch (reason) {
        case BucketDropReason::DUMP_REPORT_REQUESTED:
        case BucketDropReason::EVENT_IN_WRONG_BUCKET:
        case BucketDropReason::CONDITION_UNKNOWN:
        case BucketDropReason::PULL_FAILED:
        case BucketDropReason::PULL_DELAYED:
        case BucketDropReason::DIMENSION_GUARDRAIL_REACHED:
            resetBase();
            break;
        default:
            break;
    }
}

void NumericValueMetricProducer::resetBase() {
    for (auto& [_, dimInfo] : mDimInfos) {
        for (NumericValue& base : dimInfo.dimExtras) {
            base.reset();
        }
    }
    mHasGlobalBase = false;
}

void NumericValueMetricProducer::writePastBucketAggregateToProto(
        const int aggIndex, const NumericValue& value, const int sampleSize,
        ProtoOutputStream* const protoOutput) const {
    uint64_t valueToken =
            protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES);
    protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX, aggIndex);
    if (mIncludeSampleSize) {
        protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_SAMPLESIZE, sampleSize);
    }
    if (value.is<int64_t>()) {
        const int64_t val = value.getValue<int64_t>();
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG, (long long)val);
        VLOG("\t\t value %d: %lld", aggIndex, (long long)val);
    } else if (value.is<double>()) {
        const double val = value.getValue<double>();
        protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, val);
        VLOG("\t\t value %d: %.2f", aggIndex, val);
    } else if (value.is<HistogramValue>()) {
        const HistogramValue& val = value.getValue<HistogramValue>();
        const uint64_t histToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_HISTOGRAM);
        val.toProto(*protoOutput);
        protoOutput->end(histToken);
        VLOG("\t\t value %d: %s", aggIndex, val.toString().c_str());
    } else {
        VLOG("Wrong value type for ValueMetric output");
    }
    protoOutput->end(valueToken);
}

void NumericValueMetricProducer::onActiveStateChangedInternalLocked(const int64_t eventTimeNs,
                                                                    const bool isActive) {
    // When the active state changes from true to false for a pulled metric, clear the diff base
    // but don't reset other counters, as we may accumulate more value in the bucket.
    if (mUseDiff && !isActive) {
        resetBase();
    }
}

// Only called when mIsActive and the event is NOT too late.
void NumericValueMetricProducer::onConditionChangedInternalLocked(const ConditionState oldCondition,
                                                                  const ConditionState newCondition,
                                                                  const int64_t eventTimeNs) {
    // For metrics that use diff, when the condition changes from true to false,
    // clear the diff base but don't reset other counts because we may accumulate
    // more value in the bucket.
    if (mUseDiff &&
        (oldCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse)) {
        resetBase();
    }
}

void NumericValueMetricProducer::prepareFirstBucketLocked() {
    // Kick off the puller immediately if the metric is diff-based and the condition is true.
    if (mIsActive && isPulled() && mCondition == ConditionState::kTrue && mUseDiff) {
        pullAndMatchEventsLocked(mCurrentBucketStartTimeNs);
    }
}

void NumericValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
    vector<shared_ptr<LogEvent>> allData;
    if (!mPullerManager->Pull(mPullAtomId, mConfigKey, timestampNs, &allData)) {
        ALOGE("Stats puller failed for tag: %d at %lld", mPullAtomId, (long long)timestampNs);
        invalidateCurrentBucket(timestampNs, BucketDropReason::PULL_FAILED);
        return;
    }

    accumulateEvents(allData, timestampNs, timestampNs);
}

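// Worked example: with a 10 min bucket size, a delayed pull arriving at mTimeBaseNs + 23 min
// maps to mTimeBaseNs + 20 min, i.e. the boundary of the bucket that most recently ended.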
int64_t NumericValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
    return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
}

// By design, statsd pulls data at bucket boundaries using AlarmManager. These pulls are likely
// to be delayed. Other events like condition changes or app upgrade which are not based on
// AlarmManager might have arrived earlier and closed the bucket.
void NumericValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                              PullResult pullResult, int64_t originalPullTimeNs) {
    lock_guard<mutex> lock(mMutex);
    if (mCondition == ConditionState::kTrue) {
        // If the pull failed, we won't be able to compute a diff.
        if (pullResult == PullResult::PULL_RESULT_FAIL) {
            invalidateCurrentBucket(originalPullTimeNs, BucketDropReason::PULL_FAILED);
        } else if (pullResult == PullResult::PULL_RESULT_SUCCESS) {
            bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs();
            if (isEventLate) {
                // If the event is late, we are in the middle of a bucket. Just
                // process the data without trying to snap the data to the nearest bucket.
                accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs);
            } else {
                // For scheduled pulled data, the effective event time is snapped to the nearest
                // bucket end. In the case of waking up from a deep sleep state, we will
                // attribute the data to the previous bucket end. If the sleep was long but not
                // very long, we will be in the immediate next bucket. The previous bucket may
                // get a larger number as we pull at a later time than the real bucket end.
                //
                // If the sleep was very long, we skip more than one bucket during the sleep. In
                // this case, the diff base will be cleared and this new data will serve as the
                // new diff base.
                int64_t bucketEndTimeNs = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
                StatsdStats::getInstance().noteBucketBoundaryDelayNs(
                        mMetricId, originalPullTimeNs - bucketEndTimeNs);
                accumulateEvents(allData, originalPullTimeNs, bucketEndTimeNs);
            }
        }
    }

    // We can probably flush the bucket. Since we used bucketEndTimeNs when calling
    // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
    flushIfNeededLocked(originalPullTimeNs);
}

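// Illustration (hypothetical atom sliced by uid): two pulled events for the same uid with value
// fields 5 and 7 are combined into one aggregate event carrying 12 before any diffing happens.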
void NumericValueMetricProducer::combineValueFields(pair<LogEvent, vector<int>>& eventValues,
                                                    const LogEvent& newEvent,
                                                    const vector<int>& newValueIndices) const {
    if (eventValues.second.size() != newValueIndices.size()) {
        ALOGE("NumericValueMetricProducer value indices sizes don't match");
        return;
    }
    vector<FieldValue>* const aggregateFieldValues = eventValues.first.getMutableValues();
    const vector<FieldValue>& newFieldValues = newEvent.getValues();
    for (size_t i = 0; i < eventValues.second.size(); ++i) {
        if (newValueIndices[i] != -1 && eventValues.second[i] != -1) {
            (*aggregateFieldValues)[eventValues.second[i]].mValue +=
                    newFieldValues[newValueIndices[i]].mValue;
        }
    }
}

// Process events retrieved from a pull.
void NumericValueMetricProducer::accumulateEvents(const vector<shared_ptr<LogEvent>>& allData,
                                                  int64_t originalPullTimeNs,
                                                  int64_t eventElapsedTimeNs) {
    if (isEventLateLocked(eventElapsedTimeNs)) {
        VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
             (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        return;
    }

    const int64_t elapsedRealtimeNs = getElapsedRealtimeNs();
    const int64_t pullDelayNs = elapsedRealtimeNs - originalPullTimeNs;
    StatsdStats::getInstance().notePullDelay(mPullAtomId, pullDelayNs);
    if (pullDelayNs > mMaxPullDelayNs) {
        ALOGE("Pull finished too late for atom %d, longer than %lld", mPullAtomId,
              (long long)mMaxPullDelayNs);
        StatsdStats::getInstance().notePullExceedMaxDelay(mPullAtomId);
        // We are missing one pull from the bucket, which means we will not have a complete view
        // of what's going on.
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::PULL_DELAYED);
        return;
    }

    mMatchedMetricDimensionKeys.clear();
    if (mUseDiff) {
        // An extra aggregation step is needed to sum values with matching dimensions
        // before calculating the diff between sums of consecutive pulls.
        std::unordered_map<HashableDimensionKey, pair<LogEvent, vector<int>>> aggregateEvents;
        for (const auto& data : allData) {
            const auto [matchResult, transformedEvent] =
                    mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex);
            if (matchResult != MatchingState::kMatched) {
                continue;
            }

            // Get dimensions_in_what key and value indices.
            HashableDimensionKey dimensionsInWhat;
            vector<int> valueIndices(mDedupedFieldMatchers.size(), -1);
            const LogEvent& eventRef = transformedEvent == nullptr ? *data : *transformedEvent;
            if (!filterValues(mDimensionsInWhat, mDedupedFieldMatchers, eventRef.getValues(),
                              dimensionsInWhat, valueIndices)) {
                StatsdStats::getInstance().noteBadValueType(mMetricId);
            }

            // Store new event in map or combine values in existing event.
            auto it = aggregateEvents.find(dimensionsInWhat);
            if (it == aggregateEvents.end()) {
                aggregateEvents.emplace(std::piecewise_construct,
                                        std::forward_as_tuple(dimensionsInWhat),
                                        std::forward_as_tuple(eventRef, valueIndices));
            } else {
                combineValueFields(it->second, eventRef, valueIndices);
            }
        }

        for (auto& [dimKey, eventInfo] : aggregateEvents) {
            eventInfo.first.setElapsedTimestampNs(eventElapsedTimeNs);
            onMatchedLogEventLocked(mWhatMatcherIndex, eventInfo.first);
        }
    } else {
        for (const auto& data : allData) {
            const auto [matchResult, transformedEvent] =
                    mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex);
            if (matchResult == MatchingState::kMatched) {
                LogEvent localCopy = transformedEvent == nullptr ? *data : *transformedEvent;
                localCopy.setElapsedTimestampNs(eventElapsedTimeNs);
                onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
            }
        }
    }

    // If a key that is:
    // 1. Tracked in mCurrentSlicedBucket and
    // 2. A superset of the current mStateChangePrimaryKey
    // was not found in the new pulled data (i.e. not in mMatchedMetricDimensionKeys),
    // then we clear the data from mDimInfos to reset the base and current state key.
    for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) {
        const auto& whatKey = metricDimensionKey.getDimensionKeyInWhat();
        bool presentInPulledData =
                mMatchedMetricDimensionKeys.find(whatKey) != mMatchedMetricDimensionKeys.end();
        if (!presentInPulledData &&
            containsLinkedStateValues(whatKey, mStateChangePrimaryKey.second, mMetric2StateLinks,
                                      mStateChangePrimaryKey.first)) {
            auto it = mDimInfos.find(whatKey);
            if (it != mDimInfos.end()) {
                mDimInfos.erase(it);
            }
            // Turn OFF condition timer for keys not present in pulled data.
            currentValueBucket.conditionTimer.onConditionChanged(false, eventElapsedTimeNs);
        }
    }
    mMatchedMetricDimensionKeys.clear();
    mHasGlobalBase = true;

    // If we reach the guardrail, we might have dropped some data, which means the bucket is
    // incomplete.
    //
    // The base also needs to be reset. If we do not have the full data, we might
    // incorrectly compute the diff when mUseZeroDefaultBase is true since an existing key
    // might be missing from mCurrentSlicedBucket.
    if (hasReachedGuardRailLimit()) {
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::DIMENSION_GUARDRAIL_REACHED);
        mCurrentSlicedBucket.clear();
    }
}

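// Illustration with hypothetical limits (soft = 500, hard = 800): already-tracked keys are
// always accepted; once at least 500 keys are tracked, new keys are still accepted up to the
// 800-key hard limit, beyond which they are dropped (logged once via mHasHitGuardrail).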
bool NumericValueMetricProducer::hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey) {
    // ===========GuardRail==============
    // 1. Report the tuple count if the tuple count > soft limit
    if (mCurrentFullBucket.find(newKey) != mCurrentFullBucket.end()) {
        return false;
    }
    if (mCurrentFullBucket.size() > mDimensionSoftLimit - 1) {
        size_t newTupleCount = mCurrentFullBucket.size() + 1;
        // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
        if (newTupleCount > mDimensionHardLimit) {
            if (!mHasHitGuardrail) {
                ALOGE("ValueMetric %lld dropping data for full bucket dimension key %s",
                      (long long)mMetricId, newKey.toString().c_str());
                mHasHitGuardrail = true;
            }
            return true;
        }
    }

    return false;
}

namespace {
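// Two input shapes are handled below: a value field matched with a position-ALL matcher carries
// client-aggregated histogram bins (e.g. repeated ints [2, 0, 5] become a 3-bin HistogramValue),
// while any other matched field is read as a single scalar NumericValue.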
NumericValue getAggregationInputValue(const LogEvent& event, const Matcher& matcher) {
    if (matcher.hasAllPositionMatcher()) {  // client-aggregated histogram
        vector<int> binCounts;
        for (const FieldValue& value : event.getValues()) {
            if (!value.mField.matches(matcher)) {
                continue;
            }
            if (value.mValue.getType() == INT) {
                binCounts.push_back(value.mValue.int_value);
            } else {
                return NumericValue{};
            }
        }
        return NumericValue(HistogramValue(binCounts));
    }

    for (const FieldValue& value : event.getValues()) {
        if (!value.mField.matches(matcher)) {
            continue;
        }
        switch (value.mValue.type) {
            case INT:
                return NumericValue((int64_t)value.mValue.int_value);
            case LONG:
                return NumericValue((int64_t)value.mValue.long_value);
            case FLOAT:
                return NumericValue((double)value.mValue.float_value);
            case DOUBLE:
                return NumericValue((double)value.mValue.double_value);
            default:
                return NumericValue{};
        }
    }
    return NumericValue{};
}

void addValueToHistogram(const NumericValue& value, const optional<const BinStarts>& binStarts,
                         HistogramValue& histValue) {
    if (binStarts == nullopt) {
        ALOGE("Missing bin configuration!");
        return;
    }
    histValue.addValue(static_cast<float>(toDouble(value)), *binStarts);
}

}  // anonymous namespace

bool NumericValueMetricProducer::aggregateFields(const int64_t eventTimeNs,
                                                 const MetricDimensionKey& eventKey,
                                                 const LogEvent& event, vector<Interval>& intervals,
                                                 Bases& bases) {
    if (bases.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of bases to %zu", mFieldMatchers.size());
        bases.resize(mFieldMatchers.size());
    }

    // We only use anomaly detection under certain cases.
    // N.B.: The anomaly detection cases were modified in order to fix an issue with value metrics
    // containing multiple values. We tried to retain all previous behaviour, but we are unsure the
    // previous behaviour was correct. At the time of the fix, anomaly detection had no owner.
    // Whoever next works on it should look into the cases where it is triggered in this function.
    // Discussion here: http://ag/6124370.
    bool useAnomalyDetection = true;
    bool seenNewData = false;
    for (size_t i = 0; i < mFieldMatchers.size(); i++) {
        const Matcher& matcher = mFieldMatchers[i];
        Interval& interval = intervals[i];
        interval.aggIndex = i;
        NumericValue& base = bases[i];
        NumericValue value = getAggregationInputValue(event, matcher);
        if (!value.hasValue()) {
            VLOG("Failed to get value %zu from event %s", i, event.ToString().c_str());
            StatsdStats::getInstance().noteBadValueType(mMetricId);
            return seenNewData;
        }

        if (value.is<HistogramValue>() && !value.getValue<HistogramValue>().isValid()) {
            ALOGE("Invalid histogram at %zu from event %s", i, event.ToString().c_str());
            StatsdStats::getInstance().noteBadValueType(mMetricId);
            if (mUseDiff) {
                base.reset();
            }
            continue;
        }

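        // Diff walkthrough with illustrative numbers (direction INCREASING): consecutive pulls
        // of 100, 250, 240 yield a diff of 150 on the second pull; on the third, 240 < 250, so
        // the base is reset (or the raw 240 is used when use_absolute_value_on_reset is set).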
        if (mUseDiff) {
            if (!base.hasValue()) {
                if (mHasGlobalBase && mUseZeroDefaultBase) {
                    // The bucket has a global base. This key does not.
                    // Optionally use zero as the base.
                    if (value.is<int64_t>()) {
                        base = ZERO_LONG;
                    } else if (value.is<double>()) {
                        base = ZERO_DOUBLE;
                    } else if (value.is<HistogramValue>()) {
                        base = HistogramValue();
                    }
                } else {
                    // No base. Just update the base and return.
                    base = value;

                    // If we're missing a base, do not use anomaly detection on incomplete data.
                    useAnomalyDetection = false;

                    seenNewData = true;
                    // Continue (instead of return) here in order to set the base value for the
                    // other bases.
                    continue;
                }
            }
            NumericValue diff{};
            if (value.is<HistogramValue>()) {
                diff = value - base;
                seenNewData = true;
                base = value;
                if (diff == HistogramValue::ERROR_BINS_MISMATCH) {
                    ALOGE("Value %zu from event %s does not have enough bins", i,
                          event.ToString().c_str());
                    StatsdStats::getInstance().noteBadValueType(mMetricId);
                    continue;
                }
                if (diff == HistogramValue::ERROR_BIN_COUNT_TOO_HIGH) {
                    ALOGE("Value %zu from event %s has a decreasing bin count", i,
                          event.ToString().c_str());
                    StatsdStats::getInstance().noteBadValueType(mMetricId);
                    continue;
                }
            } else {
                seenNewData = true;
                switch (mValueDirection) {
                    case ValueMetric::INCREASING:
                        if (value >= base) {
                            diff = value - base;
                        } else if (mUseAbsoluteValueOnReset) {
                            diff = value;
                        } else {
                            VLOG("Unexpected decreasing value");
                            StatsdStats::getInstance().notePullDataError(mPullAtomId);
                            base = value;
                            // If we've got bad data, do not use anomaly detection.
                            useAnomalyDetection = false;
                            continue;
                        }
                        break;
                    case ValueMetric::DECREASING:
                        if (base >= value) {
                            diff = base - value;
                        } else if (mUseAbsoluteValueOnReset) {
                            diff = value;
                        } else {
                            VLOG("Unexpected increasing value");
                            StatsdStats::getInstance().notePullDataError(mPullAtomId);
                            base = value;
                            // If we've got bad data, do not use anomaly detection.
                            useAnomalyDetection = false;
                            continue;
                        }
                        break;
                    case ValueMetric::ANY:
                        diff = value - base;
                        break;
                    default:
                        break;
                }
                base = value;
            }
            value = diff;
        }

        const ValueMetric::AggregationType aggType = getAggregationTypeLocked(i);
        if (interval.hasValue()) {
            switch (aggType) {
                case ValueMetric::SUM:
                    // For AVG, we add up and take the average when flushing the bucket.
                case ValueMetric::AVG:
                    interval.aggregate += value;
                    break;
                case ValueMetric::MIN:
                    interval.aggregate = min(value, interval.aggregate);
                    break;
                case ValueMetric::MAX:
                    interval.aggregate = max(value, interval.aggregate);
                    break;
                case ValueMetric::HISTOGRAM:
                    if (value.is<HistogramValue>()) {
                        // Client-aggregated histogram: add the corresponding bin counts.
                        NumericValue sum = interval.aggregate + value;
                        if (sum == HistogramValue::ERROR_BINS_MISMATCH) {
                            ALOGE("Value %zu from event %s has too many bins", i,
                                  event.ToString().c_str());
                            StatsdStats::getInstance().noteBadValueType(mMetricId);
                            continue;
                        }
                        interval.aggregate = sum;
                    } else {
                        // statsd-aggregated histogram: add the raw value to the histogram.
                        addValueToHistogram(value, getBinStarts(i),
                                            interval.aggregate.getValue<HistogramValue>());
                    }
                    break;
                default:
                    break;
            }
        } else if (aggType == ValueMetric::HISTOGRAM && !value.is<HistogramValue>()) {
            // statsd-aggregated histogram: add the raw value to the histogram.
            interval.aggregate = HistogramValue();
            addValueToHistogram(value, getBinStarts(i),
                                interval.aggregate.getValue<HistogramValue>());
        } else {
            interval.aggregate = value;
        }
        seenNewData = true;
        interval.sampleSize += 1;
    }

    // Only trigger the tracker if all intervals are correct and we have not skipped the bucket
    // due to MULTIPLE_BUCKETS_SKIPPED.
    if (useAnomalyDetection && !multipleBucketsSkipped(calcBucketsForwardCount(eventTimeNs))) {
        // TODO: propagate proper values downstream when anomaly detection supports doubles.
        int64_t wholeBucketVal = intervals[0].aggregate.getValueOrDefault<int64_t>(0);
        auto prev = mCurrentFullBucket.find(eventKey);
        if (prev != mCurrentFullBucket.end()) {
            wholeBucketVal += prev->second;
        }
        for (auto& tracker : mAnomalyTrackers) {
            tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, mMetricId, eventKey,
                                             wholeBucketVal);
        }
    }
    return seenNewData;
}

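// Gatekeeper illustration with a hypothetical threshold of gt_int: 100: if the first interval's
// aggregate is 90, every interval in the bucket is discarded; if it is 150, all intervals are
// reported regardless of their own values.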
PastBucket<NumericValue> NumericValueMetricProducer::buildPartialBucket(
        int64_t bucketEndTimeNs, vector<Interval>& intervals) {
    PastBucket<NumericValue> bucket;
    bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
    bucket.mBucketEndNs = bucketEndTimeNs;

    // The first value field acts as a "gatekeeper": if it does not pass the specified threshold,
    // then all interval values are discarded for this bucket.
    if (intervals.empty() || (intervals[0].hasValue() && !valuePassesThreshold(intervals[0]))) {
        return bucket;
    }

    for (const Interval& interval : intervals) {
        // Skip the output if the diff is zero.
        if (!interval.hasValue() ||
            (mSkipZeroDiffOutput && mUseDiff && interval.aggregate.isZero())) {
            continue;
        }

        bucket.aggIndex.push_back(interval.aggIndex);
        bucket.aggregates.push_back(getFinalValue(interval));
        if (mIncludeSampleSize) {
            bucket.sampleSizes.push_back(interval.sampleSize);
        }
    }
    return bucket;
}

// Also invalidates the current bucket if multiple buckets have been skipped.
void NumericValueMetricProducer::closeCurrentBucket(const int64_t eventTimeNs,
                                                    const int64_t nextBucketStartTimeNs) {
    ValueMetricProducer::closeCurrentBucket(eventTimeNs, nextBucketStartTimeNs);
    if (mAnomalyTrackers.size() > 0) {
        appendToFullBucket(eventTimeNs > getCurrentBucketEndTimeNs());
    }
}

void NumericValueMetricProducer::initNextSlicedBucket(int64_t nextBucketStartTimeNs) {
    ValueMetricProducer::initNextSlicedBucket(nextBucketStartTimeNs);

    // If we do not have a global base when the condition is true,
    // we will have an incomplete bucket for the next bucket.
    if (mUseDiff && !mHasGlobalBase && mCondition) {
        // TODO(b/188878815): mCurrentBucketIsSkipped should probably be set to true here.
        mCurrentBucketIsSkipped = false;
    }
}

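// Full buckets exist for anomaly detection: when a bucket is split into partial buckets (e.g. by
// an app upgrade), the partial sums accumulate in mCurrentFullBucket and are only handed to the
// anomaly trackers once a real full-bucket boundary is reached.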
void NumericValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) {
    if (mCurrentBucketIsSkipped) {
        if (isFullBucketReached) {
            // If the bucket is invalid, we ignore the full bucket since it contains invalid data.
            mCurrentFullBucket.clear();
        }
        // The current bucket is invalid, so we do not add it to the full bucket.
        return;
    }

    if (isFullBucketReached) {  // If the full bucket is reached, send to the anomaly trackers.
        // Accumulate partial buckets with the current value, then send to the anomaly trackers.
        if (mCurrentFullBucket.size() > 0) {
            for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
                if (hitFullBucketGuardRailLocked(metricDimensionKey) ||
                    currentBucket.intervals.empty()) {
                    continue;
                }
                // TODO: fix this when anomaly detection can accept double values.
                auto& interval = currentBucket.intervals[0];
                if (interval.hasValue()) {
                    mCurrentFullBucket[metricDimensionKey] +=
                            interval.aggregate.getValueOrDefault<int64_t>(0);
                }
            }
            for (const auto& [metricDimensionKey, value] : mCurrentFullBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr) {
                        tracker->addPastBucket(metricDimensionKey, value, mCurrentBucketNum);
                    }
                }
            }
            mCurrentFullBucket.clear();
        } else {
            // Skip aggregating the partial buckets since there's no previous partial bucket.
            for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr && !currentBucket.intervals.empty()) {
                        // TODO: fix this when anomaly detection can accept double values.
                        auto& interval = currentBucket.intervals[0];
                        if (interval.hasValue()) {
                            const int64_t longVal =
                                    interval.aggregate.getValueOrDefault<int64_t>(0);
                            tracker->addPastBucket(metricDimensionKey, longVal, mCurrentBucketNum);
                        }
                    }
                }
            }
        }
    } else {
        // Accumulate the partial bucket.
        for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
            if (!currentBucket.intervals.empty()) {
                // TODO: fix this when anomaly detection can accept double values.
                auto& interval = currentBucket.intervals[0];
                if (interval.hasValue()) {
                    mCurrentFullBucket[metricDimensionKey] +=
                            interval.aggregate.getValueOrDefault<int64_t>(0);
                }
            }
        }
    }
}

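// mBinStartsList holds either a single bin configuration shared by all value fields or one
// configuration per value field, hence the size-1 special case.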
const optional<const BinStarts>& NumericValueMetricProducer::getBinStarts(
        int valueFieldIndex) const {
    return mBinStartsList.size() == 1 ? mBinStartsList[0] : mBinStartsList[valueFieldIndex];
}

// Estimate for the size of NumericValues.
size_t NumericValueMetricProducer::getAggregatedValueSize(const NumericValue& value) const {
    size_t valueSize = 0;
    // Index
    valueSize += sizeof(int32_t);

    // Value
    valueSize += value.getSize();

    // Sample size
    if (mIncludeSampleSize) {
        valueSize += sizeof(int32_t);
    }
    return valueSize;
}

size_t NumericValueMetricProducer::byteSizeLocked() const {
    sp<ConfigMetadataProvider> configMetadataProvider = getConfigMetadataProvider();
    if (configMetadataProvider != nullptr && configMetadataProvider->useV2SoftMemoryCalculation()) {
        bool dimensionGuardrailHit = StatsdStats::getInstance().hasHitDimensionGuardrail(mMetricId);
        return computeOverheadSizeLocked(!mPastBuckets.empty() || !mSkippedBuckets.empty(),
                                         dimensionGuardrailHit) +
               mTotalDataSize;
    }
    size_t totalSize = 0;
    for (const auto& [_, buckets] : mPastBuckets) {
        totalSize += buckets.size() * kBucketSize;
        // TODO(b/189283526): Add bytes used to store the PastBucket.aggIndex vector.
    }
    return totalSize;
}

bool NumericValueMetricProducer::valuePassesThreshold(const Interval& interval) const {
    if (mUploadThreshold == nullopt) {
        return true;
    }

    double doubleValue = toDouble(getFinalValue(interval));

    switch (mUploadThreshold->value_comparison_case()) {
        case UploadThreshold::kLtInt:
            return doubleValue < (double)mUploadThreshold->lt_int();
        case UploadThreshold::kGtInt:
            return doubleValue > (double)mUploadThreshold->gt_int();
        case UploadThreshold::kLteInt:
            return doubleValue <= (double)mUploadThreshold->lte_int();
        case UploadThreshold::kGteInt:
            return doubleValue >= (double)mUploadThreshold->gte_int();
        case UploadThreshold::kLtFloat:
            return doubleValue < (double)mUploadThreshold->lt_float();
        case UploadThreshold::kGtFloat:
            return doubleValue > (double)mUploadThreshold->gt_float();
        default:
            ALOGE("Value metric has no upload threshold type set");
            return false;
    }
}

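// For AVG, the interval stores a running sum, so the division by the sample size happens here at
// flush time; e.g. samples 4, 6, and 11 are stored as 21 with sampleSize 3 and reported as 7.0.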
NumericValue NumericValueMetricProducer::getFinalValue(const Interval& interval) const {
    if (interval.aggregate.is<HistogramValue>()) {
        return interval.aggregate.getValue<HistogramValue>().getCompactedHistogramValue();
    }
    if (getAggregationTypeLocked(interval.aggIndex) != ValueMetric::AVG) {
        return interval.aggregate;
    } else {
        double sum = toDouble(interval.aggregate);
        return NumericValue(sum / interval.sampleSize);
    }
}

NumericValueMetricProducer::DumpProtoFields NumericValueMetricProducer::getDumpProtoFields() const {
    return {FIELD_ID_VALUE_METRICS,
            FIELD_ID_BUCKET_NUM,
            FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
            FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
            FIELD_ID_CONDITION_TRUE_NS,
            FIELD_ID_CONDITION_CORRECTION_NS};
}

MetricProducer::DataCorruptionSeverity NumericValueMetricProducer::determineCorruptionSeverity(
        int32_t atomId, DataCorruptedReason /*reason*/, LostAtomType atomType) const {
    switch (atomType) {
        case LostAtomType::kWhat:
            return mUseDiff ? DataCorruptionSeverity::kUnrecoverable
                            : DataCorruptionSeverity::kResetOnDump;
        case LostAtomType::kCondition:
        case LostAtomType::kState:
            return DataCorruptionSeverity::kUnrecoverable;
    }
    return DataCorruptionSeverity::kNone;
}

}  // namespace statsd
}  // namespace os
}  // namespace android