1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <gtest/gtest_prod.h>
20 
21 #include <optional>
22 
23 #include "FieldValue.h"
24 #include "HashableDimensionKey.h"
25 #include "MetricProducer.h"
26 #include "anomaly/AnomalyTracker.h"
27 #include "condition/ConditionTracker.h"
28 #include "external/PullDataReceiver.h"
29 #include "external/StatsPullerManager.h"
30 #include "matchers/EventMatcherWizard.h"
31 #include "src/statsd_config.pb.h"
32 #include "stats_log_util.h"
33 #include "stats_util.h"
34 
35 namespace android {
36 namespace os {
37 namespace statsd {
38 
// Results of one completed (past) bucket. Instances are stored per MetricDimensionKey in
// ValueMetricProducer::mPastBuckets until the report is dumped.
//
// All scalar fields default to 0 so that a default-constructed PastBucket never holds
// indeterminate values (reading those would be undefined behaviour).
template <typename AggregatedValue>
struct PastBucket {
    // Bucket boundaries (nanoseconds, per the Ns suffix).
    int64_t mBucketStartNs = 0;
    int64_t mBucketEndNs = 0;

    // Per-value-field results of the bucket.
    // NOTE(review): these look like parallel vectors (one entry per aggregated Interval:
    // its aggIndex, aggregate and sample size) — confirm against buildPartialBucket().
    std::vector<int> aggIndex;
    std::vector<AggregatedValue> aggregates;
    std::vector<int> sampleSizes;

    /**
     * Presumably the time the metric's condition was true during this bucket.
     * If the metric has no condition, then this field is just wasted.
     * When we tune statsd memory usage in the future, this is a candidate to optimize.
     */
    int64_t mConditionTrueNs = 0;

    /**
     * Correction that must be applied to mConditionTrueNs before the normalization
     * calculation is performed on the consumer (read: server) side. Applied only to
     * ValueMetrics with pulled atoms.
     */
    int64_t mConditionCorrectionNs = 0;
};
60 
61 // Aggregates values within buckets.
62 //
63 // There are different events that might complete a bucket
64 // - a condition change
65 // - an app upgrade
66 // - an alarm set to the end of the bucket
67 template <typename AggregatedValue, typename DimExtras>
68 class ValueMetricProducer : public MetricProducer, public virtual PullDataReceiver {
69 public:
70     struct PullOptions {
71         const int pullAtomId;
72         const sp<StatsPullerManager>& pullerManager;
73     };
74 
75     struct BucketOptions {
76         const int64_t timeBaseNs;
77         const int64_t startTimeNs;
78         const int64_t bucketSizeNs;
79         const int64_t minBucketSizeNs;
80         const optional<int64_t> conditionCorrectionThresholdNs;
81         const optional<bool> splitBucketForAppUpgrade;
82     };
83 
84     struct WhatOptions {
85         const bool containsAnyPositionInDimensionsInWhat;
86         const bool shouldUseNestedDimensions;
87         const int whatMatcherIndex;
88         const sp<EventMatcherWizard>& matcherWizard;
89         const FieldMatcher& dimensionsInWhat;
90         const vector<Matcher>& fieldMatchers;
91         const vector<ValueMetric::AggregationType> aggregationTypes;
92         const std::vector<std::optional<const BinStarts>> binStartsList;
93     };
94 
95     struct ConditionOptions {
96         const int conditionIndex;
97         const ConditionLinks& conditionLinks;
98         const vector<ConditionState>& initialConditionCache;
99         const sp<ConditionWizard>& conditionWizard;
100     };
101 
102     struct StateOptions {
103         const StateLinks& stateLinks;
104         const vector<int>& slicedStateAtoms;
105         const unordered_map<int, unordered_map<int, int64_t>>& stateGroupMap;
106     };
107 
108     struct ActivationOptions {
109         const std::unordered_map<int, std::shared_ptr<Activation>>& eventActivationMap;
110         const std::unordered_map<int, std::vector<std::shared_ptr<Activation>>>&
111                 eventDeactivationMap;
112     };
113 
114     struct GuardrailOptions {
115         const size_t dimensionSoftLimit;
116         const size_t dimensionHardLimit;
117     };
118 
119     virtual ~ValueMetricProducer();
120 
121     // Process data pulled on bucket boundary.
onDataPulled(const std::vector<std::shared_ptr<LogEvent>> & data,PullResult pullResult,int64_t originalPullTimeNs)122     virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
123                               PullResult pullResult, int64_t originalPullTimeNs) override {
124     }
125 
126     // Determine if metric needs to pull
isPullNeeded()127     virtual bool isPullNeeded() const override {
128         return false;
129     }
130 
131     // ValueMetric needs special logic if it's a pulled atom.
132     void onStatsdInitCompleted(int64_t eventTimeNs) override;
133 
134     void onStateChanged(int64_t eventTimeNs, int32_t atomId, const HashableDimensionKey& primaryKey,
135                         const FieldValue& oldState, const FieldValue& newState) override;
136 
137 protected:
138     ValueMetricProducer(int64_t metricId, const ConfigKey& key, uint64_t protoHash,
139                         const PullOptions& pullOptions, const BucketOptions& bucketOptions,
140                         const WhatOptions& whatOptions, const ConditionOptions& conditionOptions,
141                         const StateOptions& stateOptions,
142                         const ActivationOptions& activationOptions,
143                         const GuardrailOptions& guardrailOptions,
144                         const wp<ConfigMetadataProvider> configMetadataProvider);
145 
146     void onMatchedLogEventInternalLocked(
147             const size_t matcherIndex, const MetricDimensionKey& eventKey,
148             const ConditionKey& conditionKey, bool condition, const LogEvent& event,
149             const std::map<int, HashableDimensionKey>& statePrimaryKeys) override;
150 
151     // Determine whether or not a LogEvent can be skipped.
152     virtual inline bool canSkipLogEventLocked(
153             const MetricDimensionKey& eventKey, bool condition, int64_t eventTimeNs,
154             const std::map<int, HashableDimensionKey>& statePrimaryKeys) const = 0;
155 
156     void notifyAppUpgradeInternalLocked(const int64_t eventTimeNs) override;
157 
158     void onDumpReportLocked(const int64_t dumpTimeNs, const bool includeCurrentPartialBucket,
159                             const bool eraseData, const DumpLatency dumpLatency,
160                             std::set<string>* strSet, std::set<int32_t>& usedUids,
161                             android::util::ProtoOutputStream* protoOutput) override;
162 
163     struct DumpProtoFields {
164         const int metricTypeFieldId;
165         const int bucketNumFieldId;
166         const int startBucketMsFieldId;
167         const int endBucketMsFieldId;
168         const int conditionTrueNsFieldId;
169         const optional<int> conditionCorrectionNsFieldId;
170     };
171 
172     virtual DumpProtoFields getDumpProtoFields() const = 0;
173 
174     void clearPastBucketsLocked(const int64_t dumpTimeNs) override;
175 
176     // ValueMetricProducer internal interface to handle active state change.
177     void onActiveStateChangedLocked(const int64_t eventTimeNs, const bool isActive) override;
178 
onActiveStateChangedInternalLocked(const int64_t eventTimeNs,const bool isActive)179     virtual void onActiveStateChangedInternalLocked(const int64_t eventTimeNs,
180                                                     const bool isActive) {
181     }
182 
183     // ValueMetricProducer internal interface to handle condition change.
184     void onConditionChangedLocked(const bool condition, int64_t eventTimeNs) override;
185 
186     // Only called when mIsActive, the event is NOT too late, and after pulling.
onConditionChangedInternalLocked(const ConditionState oldCondition,const ConditionState newCondition,const int64_t eventTimeNs)187     virtual void onConditionChangedInternalLocked(const ConditionState oldCondition,
188                                                   const ConditionState newCondition,
189                                                   const int64_t eventTimeNs) {
190     }
191 
192     // Internal interface to handle sliced condition change.
193     void onSlicedConditionMayChangeLocked(bool overallCondition, int64_t eventTime) override;
194 
195     void dumpStatesLocked(int out, bool verbose) const override;
196 
197     virtual std::string aggregatedValueToString(const AggregatedValue& aggregate) const = 0;
198 
199     // For pulled metrics, this method should only be called if a pull has been done. Else we will
200     // not have complete data for the bucket.
201     void flushIfNeededLocked(int64_t eventTime) override;
202 
203     // For pulled metrics, this method should only be called if a pulled has been done. Else we will
204     // not have complete data for the bucket.
205     void flushCurrentBucketLocked(int64_t eventTimeNs, int64_t nextBucketStartTimeNs) override;
206 
207     void dropDataLocked(const int64_t dropTimeNs) override;
208 
209     // Calculate how many buckets are present between the current bucket and eventTimeNs.
210     int64_t calcBucketsForwardCount(const int64_t eventTimeNs) const;
211 
212     // Mark the data as invalid.
213     virtual void invalidateCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason);
214 
215     // Skips the current bucket without notifying StatsdStats of the skipped bucket.
216     // This should only be called from #flushCurrentBucketLocked. Otherwise, a future event that
217     // causes the bucket to be invalidated will not notify StatsdStats.
218     void skipCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason);
219 
220     optional<InvalidConfigReason> onConfigUpdatedLocked(
221             const StatsdConfig& config, int configIndex, int metricIndex,
222             const std::vector<sp<AtomMatchingTracker>>& allAtomMatchingTrackers,
223             const std::unordered_map<int64_t, int>& oldAtomMatchingTrackerMap,
224             const std::unordered_map<int64_t, int>& newAtomMatchingTrackerMap,
225             const sp<EventMatcherWizard>& matcherWizard,
226             const std::vector<sp<ConditionTracker>>& allConditionTrackers,
227             const std::unordered_map<int64_t, int>& conditionTrackerMap,
228             const sp<ConditionWizard>& wizard,
229             const std::unordered_map<int64_t, int>& metricToActivationMap,
230             std::unordered_map<int, std::vector<int>>& trackerToMetricMap,
231             std::unordered_map<int, std::vector<int>>& conditionToMetricMap,
232             std::unordered_map<int, std::vector<int>>& activationAtomTrackerToMetricMap,
233             std::unordered_map<int, std::vector<int>>& deactivationAtomTrackerToMetricMap,
234             std::vector<int>& metricsWithActivation) override;
235 
236     size_t computeValueBucketSizeLocked(const bool isFullBucket, const MetricDimensionKey& dimKey,
237                                         const bool isFirstBucket,
238                                         const PastBucket<AggregatedValue>& bucket) const;
239 
240     virtual size_t getAggregatedValueSize(const AggregatedValue& value) const = 0;
241 
242     virtual optional<int64_t> getConditionIdForMetric(const StatsdConfig& config,
243                                                       const int configIndex) const = 0;
244 
245     virtual int64_t getWhatAtomMatcherIdForMetric(const StatsdConfig& config,
246                                                   const int configIndex) const = 0;
247 
248     virtual ConditionLinks getConditionLinksForMetric(const StatsdConfig& config,
249                                                       const int configIndex) const = 0;
250 
251     int mWhatMatcherIndex;
252 
253     sp<EventMatcherWizard> mEventMatcherWizard;
254 
255     const sp<StatsPullerManager> mPullerManager;
256 
257     // Value fields for matching.
258     const std::vector<Matcher> mFieldMatchers;
259 
260     // Value fields for matching.
261     std::set<HashableDimensionKey> mMatchedMetricDimensionKeys;
262 
263     // Holds the atom id, primary key pair from a state change.
264     // Only used for pulled metrics.
265     // TODO(b/185796114): can be passed as function arguments instead.
266     pair<int32_t, HashableDimensionKey> mStateChangePrimaryKey;
267 
268     // Atom Id for pulled data. -1 if this is not pulled.
269     const int mPullAtomId;
270 
271     // Tracks the value information of one value field.
272     struct Interval {
273         // Index in multi value aggregation.
274         int aggIndex;
275 
276         // Current aggregation, depending on the aggregation type.
277         AggregatedValue aggregate;
278 
279         // Number of samples collected.
280         int sampleSize = 0;
281 
hasValueInterval282         inline bool hasValue() const {
283             return sampleSize > 0;
284         }
285     };
286 
287     // Internal state of an ongoing aggregation bucket.
288     struct CurrentBucket {
289         // If the `MetricDimensionKey` state key is the current state key, then
290         // the condition timer will be updated later (e.g. condition/state/active
291         // state change) with the correct condition and time.
CurrentBucketCurrentBucket292         CurrentBucket() : intervals(), conditionTimer(ConditionTimer(false, 0)) {
293         }
294         // Value information for each value field of the metric.
295         std::vector<Interval> intervals;
296         // Tracks how long the condition is true.
297         ConditionTimer conditionTimer;
298     };
299 
300     // Tracks the internal state in the ongoing aggregation bucket for each DimensionsInWhat
301     // key and StateValuesKey pair.
302     std::unordered_map<MetricDimensionKey, CurrentBucket> mCurrentSlicedBucket;
303 
304     // State key and any extra information for a specific DimensionsInWhat key.
305     struct DimensionsInWhatInfo {
DimensionsInWhatInfoDimensionsInWhatInfo306         DimensionsInWhatInfo(const HashableDimensionKey& stateKey)
307             : dimExtras(), currentState(stateKey), hasCurrentState(false) {
308         }
309 
310         DimExtras dimExtras;
311 
312         // Whether new data is seen in the bucket.
313         // TODO, this could be per base in the dim extras.
314         bool seenNewData = false;
315 
316         // Last seen state value(s).
317         HashableDimensionKey currentState;
318         // Whether this dimensions in what key has a current state key.
319         bool hasCurrentState;
320     };
321 
322     // Tracks current state key and other information for each DimensionsInWhat key.
323     std::unordered_map<HashableDimensionKey, DimensionsInWhatInfo> mDimInfos;
324 
325     // Save the past buckets and we can clear when the StatsLogReport is dumped.
326     std::unordered_map<MetricDimensionKey, std::vector<PastBucket<AggregatedValue>>> mPastBuckets;
327 
328     const int64_t mMinBucketSizeNs;
329 
330     // Util function to check whether the specified dimension hits the guardrail.
331     bool hitGuardRailLocked(const MetricDimensionKey& newKey) const;
332 
333     bool hasReachedGuardRailLimit() const;
334 
pullAndMatchEventsLocked(const int64_t timestampNs)335     virtual void pullAndMatchEventsLocked(const int64_t timestampNs) {
336     }
337 
338     virtual bool multipleBucketsSkipped(const int64_t numBucketsForward) const = 0;
339 
340     virtual PastBucket<AggregatedValue> buildPartialBucket(int64_t bucketEndTime,
341                                                            std::vector<Interval>& intervals) = 0;
342 
343     virtual void closeCurrentBucket(const int64_t eventTimeNs, int64_t nextBucketStartTimeNs);
344 
345     virtual void initNextSlicedBucket(int64_t nextBucketStartTimeNs);
346 
347     // Updates the condition timers in the current sliced bucket when there is a
348     // condition change or an active state change.
349     void updateCurrentSlicedBucketConditionTimers(bool newCondition, int64_t eventTimeNs);
350 
351     virtual void writePastBucketAggregateToProto(const int aggIndex,
352                                                  const AggregatedValue& aggregate,
353                                                  const int sampleSize,
354                                                  ProtoOutputStream* const protoOutput) const = 0;
355 
356     static const size_t kBucketSize = sizeof(PastBucket<AggregatedValue>{});
357 
358     const size_t mDimensionSoftLimit;
359 
360     const size_t mDimensionHardLimit;
361 
362     // This is to track whether or not the bucket is skipped for any of the reasons listed in
363     // BucketDropReason, many of which make the bucket potentially invalid.
364     bool mCurrentBucketIsSkipped;
365 
366     /** Stores condition correction threshold from the ValueMetric configuration */
367     optional<int64_t> mConditionCorrectionThresholdNs;
368 
isEventLateLocked(const int64_t eventTimeNs)369     inline bool isEventLateLocked(const int64_t eventTimeNs) const {
370         return eventTimeNs < mCurrentBucketStartTimeNs;
371     }
372 
373     // Returns true if any of the intervals have seen new data.
374     // This should return true unless there is an error parsing the value fields from the event.
375     virtual bool aggregateFields(const int64_t eventTimeNs, const MetricDimensionKey& eventKey,
376                                  const LogEvent& event, std::vector<Interval>& intervals,
377                                  DimExtras& dimExtras) = 0;
378 
379     // If this is a pulled metric
isPulled()380     inline bool isPulled() const {
381         return mPullAtomId != -1;
382     }
383 
384 private:
385 };  // ValueMetricProducer
386 
387 }  // namespace statsd
388 }  // namespace os
389 }  // namespace android
390