/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define STATSD_DEBUG false // STOPSHIP if true
#include "Log.h"

#include "NumericValueMetricProducer.h"

#include <stdlib.h>

#include <algorithm>

#include "FieldValue.h"
#include "guardrail/StatsdStats.h"
#include "metrics/HistogramValue.h"
#include "metrics/NumericValue.h"
#include "metrics/parsing_utils/metrics_manager_util.h"
#include "stats_log_util.h"

using android::util::FIELD_COUNT_REPEATED;
using android::util::FIELD_TYPE_BOOL;
using android::util::FIELD_TYPE_DOUBLE;
using android::util::FIELD_TYPE_INT32;
using android::util::FIELD_TYPE_INT64;
using android::util::FIELD_TYPE_MESSAGE;
using android::util::FIELD_TYPE_STRING;
using android::util::ProtoOutputStream;
using std::shared_ptr;
using std::string;
using std::unordered_map;

namespace android {
namespace os {
namespace statsd {

namespace { // anonymous namespace
// for StatsLogReport
const int FIELD_ID_VALUE_METRICS = 7;
// for ValueBucketInfo
const int FIELD_ID_VALUE_INDEX = 1;
const int FIELD_ID_VALUE_LONG = 2;
const int FIELD_ID_VALUE_DOUBLE = 3;
const int FIELD_ID_VALUE_HISTOGRAM = 5;
const int FIELD_ID_VALUE_SAMPLESIZE = 4;
const int FIELD_ID_VALUES = 9;
const int FIELD_ID_BUCKET_NUM = 4;
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
const int FIELD_ID_CONDITION_TRUE_NS = 10;
const int FIELD_ID_CONDITION_CORRECTION_NS = 11;

const NumericValue ZERO_LONG((int64_t)0);
const NumericValue ZERO_DOUBLE((double)0);

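// Coerces a NumericValue to a double for averaging and threshold comparisons.
// Histogram and empty values fall back to the default of 0.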
double toDouble(const NumericValue& value) {
    return value.is<int64_t>() ? value.getValue<int64_t>() : value.getValueOrDefault<double>(0);
}

} // anonymous namespace

// ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
NumericValueMetricProducer::NumericValueMetricProducer(
        const ConfigKey& key, const ValueMetric& metric, const uint64_t protoHash,
        const PullOptions& pullOptions, const BucketOptions& bucketOptions,
        const WhatOptions& whatOptions, const ConditionOptions& conditionOptions,
        const StateOptions& stateOptions, const ActivationOptions& activationOptions,
        const GuardrailOptions& guardrailOptions,
        const wp<ConfigMetadataProvider> configMetadataProvider)
    : ValueMetricProducer(metric.id(), key, protoHash, pullOptions, bucketOptions, whatOptions,
                          conditionOptions, stateOptions, activationOptions, guardrailOptions,
                          configMetadataProvider),
      mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()),
      mAggregationTypes(whatOptions.aggregationTypes),
      mIncludeSampleSize(metric.has_include_sample_size()
                                 ? metric.include_sample_size()
                                 : hasAvgAggregationType(whatOptions.aggregationTypes)),
      mUseDiff(metric.has_use_diff() ? metric.use_diff() : isPulled()),
      mValueDirection(metric.value_direction()),
      mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
      mUseZeroDefaultBase(metric.use_zero_default_base()),
      mHasGlobalBase(false),
      mMaxPullDelayNs(metric.has_max_pull_delay_sec() ? metric.max_pull_delay_sec() * NS_PER_SEC
                                                      : StatsdStats::kPullMaxDelayNs),
      mDedupedFieldMatchers(dedupFieldMatchers(whatOptions.fieldMatchers)),
      mBinStartsList(whatOptions.binStartsList) {
    // TODO(b/186677791): Use initializer list to initialize mUploadThreshold.
    if (metric.has_threshold()) {
        mUploadThreshold = metric.threshold();
    }
}

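// In addition to the common invalidation logic in the base class, drop reasons that
// indicate the pulled data can no longer be trusted also clear the stored diff bases,
// forcing the next successful pull to re-establish them.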
void NumericValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs,
                                                         const BucketDropReason reason) {
    ValueMetricProducer::invalidateCurrentBucket(dropTimeNs, reason);

    switch (reason) {
        case BucketDropReason::DUMP_REPORT_REQUESTED:
        case BucketDropReason::EVENT_IN_WRONG_BUCKET:
        case BucketDropReason::CONDITION_UNKNOWN:
        case BucketDropReason::PULL_FAILED:
        case BucketDropReason::PULL_DELAYED:
        case BucketDropReason::DIMENSION_GUARDRAIL_REACHED:
            resetBase();
            break;
        default:
            break;
    }
}

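// Clears the stored diff base for every value field of every tracked dimension.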
void NumericValueMetricProducer::resetBase() {
    for (auto& [_, dimInfo] : mDimInfos) {
        for (NumericValue& base : dimInfo.dimExtras) {
            base.reset();
        }
    }
    mHasGlobalBase = false;
}

void NumericValueMetricProducer::writePastBucketAggregateToProto(
        const int aggIndex, const NumericValue& value, const int sampleSize,
        ProtoOutputStream* const protoOutput) const {
    uint64_t valueToken =
            protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES);
    protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX, aggIndex);
    if (mIncludeSampleSize) {
        protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_SAMPLESIZE, sampleSize);
    }
    if (value.is<int64_t>()) {
        const int64_t val = value.getValue<int64_t>();
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG, (long long)val);
        VLOG("\t\t value %d: %lld", aggIndex, (long long)val);
    } else if (value.is<double>()) {
        const double val = value.getValue<double>();
        protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, val);
        VLOG("\t\t value %d: %.2f", aggIndex, val);
    } else if (value.is<HistogramValue>()) {
        const HistogramValue& val = value.getValue<HistogramValue>();
        const uint64_t histToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_HISTOGRAM);
        val.toProto(*protoOutput);
        protoOutput->end(histToken);
        VLOG("\t\t value %d: %s", aggIndex, val.toString().c_str());
    } else {
        VLOG("Wrong value type for ValueMetric output");
    }
    protoOutput->end(valueToken);
}

void NumericValueMetricProducer::onActiveStateChangedInternalLocked(const int64_t eventTimeNs,
                                                                    const bool isActive) {
    // When active state changes from true to false for pulled metric, clear diff base but don't
    // reset other counters as we may accumulate more value in the bucket.
    if (mUseDiff && !isActive) {
        resetBase();
    }
}

// Only called when mIsActive and the event is NOT too late.
void NumericValueMetricProducer::onConditionChangedInternalLocked(const ConditionState oldCondition,
                                                                  const ConditionState newCondition,
                                                                  const int64_t eventTimeNs) {
    // For metrics that use diff, when condition changes from true to false,
    // clear the diff base but don't reset other counts because we may accumulate
    // more value in the bucket.
    if (mUseDiff &&
        (oldCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse)) {
        resetBase();
    }
}

void NumericValueMetricProducer::prepareFirstBucketLocked() {
    // Kicks off the puller immediately if condition is true and diff based.
    if (mIsActive && isPulled() && mCondition == ConditionState::kTrue && mUseDiff) {
        pullAndMatchEventsLocked(mCurrentBucketStartTimeNs);
    }
}

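// Issues a manual pull and routes the resulting events through accumulateEvents().
// A failed pull invalidates the current bucket since a diff can no longer be computed.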
void NumericValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
    vector<shared_ptr<LogEvent>> allData;
    if (!mPullerManager->Pull(mPullAtomId, mConfigKey, timestampNs, &allData)) {
        ALOGE("Stats puller failed for tag: %d at %lld", mPullAtomId, (long long)timestampNs);
        invalidateCurrentBucket(timestampNs, BucketDropReason::PULL_FAILED);
        return;
    }

    accumulateEvents(allData, timestampNs, timestampNs);
}

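// Rounds currentTimeNs down to the latest bucket boundary at or before it. For example,
// with mTimeBaseNs = 0 and mBucketSizeNs = 10, a currentTimeNs of 25 yields 20.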
int64_t NumericValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
    return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
}

// By design, statsd pulls data at bucket boundaries using AlarmManager. These pulls are likely
// to be delayed. Other events, like condition changes or an app upgrade, are not based on
// AlarmManager and might have arrived earlier, closing the bucket before the pull lands.
void NumericValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                              PullResult pullResult, int64_t originalPullTimeNs) {
    lock_guard<mutex> lock(mMutex);
    if (mCondition == ConditionState::kTrue) {
        // If the pull failed, we won't be able to compute a diff.
        if (pullResult == PullResult::PULL_RESULT_FAIL) {
            invalidateCurrentBucket(originalPullTimeNs, BucketDropReason::PULL_FAILED);
        } else if (pullResult == PullResult::PULL_RESULT_SUCCESS) {
            bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs();
            if (isEventLate) {
                // If the event is late, we are in the middle of a bucket. Just
                // process the data without trying to snap the data to the nearest bucket.
                accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs);
            } else {
                // For scheduled pulled data, the effective event time is snapped to the
                // nearest bucket end. When waking up from deep sleep, we attribute the data
                // to the previous bucket end. If the sleep was long but not very long, we
                // will be in the immediately following bucket, and the previous bucket may
                // get a larger value since we pull later than the real bucket end.
                //
                // If the sleep was very long, more than one bucket boundary was skipped. In
                // that case the diff base is cleared and this new data serves as the new
                // diff base.
                int64_t bucketEndTimeNs = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
                StatsdStats::getInstance().noteBucketBoundaryDelayNs(
                        mMetricId, originalPullTimeNs - bucketEndTimeNs);
                accumulateEvents(allData, originalPullTimeNs, bucketEndTimeNs);
            }
        }
    }

    // We can probably flush the bucket. Since we used bucketEndTimeNs when calling
    // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
    flushIfNeededLocked(originalPullTimeNs);
}

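// Merges the value fields of newEvent into the aggregate event accumulated for the same
// dimension key. An index of -1 marks a value field that was not present in the event.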
void NumericValueMetricProducer::combineValueFields(pair<LogEvent, vector<int>>& eventValues,
                                                    const LogEvent& newEvent,
                                                    const vector<int>& newValueIndices) const {
    if (eventValues.second.size() != newValueIndices.size()) {
        ALOGE("NumericValueMetricProducer value indices sizes don't match");
        return;
    }
    vector<FieldValue>* const aggregateFieldValues = eventValues.first.getMutableValues();
    const vector<FieldValue>& newFieldValues = newEvent.getValues();
    for (size_t i = 0; i < eventValues.second.size(); ++i) {
        if (newValueIndices[i] != -1 && eventValues.second[i] != -1) {
            (*aggregateFieldValues)[eventValues.second[i]].mValue +=
                    newFieldValues[newValueIndices[i]].mValue;
        }
    }
}

// Process events retrieved from a pull.
void NumericValueMetricProducer::accumulateEvents(const vector<shared_ptr<LogEvent>>& allData,
                                                  int64_t originalPullTimeNs,
                                                  int64_t eventElapsedTimeNs) {
    if (isEventLateLocked(eventElapsedTimeNs)) {
        VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
             (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        return;
    }

    const int64_t elapsedRealtimeNs = getElapsedRealtimeNs();
    const int64_t pullDelayNs = elapsedRealtimeNs - originalPullTimeNs;
    StatsdStats::getInstance().notePullDelay(mPullAtomId, pullDelayNs);
    if (pullDelayNs > mMaxPullDelayNs) {
        ALOGE("Pull finish too late for atom %d, longer than %lld", mPullAtomId,
              (long long)mMaxPullDelayNs);
        StatsdStats::getInstance().notePullExceedMaxDelay(mPullAtomId);
        // We are missing one pull from the bucket which means we will not have a complete view of
        // what's going on.
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::PULL_DELAYED);
        return;
    }

    mMatchedMetricDimensionKeys.clear();
    if (mUseDiff) {
        // An extra aggregation step is needed to sum values with matching dimensions
        // before calculating the diff between sums of consecutive pulls.
        std::unordered_map<HashableDimensionKey, pair<LogEvent, vector<int>>> aggregateEvents;
        for (const auto& data : allData) {
            const auto [matchResult, transformedEvent] =
                    mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex);
            if (matchResult != MatchingState::kMatched) {
                continue;
            }

            // Get dimensions_in_what key and value indices.
            HashableDimensionKey dimensionsInWhat;
            vector<int> valueIndices(mDedupedFieldMatchers.size(), -1);
            const LogEvent& eventRef = transformedEvent == nullptr ? *data : *transformedEvent;
            if (!filterValues(mDimensionsInWhat, mDedupedFieldMatchers, eventRef.getValues(),
                              dimensionsInWhat, valueIndices)) {
                StatsdStats::getInstance().noteBadValueType(mMetricId);
            }

            // Store new event in map or combine values in existing event.
            auto it = aggregateEvents.find(dimensionsInWhat);
            if (it == aggregateEvents.end()) {
                aggregateEvents.emplace(std::piecewise_construct,
                                        std::forward_as_tuple(dimensionsInWhat),
                                        std::forward_as_tuple(eventRef, valueIndices));
            } else {
                combineValueFields(it->second, eventRef, valueIndices);
            }
        }

        for (auto& [dimKey, eventInfo] : aggregateEvents) {
            eventInfo.first.setElapsedTimestampNs(eventElapsedTimeNs);
            onMatchedLogEventLocked(mWhatMatcherIndex, eventInfo.first);
        }
    } else {
        for (const auto& data : allData) {
            const auto [matchResult, transformedEvent] =
                    mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex);
            if (matchResult == MatchingState::kMatched) {
                LogEvent localCopy = transformedEvent == nullptr ? *data : *transformedEvent;
                localCopy.setElapsedTimestampNs(eventElapsedTimeNs);
                onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
            }
        }
    }

    // If a key that is:
    // 1. Tracked in mCurrentSlicedBucket and
    // 2. A superset of the current mStateChangePrimaryKey
    // was not found in the new pulled data (i.e. not in mMatchedMetricDimensionKeys),
    // then we clear the data from mDimInfos to reset the base and current state key.
    for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) {
        const auto& whatKey = metricDimensionKey.getDimensionKeyInWhat();
        bool presentInPulledData =
                mMatchedMetricDimensionKeys.find(whatKey) != mMatchedMetricDimensionKeys.end();
        if (!presentInPulledData &&
            containsLinkedStateValues(whatKey, mStateChangePrimaryKey.second, mMetric2StateLinks,
                                      mStateChangePrimaryKey.first)) {
            auto it = mDimInfos.find(whatKey);
            if (it != mDimInfos.end()) {
                mDimInfos.erase(it);
            }
            // Turn OFF condition timer for keys not present in pulled data.
            currentValueBucket.conditionTimer.onConditionChanged(false, eventElapsedTimeNs);
        }
    }
    mMatchedMetricDimensionKeys.clear();
    mHasGlobalBase = true;

    // If we reach the guardrail, we might have dropped some data which means the bucket is
    // incomplete.
    //
    // The base also needs to be reset. If we do not have the full data, we might
    // incorrectly compute the diff when mUseZeroDefaultBase is true since an existing key
    // might be missing from mCurrentSlicedBucket.
    if (hasReachedGuardRailLimit()) {
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::DIMENSION_GUARDRAIL_REACHED);
        mCurrentSlicedBucket.clear();
    }
}

bool NumericValueMetricProducer::hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey) {
    // ===========GuardRail==============
    // 1. Report the tuple count if the tuple count > soft limit
    if (mCurrentFullBucket.find(newKey) != mCurrentFullBucket.end()) {
        return false;
    }
    if (mCurrentFullBucket.size() > mDimensionSoftLimit - 1) {
        size_t newTupleCount = mCurrentFullBucket.size() + 1;
        // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
        if (newTupleCount > mDimensionHardLimit) {
            if (!mHasHitGuardrail) {
                ALOGE("ValueMetric %lld dropping data for full bucket dimension key %s",
                      (long long)mMetricId, newKey.toString().c_str());
                mHasHitGuardrail = true;
            }
            return true;
        }
    }

    return false;
}

namespace {
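// Extracts the value to aggregate from an event. A matcher that matches all positions of
// a repeated field denotes a client-aggregated histogram, in which case every matched INT
// field becomes a bin count. Otherwise the first matched numeric field is returned, widened
// to int64_t or double. An empty NumericValue signals a bad or missing value type.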
NumericValue getAggregationInputValue(const LogEvent& event, const Matcher& matcher) {
    if (matcher.hasAllPositionMatcher()) { // client-aggregated histogram
        vector<int> binCounts;
        for (const FieldValue& value : event.getValues()) {
            if (!value.mField.matches(matcher)) {
                continue;
            }
            if (value.mValue.getType() == INT) {
                binCounts.push_back(value.mValue.int_value);
            } else {
                return NumericValue{};
            }
        }
        return NumericValue(HistogramValue(binCounts));
    }

    for (const FieldValue& value : event.getValues()) {
        if (!value.mField.matches(matcher)) {
            continue;
        }
        switch (value.mValue.type) {
            case INT:
                return NumericValue((int64_t)value.mValue.int_value);
            case LONG:
                return NumericValue((int64_t)value.mValue.long_value);
            case FLOAT:
                return NumericValue((double)value.mValue.float_value);
            case DOUBLE:
                return NumericValue((double)value.mValue.double_value);
            default:
                return NumericValue{};
        }
    }
    return NumericValue{};
}

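// Adds one raw sample to a statsd-aggregated histogram using the configured bin starts.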
void addValueToHistogram(const NumericValue& value, const optional<const BinStarts>& binStarts,
                         HistogramValue& histValue) {
    if (binStarts == nullopt) {
        ALOGE("Missing bin configuration!");
        return;
    }
    histValue.addValue(static_cast<float>(toDouble(value)), *binStarts);
}

} // anonymous namespace

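// Extracts the value for each field matcher from the event, optionally diffs it against the
// stored base, and folds the result into the per-dimension intervals according to the
// configured aggregation type. Returns whether any new data was aggregated.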
bool NumericValueMetricProducer::aggregateFields(const int64_t eventTimeNs,
                                                 const MetricDimensionKey& eventKey,
                                                 const LogEvent& event, vector<Interval>& intervals,
                                                 Bases& bases) {
    if (bases.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of bases to %zu", mFieldMatchers.size());
        bases.resize(mFieldMatchers.size());
    }

    // We only use anomaly detection under certain cases.
    // N.B.: The anomaly detection cases were modified in order to fix an issue with value metrics
    // containing multiple values. We tried to retain all previous behaviour, but we are unsure the
    // previous behaviour was correct. At the time of the fix, anomaly detection had no owner.
    // Whoever next works on it should look into the cases where it is triggered in this function.
    // Discussion here: http://ag/6124370.
    bool useAnomalyDetection = true;
    bool seenNewData = false;
    for (size_t i = 0; i < mFieldMatchers.size(); i++) {
        const Matcher& matcher = mFieldMatchers[i];
        Interval& interval = intervals[i];
        interval.aggIndex = i;
        NumericValue& base = bases[i];
        NumericValue value = getAggregationInputValue(event, matcher);
        if (!value.hasValue()) {
            VLOG("Failed to get value %zu from event %s", i, event.ToString().c_str());
            StatsdStats::getInstance().noteBadValueType(mMetricId);
            return seenNewData;
        }

        if (value.is<HistogramValue>() && !value.getValue<HistogramValue>().isValid()) {
            ALOGE("Invalid histogram at %zu from event %s", i, event.ToString().c_str());
            StatsdStats::getInstance().noteBadValueType(mMetricId);
            if (mUseDiff) {
                base.reset();
            }
            continue;
        }

        if (mUseDiff) {
            if (!base.hasValue()) {
                if (mHasGlobalBase && mUseZeroDefaultBase) {
                    // The bucket has global base. This key does not.
                    // Optionally use zero as base.
                    if (value.is<int64_t>()) {
                        base = ZERO_LONG;
                    } else if (value.is<double>()) {
                        base = ZERO_DOUBLE;
                    } else if (value.is<HistogramValue>()) {
                        base = HistogramValue();
                    }
                } else {
                    // No base yet; just set the base from this value.
                    base = value;

                    // If we're missing a base, do not use anomaly detection on incomplete data
                    useAnomalyDetection = false;

                    seenNewData = true;
                    // Continue (instead of return) here in order to set base value for other bases
                    continue;
                }
            }
            NumericValue diff{};
            if (value.is<HistogramValue>()) {
                diff = value - base;
                seenNewData = true;
                base = value;
                if (diff == HistogramValue::ERROR_BINS_MISMATCH) {
                    ALOGE("Value %zu from event %s does not have enough bins", i,
                          event.ToString().c_str());
                    StatsdStats::getInstance().noteBadValueType(mMetricId);
                    continue;
                }
                if (diff == HistogramValue::ERROR_BIN_COUNT_TOO_HIGH) {
                    ALOGE("Value %zu from event %s has decreasing bin count", i,
                          event.ToString().c_str());
                    StatsdStats::getInstance().noteBadValueType(mMetricId);
                    continue;
                }
            } else {
                seenNewData = true;
                switch (mValueDirection) {
                    case ValueMetric::INCREASING:
                        if (value >= base) {
                            diff = value - base;
                        } else if (mUseAbsoluteValueOnReset) {
                            diff = value;
                        } else {
                            VLOG("Unexpected decreasing value");
                            StatsdStats::getInstance().notePullDataError(mPullAtomId);
                            base = value;
                            // If we've got bad data, do not use anomaly detection
                            useAnomalyDetection = false;
                            continue;
                        }
                        break;
                    case ValueMetric::DECREASING:
                        if (base >= value) {
                            diff = base - value;
                        } else if (mUseAbsoluteValueOnReset) {
                            diff = value;
                        } else {
                            VLOG("Unexpected increasing value");
                            StatsdStats::getInstance().notePullDataError(mPullAtomId);
                            base = value;
                            // If we've got bad data, do not use anomaly detection
                            useAnomalyDetection = false;
                            continue;
                        }
                        break;
                    case ValueMetric::ANY:
                        diff = value - base;
                        break;
                    default:
                        break;
                }
                base = value;
            }
            value = diff;
        }

        const ValueMetric::AggregationType aggType = getAggregationTypeLocked(i);
        if (interval.hasValue()) {
            switch (aggType) {
                case ValueMetric::SUM:
                // for AVG, we add up and take average when flushing the bucket
                case ValueMetric::AVG:
                    interval.aggregate += value;
                    break;
                case ValueMetric::MIN:
                    interval.aggregate = min(value, interval.aggregate);
                    break;
                case ValueMetric::MAX:
                    interval.aggregate = max(value, interval.aggregate);
                    break;
                case ValueMetric::HISTOGRAM:
                    if (value.is<HistogramValue>()) {
                        // client-aggregated histogram: add the corresponding bin counts.
                        NumericValue sum = interval.aggregate + value;
                        if (sum == HistogramValue::ERROR_BINS_MISMATCH) {
                            ALOGE("Value %zu from event %s has too many bins", i,
                                  event.ToString().c_str());
                            StatsdStats::getInstance().noteBadValueType(mMetricId);
                            continue;
                        }
                        interval.aggregate = sum;
                    } else {
                        // statsd-aggregated histogram: add the raw value to histogram.
                        addValueToHistogram(value, getBinStarts(i),
                                            interval.aggregate.getValue<HistogramValue>());
                    }
                    break;
                default:
                    break;
            }
        } else if (aggType == ValueMetric::HISTOGRAM && !value.is<HistogramValue>()) {
            // statsd-aggregated histogram: add raw value to histogram.
            interval.aggregate = HistogramValue();
            addValueToHistogram(value, getBinStarts(i),
                                interval.aggregate.getValue<HistogramValue>());
        } else {
            interval.aggregate = value;
        }
        seenNewData = true;
        interval.sampleSize += 1;
    }

    // Only trigger the tracker if all intervals are correct and we have not skipped the bucket due
    // to MULTIPLE_BUCKETS_SKIPPED.
    if (useAnomalyDetection && !multipleBucketsSkipped(calcBucketsForwardCount(eventTimeNs))) {
        // TODO: propagate proper values downstream when anomaly detection supports doubles
        long wholeBucketVal = intervals[0].aggregate.getValueOrDefault<int64_t>(0);
        auto prev = mCurrentFullBucket.find(eventKey);
        if (prev != mCurrentFullBucket.end()) {
            wholeBucketVal += prev->second;
        }
        for (auto& tracker : mAnomalyTrackers) {
            tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, mMetricId, eventKey,
                                             wholeBucketVal);
        }
    }
    return seenNewData;
}

PastBucket<NumericValue> NumericValueMetricProducer::buildPartialBucket(
        int64_t bucketEndTimeNs, vector<Interval>& intervals) {
    PastBucket<NumericValue> bucket;
    bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
    bucket.mBucketEndNs = bucketEndTimeNs;

    // The first value field acts as a "gatekeeper" - if it does not pass the specified threshold,
    // then all interval values are discarded for this bucket.
    if (intervals.empty() || (intervals[0].hasValue() && !valuePassesThreshold(intervals[0]))) {
        return bucket;
    }

    for (const Interval& interval : intervals) {
        // skip the output if the diff is zero
        if (!interval.hasValue() ||
            (mSkipZeroDiffOutput && mUseDiff && interval.aggregate.isZero())) {
            continue;
        }

        bucket.aggIndex.push_back(interval.aggIndex);
        bucket.aggregates.push_back(getFinalValue(interval));
        if (mIncludeSampleSize) {
            bucket.sampleSizes.push_back(interval.sampleSize);
        }
    }
    return bucket;
}

// Also invalidates current bucket if multiple buckets have been skipped
void NumericValueMetricProducer::closeCurrentBucket(const int64_t eventTimeNs,
                                                    const int64_t nextBucketStartTimeNs) {
    ValueMetricProducer::closeCurrentBucket(eventTimeNs, nextBucketStartTimeNs);
    if (mAnomalyTrackers.size() > 0) {
        appendToFullBucket(eventTimeNs > getCurrentBucketEndTimeNs());
    }
}

void NumericValueMetricProducer::initNextSlicedBucket(int64_t nextBucketStartTimeNs) {
    ValueMetricProducer::initNextSlicedBucket(nextBucketStartTimeNs);

    // If we do not have a global base when the condition is true,
    // we will have incomplete bucket for the next bucket.
    if (mUseDiff && !mHasGlobalBase && mCondition) {
        // TODO(b/188878815): mCurrentBucketIsSkipped should probably be set to true here.
        mCurrentBucketIsSkipped = false;
    }
}

void NumericValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) {
    if (mCurrentBucketIsSkipped) {
        if (isFullBucketReached) {
            // If the bucket is invalid, we ignore the full bucket since it contains invalid data.
            mCurrentFullBucket.clear();
        }
        // Current bucket is invalid, we do not add it to the full bucket.
        return;
    }

    if (isFullBucketReached) { // If full bucket, send to anomaly tracker.
        // Accumulate partial buckets with current value and then send to anomaly tracker.
        if (mCurrentFullBucket.size() > 0) {
            for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
                if (hitFullBucketGuardRailLocked(metricDimensionKey) ||
                    currentBucket.intervals.empty()) {
                    continue;
                }
                // TODO: fix this when anomaly can accept double values
                auto& interval = currentBucket.intervals[0];
                if (interval.hasValue()) {
                    mCurrentFullBucket[metricDimensionKey] +=
                            interval.aggregate.getValueOrDefault<int64_t>(0);
                }
            }
            for (const auto& [metricDimensionKey, value] : mCurrentFullBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr) {
                        tracker->addPastBucket(metricDimensionKey, value, mCurrentBucketNum);
                    }
                }
            }
            mCurrentFullBucket.clear();
        } else {
            // Skip aggregating the partial buckets since there's no previous partial bucket.
            for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr && !currentBucket.intervals.empty()) {
                        // TODO: fix this when anomaly can accept double values
                        auto& interval = currentBucket.intervals[0];
                        if (interval.hasValue()) {
                            const int64_t longVal =
                                    interval.aggregate.getValueOrDefault<int64_t>(0);
                            tracker->addPastBucket(metricDimensionKey, longVal, mCurrentBucketNum);
                        }
                    }
                }
            }
        }
    } else {
        // Accumulate partial bucket.
        for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
            if (!currentBucket.intervals.empty()) {
                // TODO: fix this when anomaly can accept double values
                auto& interval = currentBucket.intervals[0];
                if (interval.hasValue()) {
                    mCurrentFullBucket[metricDimensionKey] +=
                            interval.aggregate.getValueOrDefault<int64_t>(0);
                }
            }
        }
    }
}

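// A single entry in mBinStartsList is shared across all value fields; otherwise each value
// field has its own bin configuration.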
const optional<const BinStarts>& NumericValueMetricProducer::getBinStarts(
        int valueFieldIndex) const {
    return mBinStartsList.size() == 1 ? mBinStartsList[0] : mBinStartsList[valueFieldIndex];
}

// Estimate for the size of NumericValues.
size_t NumericValueMetricProducer::getAggregatedValueSize(const NumericValue& value) const {
    size_t valueSize = 0;
    // Index
    valueSize += sizeof(int32_t);

    // Value
    valueSize += value.getSize();

    // Sample Size
    if (mIncludeSampleSize) {
        valueSize += sizeof(int32_t);
    }
    return valueSize;
}

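// With the V2 soft memory calculation, the report size is the tracked mTotalDataSize plus a
// computed overhead; otherwise it is estimated from the number of stored past buckets.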
size_t NumericValueMetricProducer::byteSizeLocked() const {
    sp<ConfigMetadataProvider> configMetadataProvider = getConfigMetadataProvider();
    if (configMetadataProvider != nullptr && configMetadataProvider->useV2SoftMemoryCalculation()) {
        bool dimensionGuardrailHit = StatsdStats::getInstance().hasHitDimensionGuardrail(mMetricId);
        return computeOverheadSizeLocked(!mPastBuckets.empty() || !mSkippedBuckets.empty(),
                                         dimensionGuardrailHit) +
               mTotalDataSize;
    }
    size_t totalSize = 0;
    for (const auto& [_, buckets] : mPastBuckets) {
        totalSize += buckets.size() * kBucketSize;
        // TODO(b/189283526): Add bytes used to store PastBucket.aggIndex vector
    }
    return totalSize;
}

bool NumericValueMetricProducer::valuePassesThreshold(const Interval& interval) const {
    if (mUploadThreshold == nullopt) {
        return true;
    }

    double doubleValue = toDouble(getFinalValue(interval));

    switch (mUploadThreshold->value_comparison_case()) {
        case UploadThreshold::kLtInt:
            return doubleValue < (double)mUploadThreshold->lt_int();
        case UploadThreshold::kGtInt:
            return doubleValue > (double)mUploadThreshold->gt_int();
        case UploadThreshold::kLteInt:
            return doubleValue <= (double)mUploadThreshold->lte_int();
        case UploadThreshold::kGteInt:
            return doubleValue >= (double)mUploadThreshold->gte_int();
        case UploadThreshold::kLtFloat:
            return doubleValue <= (double)mUploadThreshold->lt_float();
        case UploadThreshold::kGtFloat:
            return doubleValue >= (double)mUploadThreshold->gt_float();
        default:
            ALOGE("Value metric no upload threshold type used");
            return false;
    }
}

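// For AVG the interval stores a running sum, so the average is computed here at flush time;
// histograms are compacted before upload; all other aggregations are returned as-is.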
NumericValue NumericValueMetricProducer::getFinalValue(const Interval& interval) const {
    if (interval.aggregate.is<HistogramValue>()) {
        return interval.aggregate.getValue<HistogramValue>().getCompactedHistogramValue();
    }
    if (getAggregationTypeLocked(interval.aggIndex) != ValueMetric::AVG) {
        return interval.aggregate;
    } else {
        double sum = toDouble(interval.aggregate);
        return NumericValue(sum / interval.sampleSize);
    }
}

NumericValueMetricProducer::DumpProtoFields NumericValueMetricProducer::getDumpProtoFields() const {
    return {FIELD_ID_VALUE_METRICS,
            FIELD_ID_BUCKET_NUM,
            FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
            FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
            FIELD_ID_CONDITION_TRUE_NS,
            FIELD_ID_CONDITION_CORRECTION_NS};
}

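// A lost "what" atom is unrecoverable when diffing, since the stored base would be wrong
// from then on; otherwise the damage only lasts until the next dump. Lost condition or
// state atoms always corrupt the metric irrecoverably.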
MetricProducer::DataCorruptionSeverity NumericValueMetricProducer::determineCorruptionSeverity(
        int32_t atomId, DataCorruptedReason /*reason*/, LostAtomType atomType) const {
    switch (atomType) {
        case LostAtomType::kWhat:
            return mUseDiff ? DataCorruptionSeverity::kUnrecoverable
                            : DataCorruptionSeverity::kResetOnDump;
        case LostAtomType::kCondition:
        case LostAtomType::kState:
            return DataCorruptionSeverity::kUnrecoverable;
    }
    return DataCorruptionSeverity::kNone;
}

} // namespace statsd
} // namespace os
} // namespace android