/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define STATSD_DEBUG false  // STOPSHIP if true
#include "Log.h"

#include "GaugeMetricProducer.h"

#include "guardrail/StatsdStats.h"
#include "metrics/parsing_utils/metrics_manager_util.h"
#include "stats_log_util.h"

using android::util::FIELD_COUNT_REPEATED;
using android::util::FIELD_TYPE_BOOL;
using android::util::FIELD_TYPE_FLOAT;
using android::util::FIELD_TYPE_INT32;
using android::util::FIELD_TYPE_INT64;
using android::util::FIELD_TYPE_MESSAGE;
using android::util::FIELD_TYPE_STRING;
using android::util::ProtoOutputStream;
using std::make_shared;
using std::map;
using std::shared_ptr;
using std::string;
using std::unordered_map;
using std::vector;

namespace android {
namespace os {
namespace statsd {

// for StatsLogReport
const int FIELD_ID_ID = 1;
const int FIELD_ID_GAUGE_METRICS = 8;
const int FIELD_ID_TIME_BASE = 9;
const int FIELD_ID_BUCKET_SIZE = 10;
const int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11;
const int FIELD_ID_IS_ACTIVE = 14;
const int FIELD_ID_DIMENSION_GUARDRAIL_HIT = 17;
const int FIELD_ID_ESTIMATED_MEMORY_BYTES = 18;
const int FIELD_ID_DATA_CORRUPTED_REASON = 19;
// for GaugeMetricDataWrapper
const int FIELD_ID_DATA = 1;
const int FIELD_ID_SKIPPED = 2;
// for SkippedBuckets
const int FIELD_ID_SKIPPED_START_MILLIS = 3;
const int FIELD_ID_SKIPPED_END_MILLIS = 4;
const int FIELD_ID_SKIPPED_DROP_EVENT = 5;
// for DumpEvent Proto
const int FIELD_ID_BUCKET_DROP_REASON = 1;
const int FIELD_ID_DROP_TIME = 2;
// for GaugeMetricData
const int FIELD_ID_DIMENSION_IN_WHAT = 1;
const int FIELD_ID_BUCKET_INFO = 3;
const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
// for GaugeBucketInfo
const int FIELD_ID_BUCKET_NUM = 6;
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 7;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 8;
const int FIELD_ID_AGGREGATED_ATOM = 9;
// for AggregatedAtomInfo
const int FIELD_ID_ATOM_VALUE = 1;
const int FIELD_ID_ATOM_TIMESTAMPS = 2;

GaugeMetricProducer::GaugeMetricProducer(
        const ConfigKey& key, const GaugeMetric& metric, const int conditionIndex,
        const vector<ConditionState>& initialConditionCache, const sp<ConditionWizard>& wizard,
        const uint64_t protoHash, const int whatMatcherIndex,
        const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int triggerAtomId,
        const int atomId, const int64_t timeBaseNs, const int64_t startTimeNs,
        const sp<StatsPullerManager>& pullerManager,
        const wp<ConfigMetadataProvider> configMetadataProvider,
        const unordered_map<int, shared_ptr<Activation>>& eventActivationMap,
        const unordered_map<int, vector<shared_ptr<Activation>>>& eventDeactivationMap,
        const size_t dimensionSoftLimit, const size_t dimensionHardLimit)
    : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, initialConditionCache, wizard,
                     protoHash, eventActivationMap, eventDeactivationMap, /*slicedStateAtoms=*/{},
                     /*stateGroupMap=*/{}, getAppUpgradeBucketSplit(metric),
                     configMetadataProvider),
      mWhatMatcherIndex(whatMatcherIndex),
      mEventMatcherWizard(matcherWizard),
      mPullerManager(pullerManager),
      mPullTagId(pullTagId),
      mTriggerAtomId(triggerAtomId),
      mAtomId(atomId),
      mIsPulled(pullTagId != -1),
      mMinBucketSizeNs(metric.min_bucket_size_nanos()),
      mSamplingType(metric.sampling_type()),
      mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC
                                                      : StatsdStats::kPullMaxDelayNs),
      mDimensionSoftLimit(dimensionSoftLimit),
      mDimensionHardLimit(dimensionHardLimit),
      mGaugeAtomsPerDimensionLimit(metric.max_num_gauge_atoms_per_bucket()),
      mDimensionGuardrailHit(false),
      mSamplingPercentage(metric.sampling_percentage()),
      mPullProbability(metric.pull_probability()) {
    mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
    mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
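    // Convert the configured bucket TimeUnit to nanoseconds; for example,
    // ONE_HOUR -> 3,600,000 ms -> 3.6e12 ns. When a bucket is configured,
    // TimeUnitToBucketSizeInMillisGuardrailed may clamp bucket sizes that are
    // too small for the calling UID.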
    int64_t bucketSizeMillis = 0;
    if (metric.has_bucket()) {
        bucketSizeMillis =
                TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
    } else {
        bucketSizeMillis = TimeUnitToBucketSizeInMillis(ONE_HOUR);
    }
    mBucketSizeNs = bucketSizeMillis * 1000000;

    if (!metric.gauge_fields_filter().include_all()) {
        translateFieldMatcher(metric.gauge_fields_filter().fields(), &mFieldMatchers);
    }

    if (metric.has_dimensions_in_what()) {
        translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
        mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
    }

    if (metric.links().size() > 0) {
        for (const auto& link : metric.links()) {
            Metric2Condition mc;
            mc.conditionId = link.condition();
            translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
            translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
            mMetric2ConditionLinks.push_back(mc);
        }
        mConditionSliced = true;
    }
    mShouldUseNestedDimensions = ShouldUseNestedDimensions(metric.dimensions_in_what());

    flushIfNeededLocked(startTimeNs);
    // Kicks off the puller immediately.
    if (mIsPulled && isRandomNSamples()) {
        mPullerManager->RegisterReceiver(mPullTagId, mConfigKey, this, getCurrentBucketEndTimeNs(),
                                         mBucketSizeNs);
    }

    // Adjust the start time for a partial first bucket, then pull if needed.
    mCurrentBucketStartTimeNs = startTimeNs;

    VLOG("Gauge metric %lld created. bucket size %lld start_time: %lld sliced %d",
         (long long)mMetricId, (long long)mBucketSizeNs, (long long)mTimeBaseNs, mConditionSliced);
}

GaugeMetricProducer::~GaugeMetricProducer() {
    VLOG("~GaugeMetricProducer() called");
    if (mIsPulled && isRandomNSamples()) {
        mPullerManager->UnRegisterReceiver(mPullTagId, mConfigKey, this);
    }
}

optional<InvalidConfigReason> GaugeMetricProducer::onConfigUpdatedLocked(
        const StatsdConfig& config, const int configIndex, const int metricIndex,
        const vector<sp<AtomMatchingTracker>>& allAtomMatchingTrackers,
        const unordered_map<int64_t, int>& oldAtomMatchingTrackerMap,
        const unordered_map<int64_t, int>& newAtomMatchingTrackerMap,
        const sp<EventMatcherWizard>& matcherWizard,
        const vector<sp<ConditionTracker>>& allConditionTrackers,
        const unordered_map<int64_t, int>& conditionTrackerMap, const sp<ConditionWizard>& wizard,
        const unordered_map<int64_t, int>& metricToActivationMap,
        unordered_map<int, vector<int>>& trackerToMetricMap,
        unordered_map<int, vector<int>>& conditionToMetricMap,
        unordered_map<int, vector<int>>& activationAtomTrackerToMetricMap,
        unordered_map<int, vector<int>>& deactivationAtomTrackerToMetricMap,
        vector<int>& metricsWithActivation) {
    optional<InvalidConfigReason> invalidConfigReason = MetricProducer::onConfigUpdatedLocked(
            config, configIndex, metricIndex, allAtomMatchingTrackers, oldAtomMatchingTrackerMap,
            newAtomMatchingTrackerMap, matcherWizard, allConditionTrackers, conditionTrackerMap,
            wizard, metricToActivationMap, trackerToMetricMap, conditionToMetricMap,
            activationAtomTrackerToMetricMap, deactivationAtomTrackerToMetricMap,
            metricsWithActivation);
    if (invalidConfigReason.has_value()) {
        return invalidConfigReason;
    }

    const GaugeMetric& metric = config.gauge_metric(configIndex);
    // Update appropriate indices: mWhatMatcherIndex, mConditionIndex and MetricsManager maps.
    invalidConfigReason = handleMetricWithAtomMatchingTrackers(
            metric.what(), mMetricId, metricIndex, /*enforceOneAtom=*/false,
            allAtomMatchingTrackers, newAtomMatchingTrackerMap, trackerToMetricMap,
            mWhatMatcherIndex);
    if (invalidConfigReason.has_value()) {
        return invalidConfigReason;
    }

    // Need to update maps since the index changed, but mTriggerAtomId will not change.
    int triggerTrackerIndex;
    if (metric.has_trigger_event()) {
        invalidConfigReason = handleMetricWithAtomMatchingTrackers(
                metric.trigger_event(), mMetricId, metricIndex,
                /*enforceOneAtom=*/true, allAtomMatchingTrackers, newAtomMatchingTrackerMap,
                trackerToMetricMap, triggerTrackerIndex);
        if (invalidConfigReason.has_value()) {
            return invalidConfigReason;
        }
    }

    if (metric.has_condition()) {
        invalidConfigReason = handleMetricWithConditions(
                metric.condition(), mMetricId, metricIndex, conditionTrackerMap, metric.links(),
                allConditionTrackers, mConditionTrackerIndex, conditionToMetricMap);
        if (invalidConfigReason.has_value()) {
            return invalidConfigReason;
        }
    }
    sp<EventMatcherWizard> tmpEventWizard = mEventMatcherWizard;
    mEventMatcherWizard = matcherWizard;

    // If this is a config update, we must have just forced a partial bucket. Pull if needed to get
    // data for the new bucket.
    if (mCondition == ConditionState::kTrue && mIsActive && mIsPulled && isRandomNSamples()) {
        pullAndMatchEventsLocked(mCurrentBucketStartTimeNs);
    }
    return nullopt;
}

void GaugeMetricProducer::dumpStatesLocked(int out, bool verbose) const {
    if (mCurrentSlicedBucket == nullptr || mCurrentSlicedBucket->size() == 0) {
        return;
    }

    dprintf(out, "GaugeMetric %lld dimension size %lu\n", (long long)mMetricId,
            (unsigned long)mCurrentSlicedBucket->size());
    if (verbose) {
        for (const auto& it : *mCurrentSlicedBucket) {
            dprintf(out, "\t(what)%s\t(states)%s %d atoms\n",
                    it.first.getDimensionKeyInWhat().toString().c_str(),
                    it.first.getStateValuesKey().toString().c_str(), (int)it.second.size());
        }
    }
}

void GaugeMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) {
    flushIfNeededLocked(dumpTimeNs);
    mPastBuckets.clear();
    mSkippedBuckets.clear();
    resetDataCorruptionFlagsLocked();
    mTotalDataSize = 0;
}

void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                                             const bool include_current_partial_bucket,
                                             const bool erase_data, const DumpLatency dumpLatency,
                                             std::set<string>* str_set, std::set<int32_t>& usedUids,
                                             ProtoOutputStream* protoOutput) {
    VLOG("Gauge metric %lld report now...", (long long)mMetricId);
    if (include_current_partial_bucket) {
        flushLocked(dumpTimeNs);
    } else {
        flushIfNeededLocked(dumpTimeNs);
    }

    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
    protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked());

    // Data corrupted reason
    writeDataCorruptedReasons(*protoOutput, FIELD_ID_DATA_CORRUPTED_REASON,
                              mDataCorruptedDueToQueueOverflow != DataCorruptionSeverity::kNone,
                              mDataCorruptedDueToSocketLoss != DataCorruptionSeverity::kNone);

    if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
        if (erase_data) {
            resetDataCorruptionFlagsLocked();
        }
        return;
    }

    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ESTIMATED_MEMORY_BYTES,
                       (long long)byteSizeLocked());

    if (mDimensionGuardrailHit) {
        protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_DIMENSION_GUARDRAIL_HIT,
                           mDimensionGuardrailHit);
    }

    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_TIME_BASE, (long long)mTimeBaseNs);
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_SIZE, (long long)mBucketSizeNs);

    // Fills the dimension path if not slicing by a primitive repeated field or position ALL.
    if (!mShouldUseNestedDimensions) {
        if (!mDimensionsInWhat.empty()) {
            uint64_t dimenPathToken =
                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
            writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
            protoOutput->end(dimenPathToken);
        }
    }

    uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);

    for (const auto& skippedBucket : mSkippedBuckets) {
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS,
                           (long long)(NanoToMillis(skippedBucket.bucketStartTimeNs)));
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS,
                           (long long)(NanoToMillis(skippedBucket.bucketEndTimeNs)));

        for (const auto& dropEvent : skippedBucket.dropEvents) {
            uint64_t dropEventToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
                                                         FIELD_ID_SKIPPED_DROP_EVENT);
            protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_BUCKET_DROP_REASON, dropEvent.reason);
            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DROP_TIME,
                               (long long)(NanoToMillis(dropEvent.dropTimeNs)));
            protoOutput->end(dropEventToken);
        }
        protoOutput->end(wrapperToken);
    }

    for (const auto& pair : mPastBuckets) {
        const MetricDimensionKey& dimensionKey = pair.first;

        VLOG("Gauge dimension key %s", dimensionKey.toString().c_str());
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);

        // First fill dimension.
        if (mShouldUseNestedDimensions) {
            uint64_t dimensionToken =
                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
            writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), mUidFields, str_set,
                                  usedUids, protoOutput);
            protoOutput->end(dimensionToken);
        } else {
            writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInWhat(),
                                           FIELD_ID_DIMENSION_LEAF_IN_WHAT, mUidFields, str_set,
                                           usedUids, protoOutput);
        }

        // Then fill bucket_info (GaugeBucketInfo).
        for (const auto& bucket : pair.second) {
            uint64_t bucketInfoToken = protoOutput->start(
                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);

            if (bucket.mBucketEndNs - bucket.mBucketStartNs != mBucketSizeNs) {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
                                   (long long)NanoToMillis(bucket.mBucketStartNs));
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
                                   (long long)NanoToMillis(bucket.mBucketEndNs));
            } else {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM,
                                   (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
            }

            if (!bucket.mAggregatedAtoms.empty()) {
                for (const auto& [atomDimensionKey, elapsedTimestampsNs] :
                     bucket.mAggregatedAtoms) {
                    uint64_t aggregatedAtomToken = protoOutput->start(
                            FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_AGGREGATED_ATOM);
                    uint64_t atomToken =
                            protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_ATOM_VALUE);
                    writeFieldValueTreeToStream(mAtomId,
                                                atomDimensionKey.getAtomFieldValues().getValues(),
                                                mUidFields, usedUids, protoOutput);
                    protoOutput->end(atomToken);
                    for (int64_t timestampNs : elapsedTimestampsNs) {
                        protoOutput->write(
                                FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED | FIELD_ID_ATOM_TIMESTAMPS,
                                (long long)timestampNs);
                    }
                    protoOutput->end(aggregatedAtomToken);
                }
            }

            protoOutput->end(bucketInfoToken);
            VLOG("Gauge \t bucket [%lld - %lld] includes %d atoms.",
                 (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs,
                 (int)bucket.mAggregatedAtoms.size());
        }
        protoOutput->end(wrapperToken);
    }
    protoOutput->end(protoToken);

    if (erase_data) {
        mPastBuckets.clear();
        mSkippedBuckets.clear();
        mDimensionGuardrailHit = false;
        resetDataCorruptionFlagsLocked();
        mTotalDataSize = 0;
    }
}

void GaugeMetricProducer::prepareFirstBucketLocked() {
    if (mCondition == ConditionState::kTrue && mIsActive && mIsPulled && isRandomNSamples()) {
        pullAndMatchEventsLocked(mCurrentBucketStartTimeNs);
    }
}

// Only call if mCondition == ConditionState::kTrue && metric is active.
void GaugeMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
    bool triggerPuller = false;
    switch (mSamplingType) {
        // When the metric wants to do random sampling and there is already one gauge atom for the
        // current bucket, do not do it again.
        case GaugeMetric::RANDOM_ONE_SAMPLE: {
            triggerPuller = mCurrentSlicedBucket->empty();
            break;
        }
        case GaugeMetric::CONDITION_CHANGE_TO_TRUE:
        case GaugeMetric::FIRST_N_SAMPLES: {
            triggerPuller = true;
            break;
        }
        default:
            break;
    }
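    // Even when the sampling type allows a pull, mPullProbability randomly
    // downsamples pulls: shouldKeepRandomSample returns true with that
    // percentage probability, so only a subset of devices actually pull.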
    if (!triggerPuller || !shouldKeepRandomSample(mPullProbability)) {
        return;
    }
    vector<std::shared_ptr<LogEvent>> allData;
    if (!mPullerManager->Pull(mPullTagId, mConfigKey, timestampNs, &allData)) {
        ALOGE("Gauge Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
        return;
    }
    const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs;
    StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
    if (pullDelayNs > mMaxPullDelayNs) {
        ALOGE("Pull finished too late for atom %d", mPullTagId);
        StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
        return;
    }
    for (const auto& data : allData) {
        const auto [matchResult, transformedEvent] =
                mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex);
        if (matchResult == MatchingState::kMatched) {
            LogEvent localCopy = transformedEvent == nullptr ? *data : *transformedEvent;
            localCopy.setElapsedTimestampNs(timestampNs);
            onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
        }
    }
}

void GaugeMetricProducer::onActiveStateChangedLocked(const int64_t eventTimeNs,
                                                     const bool isActive) {
    MetricProducer::onActiveStateChangedLocked(eventTimeNs, isActive);

    if (ConditionState::kTrue != mCondition) {
        return;
    }

    if (isActive && mIsPulled && isRandomNSamples()) {
        pullAndMatchEventsLocked(eventTimeNs);
    }
}

void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet,
                                                   const int64_t eventTimeNs) {
    VLOG("GaugeMetric %lld onConditionChanged", (long long)mMetricId);

    mCondition = conditionMet ? ConditionState::kTrue : ConditionState::kFalse;
    if (!mIsActive) {
        return;
    }

    flushIfNeededLocked(eventTimeNs);
    if (conditionMet && mIsPulled &&
        (isRandomNSamples() || mSamplingType == GaugeMetric::CONDITION_CHANGE_TO_TRUE)) {
        pullAndMatchEventsLocked(eventTimeNs);
    }  // else: Push mode. No need to proactively pull the gauge data.
}

void GaugeMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition,
                                                           const int64_t eventTimeNs) {
    VLOG("GaugeMetric %lld onSlicedConditionMayChange overall condition %d", (long long)mMetricId,
         overallCondition);
    mCondition = overallCondition ? ConditionState::kTrue : ConditionState::kFalse;
    if (!mIsActive) {
        return;
    }

    flushIfNeededLocked(eventTimeNs);
    // If the condition is sliced, mCondition is true if any of the dimensions is true,
    // and we will pull for every dimension.
    if (overallCondition && mIsPulled && mTriggerAtomId == -1) {
        pullAndMatchEventsLocked(eventTimeNs);
    }  // else: Push mode. No need to proactively pull the gauge data.
}

std::shared_ptr<vector<FieldValue>> GaugeMetricProducer::getGaugeFields(const LogEvent& event) {
    std::shared_ptr<vector<FieldValue>> gaugeFields;
    if (mFieldMatchers.size() > 0) {
        gaugeFields = std::make_shared<vector<FieldValue>>();
        filterGaugeValues(mFieldMatchers, event.getValues(), gaugeFields.get());
    } else {
        gaugeFields = std::make_shared<vector<FieldValue>>(event.getValues());
    }
    // Trim all dimension fields from the output. Dimensions will still appear in the output
    // report and will benefit from dictionary encoding. For large pulled atoms, this can give
    // the benefit of an optional repeated field.
    for (const auto& field : mDimensionsInWhat) {
        for (auto it = gaugeFields->begin(); it != gaugeFields->end();) {
            if (it->mField.matches(field)) {
                it = gaugeFields->erase(it);
            } else {
                it++;
            }
        }
    }
    return gaugeFields;
}

void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                       PullResult pullResult, int64_t originalPullTimeNs) {
    std::lock_guard<std::mutex> lock(mMutex);
    if (pullResult != PullResult::PULL_RESULT_SUCCESS || allData.size() == 0) {
        return;
    }
    const int64_t pullDelayNs = getElapsedRealtimeNs() - originalPullTimeNs;
    StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
    if (pullDelayNs > mMaxPullDelayNs) {
        ALOGE("Pull finished too late for atom %d", mPullTagId);
        StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
        return;
    }
    for (const auto& data : allData) {
        const auto [matchResult, transformedEvent] =
                mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex);
        if (matchResult == MatchingState::kMatched) {
            onMatchedLogEventLocked(mWhatMatcherIndex,
                                    transformedEvent == nullptr ? *data : *transformedEvent);
        }
    }
}

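// Returns true (and the caller drops the data) when adding newKey would push the
// number of tracked dimensions past the hard limit. At or above the soft limit,
// the tuple count is only reported to StatsdStats.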
bool GaugeMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
    if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) {
        return false;
    }
    // 1. Report the tuple count if the tuple count > soft limit
    if (mCurrentSlicedBucket->size() >= mDimensionSoftLimit) {
        size_t newTupleCount = mCurrentSlicedBucket->size() + 1;
        StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
        // 2. Don't add more tuples; we are above the allowed threshold. Drop the data.
        if (newTupleCount > mDimensionHardLimit) {
            if (!mHasHitGuardrail) {
                ALOGE("GaugeMetric %lld dropping data for dimension key %s", (long long)mMetricId,
                      newKey.toString().c_str());
                mHasHitGuardrail = true;
            }
            mDimensionGuardrailHit = true;
            StatsdStats::getInstance().noteHardDimensionLimitReached(mMetricId);
            return true;
        }
    }

    return false;
}

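// Per-event pipeline: check the condition and (for pushed atoms) sampling, drop
// late arrivals, flush the bucket if the event crosses a bucket boundary, treat
// trigger atoms as pull requests, then store the gauge atom subject to the
// dimension guardrail and the per-bucket atom limit.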
void GaugeMetricProducer::onMatchedLogEventInternalLocked(
        const size_t matcherIndex, const MetricDimensionKey& eventKey,
        const ConditionKey& conditionKey, bool condition, const LogEvent& event,
        const map<int, HashableDimensionKey>& /*statePrimaryKeys*/) {
    if (condition == false) {
        return;
    }

    if (mPullTagId == -1 && mSamplingPercentage < 100 &&
        !shouldKeepRandomSample(mSamplingPercentage)) {
        return;
    }

    int64_t eventTimeNs = event.GetElapsedTimestampNs();
    if (eventTimeNs < mCurrentBucketStartTimeNs) {
        VLOG("Gauge Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        return;
    }
    flushIfNeededLocked(eventTimeNs);

    if (mTriggerAtomId == event.GetTagId()) {
        // Both Active state and Condition are true here.
        // Active state being true is checked in onMatchedLogEventLocked.
        // Condition being true is checked at the start of this method.
        pullAndMatchEventsLocked(eventTimeNs);
        return;
    }

    // When the gauge metric wants to randomly sample the output atom, we simply keep the
    // first gauge atom in the given bucket.
    if (mCurrentSlicedBucket->find(eventKey) != mCurrentSlicedBucket->end() &&
        mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
        return;
    }
    if (hitGuardRailLocked(eventKey)) {
        return;
    }
    if ((*mCurrentSlicedBucket)[eventKey].size() >= mGaugeAtomsPerDimensionLimit) {
        return;
    }

    const int64_t truncatedElapsedTimestampNs = truncateTimestampIfNecessary(event);
    GaugeAtom gaugeAtom(getGaugeFields(event), truncatedElapsedTimestampNs);
    (*mCurrentSlicedBucket)[eventKey].push_back(gaugeAtom);
    // Anomaly detection on gauge metrics only works when there is exactly one numeric
    // field specified.
    if (mAnomalyTrackers.size() > 0) {
        if (gaugeAtom.mFields->size() == 1) {
            const Value& value = gaugeAtom.mFields->begin()->mValue;
            long gaugeVal = 0;
            if (value.getType() == INT) {
                gaugeVal = (long)value.int_value;
            } else if (value.getType() == LONG) {
                gaugeVal = value.long_value;
            }
            for (auto& tracker : mAnomalyTrackers) {
                tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, mMetricId,
                                                 eventKey, gaugeVal);
            }
        }
    }
}

void GaugeMetricProducer::updateCurrentSlicedBucketForAnomaly() {
    for (const auto& slice : *mCurrentSlicedBucket) {
        if (slice.second.empty()) {
            continue;
        }
        const Value& value = slice.second.front().mFields->front().mValue;
        long gaugeVal = 0;
        if (value.getType() == INT) {
            gaugeVal = (long)value.int_value;
        } else if (value.getType() == LONG) {
            gaugeVal = value.long_value;
        }
        (*mCurrentSlicedBucketForAnomaly)[slice.first] = gaugeVal;
    }
}

void GaugeMetricProducer::dropDataLocked(const int64_t dropTimeNs) {
    flushIfNeededLocked(dropTimeNs);
    StatsdStats::getInstance().noteBucketDropped(mMetricId);
    mPastBuckets.clear();
    resetDataCorruptionFlagsLocked();
    mTotalDataSize = 0;
}

// When a new matched event comes in, we check whether the event falls into the current
// bucket. If not, flush the old bucket to past buckets and initialize the new bucket.
// If data is pushed, onMatchedLogEvent will only be called through onConditionChanged()
// inside the GaugeMetricProducer while holding the lock.
void GaugeMetricProducer::flushIfNeededLocked(const int64_t eventTimeNs) {
    int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();

    if (eventTimeNs < currentBucketEndTimeNs) {
        VLOG("Gauge eventTime is %lld, less than next bucket start time %lld",
             (long long)eventTimeNs, (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs));
        return;
    }

    // Adjusts the bucket start and end times.
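    // e.g. with a 1h bucket size, a current bucket ending at t = 2h, and an event at
    // t = 4.5h: numBucketsForward = 1 + (4.5h - 2h) / 1h = 3, the current bucket is
    // flushed with nextBucketNs = 2h + 2 * 1h = 4h, and the event lands in the new
    // bucket [4h, 5h).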
    int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
    int64_t nextBucketNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
    flushCurrentBucketLocked(eventTimeNs, nextBucketNs);

    mCurrentBucketNum += numBucketsForward;
    VLOG("Gauge metric %lld: new bucket start time: %lld", (long long)mMetricId,
         (long long)mCurrentBucketStartTimeNs);
}

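// Closes the current bucket at min(eventTimeNs, full bucket end). An app upgrade or
// config change can force a partial bucket; in the report, a partial bucket carries
// explicit start/end timestamps while a full bucket is encoded by bucket number only.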
void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t eventTimeNs,
                                                   const int64_t nextBucketStartTimeNs) {
    int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
    int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs;

    GaugeBucket info;
    info.mBucketStartNs = mCurrentBucketStartTimeNs;
    info.mBucketEndNs = bucketEndTime;

    // Add the bucket to mPastBuckets if it is large enough.
    // Otherwise, drop the bucket data and add the bucket metadata to mSkippedBuckets.
    bool isBucketLargeEnough = info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
    if (isBucketLargeEnough) {
        for (const auto& slice : *mCurrentSlicedBucket) {
            info.mAggregatedAtoms.clear();
            for (const GaugeAtom& atom : slice.second) {
                AtomDimensionKey key(mAtomId, HashableDimensionKey(*atom.mFields));
                vector<int64_t>& elapsedTimestampsNs = info.mAggregatedAtoms[key];
                elapsedTimestampsNs.push_back(atom.mElapsedTimestampNs);
            }
            auto& bucketList = mPastBuckets[slice.first];
            const bool isFirstBucket = bucketList.empty();
            bucketList.push_back(info);
            mTotalDataSize += computeGaugeBucketSizeLocked(eventTimeNs >= fullBucketEndTimeNs,
                                                           /*dimKey=*/slice.first, isFirstBucket,
                                                           info.mAggregatedAtoms);
            VLOG("Gauge metric %lld, dump key value: %s", (long long)mMetricId,
                 slice.first.toString().c_str());
        }
    } else if (mIsActive) {
        mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs;
        mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTime;
        if (!maxDropEventsReached()) {
            mCurrentSkippedBucket.dropEvents.emplace_back(
                    buildDropEvent(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL));
        }
        mSkippedBuckets.emplace_back(mCurrentSkippedBucket);
        mTotalDataSize += computeSkippedBucketSizeLocked(mCurrentSkippedBucket);
    }

    // If we have anomaly trackers, we need to update the partial bucket values.
    if (mAnomalyTrackers.size() > 0) {
        updateCurrentSlicedBucketForAnomaly();

        if (eventTimeNs > fullBucketEndTimeNs) {
            // This is known to be a full bucket, so send this data to the anomaly tracker.
            for (auto& tracker : mAnomalyTrackers) {
                tracker->addPastBucket(mCurrentSlicedBucketForAnomaly, mCurrentBucketNum);
            }
            mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
        }
    }

    StatsdStats::getInstance().noteBucketCount(mMetricId);
    mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
    mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
    mCurrentSkippedBucket.reset();
    // Reset mHasHitGuardrail since the bucket was reset.
    mHasHitGuardrail = false;
}

// Estimate for the size of a GaugeBucket.
size_t GaugeMetricProducer::computeGaugeBucketSizeLocked(
        const bool isFullBucket, const MetricDimensionKey& dimKey, const bool isFirstBucket,
        const std::unordered_map<AtomDimensionKey, std::vector<int64_t>>& aggregatedAtoms) const {
    size_t bucketSize =
            MetricProducer::computeBucketSizeLocked(isFullBucket, dimKey, isFirstBucket);

    // Gauge atoms and timestamps
    for (const auto& pair : aggregatedAtoms) {
        bucketSize += getFieldValuesSizeV2(pair.first.getAtomFieldValues().getValues());
        bucketSize += sizeof(int64_t) * pair.second.size();
    }

    return bucketSize;
}

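// Estimates the report size. With V2 soft memory calculation enabled, the running
// total (mTotalDataSize, maintained at flush time) plus a fixed overhead is used;
// otherwise the size is recomputed here by walking every past bucket.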
size_t GaugeMetricProducer::byteSizeLocked() const {
    sp<ConfigMetadataProvider> configMetadataProvider = getConfigMetadataProvider();
    if (configMetadataProvider != nullptr && configMetadataProvider->useV2SoftMemoryCalculation()) {
        return computeOverheadSizeLocked(!mPastBuckets.empty() || !mSkippedBuckets.empty(),
                                         mDimensionGuardrailHit) +
               mTotalDataSize;
    }
    size_t totalSize = 0;
    for (const auto& pair : mPastBuckets) {
        for (const auto& bucket : pair.second) {
            for (const auto& [atomDimensionKey, elapsedTimestampsNs] : bucket.mAggregatedAtoms) {
                totalSize += sizeof(FieldValue) *
                             atomDimensionKey.getAtomFieldValues().getValues().size();
                totalSize += sizeof(int64_t) * elapsedTimestampsNs.size();
            }
        }
    }
    return totalSize;
}

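// Maps the type of lost atom to how severely the report is corrupted: losing 'what'
// atoms only skews the current report (reset on dump), losing condition atoms makes
// the data unrecoverable, and lost state atoms do not apply to gauge metrics.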
MetricProducer::DataCorruptionSeverity GaugeMetricProducer::determineCorruptionSeverity(
        int32_t atomId, DataCorruptedReason reason, LostAtomType atomType) const {
    switch (atomType) {
        case LostAtomType::kWhat:
            return DataCorruptionSeverity::kResetOnDump;
        case LostAtomType::kCondition:
            return DataCorruptionSeverity::kUnrecoverable;
        case LostAtomType::kState:
            break;
    }
    return DataCorruptionSeverity::kNone;
}

}  // namespace statsd
}  // namespace os
}  // namespace android