1 /* 2 * Copyright (C) 2017 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <gtest/gtest_prod.h> 20 21 #include <optional> 22 23 #include "FieldValue.h" 24 #include "HashableDimensionKey.h" 25 #include "MetricProducer.h" 26 #include "anomaly/AnomalyTracker.h" 27 #include "condition/ConditionTracker.h" 28 #include "external/PullDataReceiver.h" 29 #include "external/StatsPullerManager.h" 30 #include "matchers/EventMatcherWizard.h" 31 #include "src/statsd_config.pb.h" 32 #include "stats_log_util.h" 33 #include "stats_util.h" 34 35 namespace android { 36 namespace os { 37 namespace statsd { 38 39 template <typename AggregatedValue> 40 struct PastBucket { 41 int64_t mBucketStartNs; 42 int64_t mBucketEndNs; 43 std::vector<int> aggIndex; 44 std::vector<AggregatedValue> aggregates; 45 std::vector<int> sampleSizes; 46 47 /** 48 * If the metric has no condition, then this field is just wasted. 49 * When we tune statsd memory usage in the future, this is a candidate to optimize. 50 */ 51 int64_t mConditionTrueNs; 52 53 /** 54 * The semantic is the value which needs to be applied to mConditionTrueNs for correction 55 * to be performed prior normalization calculation on the user (read server) side. Applied only 56 * to ValueMetrics with pulled atoms. 57 */ 58 int64_t mConditionCorrectionNs; 59 }; 60 61 // Aggregates values within buckets. 62 // 63 // There are different events that might complete a bucket 64 // - a condition change 65 // - an app upgrade 66 // - an alarm set to the end of the bucket 67 template <typename AggregatedValue, typename DimExtras> 68 class ValueMetricProducer : public MetricProducer, public virtual PullDataReceiver { 69 public: 70 struct PullOptions { 71 const int pullAtomId; 72 const sp<StatsPullerManager>& pullerManager; 73 }; 74 75 struct BucketOptions { 76 const int64_t timeBaseNs; 77 const int64_t startTimeNs; 78 const int64_t bucketSizeNs; 79 const int64_t minBucketSizeNs; 80 const optional<int64_t> conditionCorrectionThresholdNs; 81 const optional<bool> splitBucketForAppUpgrade; 82 }; 83 84 struct WhatOptions { 85 const bool containsAnyPositionInDimensionsInWhat; 86 const bool shouldUseNestedDimensions; 87 const int whatMatcherIndex; 88 const sp<EventMatcherWizard>& matcherWizard; 89 const FieldMatcher& dimensionsInWhat; 90 const vector<Matcher>& fieldMatchers; 91 const vector<ValueMetric::AggregationType> aggregationTypes; 92 const std::vector<std::optional<const BinStarts>> binStartsList; 93 }; 94 95 struct ConditionOptions { 96 const int conditionIndex; 97 const ConditionLinks& conditionLinks; 98 const vector<ConditionState>& initialConditionCache; 99 const sp<ConditionWizard>& conditionWizard; 100 }; 101 102 struct StateOptions { 103 const StateLinks& stateLinks; 104 const vector<int>& slicedStateAtoms; 105 const unordered_map<int, unordered_map<int, int64_t>>& stateGroupMap; 106 }; 107 108 struct ActivationOptions { 109 const std::unordered_map<int, std::shared_ptr<Activation>>& eventActivationMap; 110 const std::unordered_map<int, std::vector<std::shared_ptr<Activation>>>& 111 eventDeactivationMap; 112 }; 113 114 struct GuardrailOptions { 115 const size_t dimensionSoftLimit; 116 const size_t dimensionHardLimit; 117 }; 118 119 virtual ~ValueMetricProducer(); 120 121 // Process data pulled on bucket boundary. onDataPulled(const std::vector<std::shared_ptr<LogEvent>> & data,PullResult pullResult,int64_t originalPullTimeNs)122 virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data, 123 PullResult pullResult, int64_t originalPullTimeNs) override { 124 } 125 126 // Determine if metric needs to pull isPullNeeded()127 virtual bool isPullNeeded() const override { 128 return false; 129 } 130 131 // ValueMetric needs special logic if it's a pulled atom. 132 void onStatsdInitCompleted(int64_t eventTimeNs) override; 133 134 void onStateChanged(int64_t eventTimeNs, int32_t atomId, const HashableDimensionKey& primaryKey, 135 const FieldValue& oldState, const FieldValue& newState) override; 136 137 protected: 138 ValueMetricProducer(int64_t metricId, const ConfigKey& key, uint64_t protoHash, 139 const PullOptions& pullOptions, const BucketOptions& bucketOptions, 140 const WhatOptions& whatOptions, const ConditionOptions& conditionOptions, 141 const StateOptions& stateOptions, 142 const ActivationOptions& activationOptions, 143 const GuardrailOptions& guardrailOptions, 144 const wp<ConfigMetadataProvider> configMetadataProvider); 145 146 void onMatchedLogEventInternalLocked( 147 const size_t matcherIndex, const MetricDimensionKey& eventKey, 148 const ConditionKey& conditionKey, bool condition, const LogEvent& event, 149 const std::map<int, HashableDimensionKey>& statePrimaryKeys) override; 150 151 // Determine whether or not a LogEvent can be skipped. 152 virtual inline bool canSkipLogEventLocked( 153 const MetricDimensionKey& eventKey, bool condition, int64_t eventTimeNs, 154 const std::map<int, HashableDimensionKey>& statePrimaryKeys) const = 0; 155 156 void notifyAppUpgradeInternalLocked(const int64_t eventTimeNs) override; 157 158 void onDumpReportLocked(const int64_t dumpTimeNs, const bool includeCurrentPartialBucket, 159 const bool eraseData, const DumpLatency dumpLatency, 160 std::set<string>* strSet, std::set<int32_t>& usedUids, 161 android::util::ProtoOutputStream* protoOutput) override; 162 163 struct DumpProtoFields { 164 const int metricTypeFieldId; 165 const int bucketNumFieldId; 166 const int startBucketMsFieldId; 167 const int endBucketMsFieldId; 168 const int conditionTrueNsFieldId; 169 const optional<int> conditionCorrectionNsFieldId; 170 }; 171 172 virtual DumpProtoFields getDumpProtoFields() const = 0; 173 174 void clearPastBucketsLocked(const int64_t dumpTimeNs) override; 175 176 // ValueMetricProducer internal interface to handle active state change. 177 void onActiveStateChangedLocked(const int64_t eventTimeNs, const bool isActive) override; 178 onActiveStateChangedInternalLocked(const int64_t eventTimeNs,const bool isActive)179 virtual void onActiveStateChangedInternalLocked(const int64_t eventTimeNs, 180 const bool isActive) { 181 } 182 183 // ValueMetricProducer internal interface to handle condition change. 184 void onConditionChangedLocked(const bool condition, int64_t eventTimeNs) override; 185 186 // Only called when mIsActive, the event is NOT too late, and after pulling. onConditionChangedInternalLocked(const ConditionState oldCondition,const ConditionState newCondition,const int64_t eventTimeNs)187 virtual void onConditionChangedInternalLocked(const ConditionState oldCondition, 188 const ConditionState newCondition, 189 const int64_t eventTimeNs) { 190 } 191 192 // Internal interface to handle sliced condition change. 193 void onSlicedConditionMayChangeLocked(bool overallCondition, int64_t eventTime) override; 194 195 void dumpStatesLocked(int out, bool verbose) const override; 196 197 virtual std::string aggregatedValueToString(const AggregatedValue& aggregate) const = 0; 198 199 // For pulled metrics, this method should only be called if a pull has been done. Else we will 200 // not have complete data for the bucket. 201 void flushIfNeededLocked(int64_t eventTime) override; 202 203 // For pulled metrics, this method should only be called if a pulled has been done. Else we will 204 // not have complete data for the bucket. 205 void flushCurrentBucketLocked(int64_t eventTimeNs, int64_t nextBucketStartTimeNs) override; 206 207 void dropDataLocked(const int64_t dropTimeNs) override; 208 209 // Calculate how many buckets are present between the current bucket and eventTimeNs. 210 int64_t calcBucketsForwardCount(const int64_t eventTimeNs) const; 211 212 // Mark the data as invalid. 213 virtual void invalidateCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason); 214 215 // Skips the current bucket without notifying StatsdStats of the skipped bucket. 216 // This should only be called from #flushCurrentBucketLocked. Otherwise, a future event that 217 // causes the bucket to be invalidated will not notify StatsdStats. 218 void skipCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason); 219 220 optional<InvalidConfigReason> onConfigUpdatedLocked( 221 const StatsdConfig& config, int configIndex, int metricIndex, 222 const std::vector<sp<AtomMatchingTracker>>& allAtomMatchingTrackers, 223 const std::unordered_map<int64_t, int>& oldAtomMatchingTrackerMap, 224 const std::unordered_map<int64_t, int>& newAtomMatchingTrackerMap, 225 const sp<EventMatcherWizard>& matcherWizard, 226 const std::vector<sp<ConditionTracker>>& allConditionTrackers, 227 const std::unordered_map<int64_t, int>& conditionTrackerMap, 228 const sp<ConditionWizard>& wizard, 229 const std::unordered_map<int64_t, int>& metricToActivationMap, 230 std::unordered_map<int, std::vector<int>>& trackerToMetricMap, 231 std::unordered_map<int, std::vector<int>>& conditionToMetricMap, 232 std::unordered_map<int, std::vector<int>>& activationAtomTrackerToMetricMap, 233 std::unordered_map<int, std::vector<int>>& deactivationAtomTrackerToMetricMap, 234 std::vector<int>& metricsWithActivation) override; 235 236 size_t computeValueBucketSizeLocked(const bool isFullBucket, const MetricDimensionKey& dimKey, 237 const bool isFirstBucket, 238 const PastBucket<AggregatedValue>& bucket) const; 239 240 virtual size_t getAggregatedValueSize(const AggregatedValue& value) const = 0; 241 242 virtual optional<int64_t> getConditionIdForMetric(const StatsdConfig& config, 243 const int configIndex) const = 0; 244 245 virtual int64_t getWhatAtomMatcherIdForMetric(const StatsdConfig& config, 246 const int configIndex) const = 0; 247 248 virtual ConditionLinks getConditionLinksForMetric(const StatsdConfig& config, 249 const int configIndex) const = 0; 250 251 int mWhatMatcherIndex; 252 253 sp<EventMatcherWizard> mEventMatcherWizard; 254 255 const sp<StatsPullerManager> mPullerManager; 256 257 // Value fields for matching. 258 const std::vector<Matcher> mFieldMatchers; 259 260 // Value fields for matching. 261 std::set<HashableDimensionKey> mMatchedMetricDimensionKeys; 262 263 // Holds the atom id, primary key pair from a state change. 264 // Only used for pulled metrics. 265 // TODO(b/185796114): can be passed as function arguments instead. 266 pair<int32_t, HashableDimensionKey> mStateChangePrimaryKey; 267 268 // Atom Id for pulled data. -1 if this is not pulled. 269 const int mPullAtomId; 270 271 // Tracks the value information of one value field. 272 struct Interval { 273 // Index in multi value aggregation. 274 int aggIndex; 275 276 // Current aggregation, depending on the aggregation type. 277 AggregatedValue aggregate; 278 279 // Number of samples collected. 280 int sampleSize = 0; 281 hasValueInterval282 inline bool hasValue() const { 283 return sampleSize > 0; 284 } 285 }; 286 287 // Internal state of an ongoing aggregation bucket. 288 struct CurrentBucket { 289 // If the `MetricDimensionKey` state key is the current state key, then 290 // the condition timer will be updated later (e.g. condition/state/active 291 // state change) with the correct condition and time. CurrentBucketCurrentBucket292 CurrentBucket() : intervals(), conditionTimer(ConditionTimer(false, 0)) { 293 } 294 // Value information for each value field of the metric. 295 std::vector<Interval> intervals; 296 // Tracks how long the condition is true. 297 ConditionTimer conditionTimer; 298 }; 299 300 // Tracks the internal state in the ongoing aggregation bucket for each DimensionsInWhat 301 // key and StateValuesKey pair. 302 std::unordered_map<MetricDimensionKey, CurrentBucket> mCurrentSlicedBucket; 303 304 // State key and any extra information for a specific DimensionsInWhat key. 305 struct DimensionsInWhatInfo { DimensionsInWhatInfoDimensionsInWhatInfo306 DimensionsInWhatInfo(const HashableDimensionKey& stateKey) 307 : dimExtras(), currentState(stateKey), hasCurrentState(false) { 308 } 309 310 DimExtras dimExtras; 311 312 // Whether new data is seen in the bucket. 313 // TODO, this could be per base in the dim extras. 314 bool seenNewData = false; 315 316 // Last seen state value(s). 317 HashableDimensionKey currentState; 318 // Whether this dimensions in what key has a current state key. 319 bool hasCurrentState; 320 }; 321 322 // Tracks current state key and other information for each DimensionsInWhat key. 323 std::unordered_map<HashableDimensionKey, DimensionsInWhatInfo> mDimInfos; 324 325 // Save the past buckets and we can clear when the StatsLogReport is dumped. 326 std::unordered_map<MetricDimensionKey, std::vector<PastBucket<AggregatedValue>>> mPastBuckets; 327 328 const int64_t mMinBucketSizeNs; 329 330 // Util function to check whether the specified dimension hits the guardrail. 331 bool hitGuardRailLocked(const MetricDimensionKey& newKey) const; 332 333 bool hasReachedGuardRailLimit() const; 334 pullAndMatchEventsLocked(const int64_t timestampNs)335 virtual void pullAndMatchEventsLocked(const int64_t timestampNs) { 336 } 337 338 virtual bool multipleBucketsSkipped(const int64_t numBucketsForward) const = 0; 339 340 virtual PastBucket<AggregatedValue> buildPartialBucket(int64_t bucketEndTime, 341 std::vector<Interval>& intervals) = 0; 342 343 virtual void closeCurrentBucket(const int64_t eventTimeNs, int64_t nextBucketStartTimeNs); 344 345 virtual void initNextSlicedBucket(int64_t nextBucketStartTimeNs); 346 347 // Updates the condition timers in the current sliced bucket when there is a 348 // condition change or an active state change. 349 void updateCurrentSlicedBucketConditionTimers(bool newCondition, int64_t eventTimeNs); 350 351 virtual void writePastBucketAggregateToProto(const int aggIndex, 352 const AggregatedValue& aggregate, 353 const int sampleSize, 354 ProtoOutputStream* const protoOutput) const = 0; 355 356 static const size_t kBucketSize = sizeof(PastBucket<AggregatedValue>{}); 357 358 const size_t mDimensionSoftLimit; 359 360 const size_t mDimensionHardLimit; 361 362 // This is to track whether or not the bucket is skipped for any of the reasons listed in 363 // BucketDropReason, many of which make the bucket potentially invalid. 364 bool mCurrentBucketIsSkipped; 365 366 /** Stores condition correction threshold from the ValueMetric configuration */ 367 optional<int64_t> mConditionCorrectionThresholdNs; 368 isEventLateLocked(const int64_t eventTimeNs)369 inline bool isEventLateLocked(const int64_t eventTimeNs) const { 370 return eventTimeNs < mCurrentBucketStartTimeNs; 371 } 372 373 // Returns true if any of the intervals have seen new data. 374 // This should return true unless there is an error parsing the value fields from the event. 375 virtual bool aggregateFields(const int64_t eventTimeNs, const MetricDimensionKey& eventKey, 376 const LogEvent& event, std::vector<Interval>& intervals, 377 DimExtras& dimExtras) = 0; 378 379 // If this is a pulled metric isPulled()380 inline bool isPulled() const { 381 return mPullAtomId != -1; 382 } 383 384 private: 385 }; // ValueMetricProducer 386 387 } // namespace statsd 388 } // namespace os 389 } // namespace android 390