# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import collections
import copy
import itertools
import threading
import time

from infra_libs.ts_mon.common import errors


def default_modify_fn(name):
  """Builds the default modify function used when incrementing a metric.

  Args:
    name (string): the metric's name, used only in the raised error.

  Returns:
    A function taking (value, delta) that returns value + delta, and raises
    MonitoringDecreasingValueError if delta is negative.
  """
  def _modify_fn(value, delta):
    if delta < 0:
      raise errors.MonitoringDecreasingValueError(name, None, delta)
    return value + delta
  return _modify_fn


class MetricStore(object):
  """A place to store values for each metric.

  Several methods take "a normalized field tuple".  This is a tuple of
  (key, value) tuples sorted by key.  (The reason this is given as a tuple
  instead of a dict is because tuples are hashable and can be used as dict keys,
  dicts can not).

  The MetricStore is also responsible for keeping the start_time of each metric.
  This is what goes into the start_timestamp_us field in the MetricsData proto
  for cumulative metrics and distributions, and helps Monarch identify when a
  counter was reset.  This is the MetricStore's job because an implementation
  might share counter values across multiple instances of a task (like on
  Appengine), so the start time must be associated with that value so that it
  can be reset for all tasks at once when the value is reset.

  External metric stores (like those backed by memcache) may be cleared (either
  wholly or partially) at any time.  When this happens the MetricStore *must*
  generate a new start_time for all the affected metrics.

  Metrics can specify their own explicit start time if they are mirroring the
  value of some external counter that started counting at a known time.

  Otherwise the MetricStore's time_fn (defaults to time.time()) is called the
  first time a metric is set or incremented, or after it is cleared externally.
  """

  def __init__(self, state, time_fn=None):
    """Initializes the store.

    Args:
      state: the shared state object; this class reads its `metrics` mapping
          and subclasses read its `target`.
      time_fn: a zero-argument callable returning the current time.  Defaults
          to time.time.
    """
    self._state = state
    self._time_fn = time_fn or time.time

  def get(self, name, fields, target_fields, default=None):
    """Fetches the current value for the metric.

    Args:
      name (string): the metric's name.
      fields (tuple): a normalized field tuple.
      target_fields (dict or None): target fields to override.
      default: the value to return if the metric has no value of this set of
          field values.
    """
    raise NotImplementedError

  def get_all(self):
    """Returns an iterator over all the metrics present in the store.

    The iterator yields 5-tuples:
      (target, metric, start_time, end_time, field_values)
    """
    raise NotImplementedError

  def set(self, name, fields, target_fields, value, enforce_ge=False):
    """Sets the metric's value.

    Args:
      name: the metric's name.
      fields: a normalized field tuple.
      target_fields (dict or None): target fields to override.
      value: the new value for the metric.
      enforce_ge: if this is True, raise an exception if the new value is
          less than the old value.

    Raises:
      MonitoringDecreasingValueError: if enforce_ge is True and the new value is
          smaller than the old value.
    """
    raise NotImplementedError

  def incr(self, name, fields, target_fields, delta, modify_fn=None):
    """Increments the metric's value.

    Args:
      name: the metric's name.
      fields: a normalized field tuple.
      target_fields (dict or None): target fields to override.
      delta: how much to increment the value by.
      modify_fn: this function is called with the original value and the delta
          as its arguments and is expected to return the new value.  The
          function must be idempotent as it may be called multiple times.
    """
    raise NotImplementedError

  def reset_for_unittest(self, name=None):
    """Clears the values of metrics.  Useful in unittests.

    Args:
      name: the name of an individual metric to reset, or if None resets all
          metrics.
    """
    raise NotImplementedError

  def _start_time(self, name):
    """Returns the start time to record for the named metric.

    Uses the metric's own explicit start_time when it is registered in the
    state and has one set; otherwise falls back to the current time from
    time_fn.
    """
    if name in self._state.metrics:
      ret = self._state.metrics[name].start_time
      if ret is not None:
        return ret

    return self._time_fn()


class _TargetFieldsValues(object):
  """Holds all values for a single metric.

  Values are keyed by metric fields and target fields (which override the
  default target fields configured globally for the process).
  """

  def __init__(self, start_time):
    self.start_time = start_time

    # {normalized_target_fields: {normalized_metric_fields: value}}
    self._values = collections.defaultdict(dict)

  @staticmethod
  def _normalize_target_fields(target_fields):
    """Converts target fields (dict or None) into a hashable, sorted tuple."""
    if not target_fields:
      target_fields = {}
    return tuple(sorted(target_fields.items()))

  def _get_target_values(self, target_fields):
    """Returns the {fields: value} dict for a target, creating it if missing."""
    return self._values[self._normalize_target_fields(target_fields)]

  def get_value(self, fields, target_fields, default=None):
    """Returns the stored value, or default if there is none.

    This is a pure read: it deliberately avoids indexing the defaultdict so
    that looking up a target that has never been written does not insert an
    empty entry (which iter_targets would otherwise yield later).
    """
    target_values = self._values.get(
        self._normalize_target_fields(target_fields))
    if target_values is None:
      return default
    return target_values.get(fields, default)

  def set_value(self, fields, target_fields, value):
    """Stores value under the given metric fields and target fields."""
    self._get_target_values(target_fields)[fields] = value

  def iter_targets(self, default_target):
    """Yields (target, {fields: value}) for every set of target fields.

    When target fields were overridden, the yielded target is a shallow copy
    of default_target updated with the overrides; otherwise default_target
    itself is yielded.
    """
    for target_fields, fields_values in self._values.items():
      if target_fields:
        target = copy.copy(default_target)
        target.update(dict(target_fields))
      else:
        target = default_target
      yield target, fields_values

  def __deepcopy__(self, memo_dict):
    ret = _TargetFieldsValues(self.start_time)
    ret._values = copy.deepcopy(self._values, memo_dict)
    return ret


class InProcessMetricStore(MetricStore):
  """A thread-safe metric store that keeps values in memory."""

  def __init__(self, state, time_fn=None):
    super(InProcessMetricStore, self).__init__(state, time_fn=time_fn)

    # {metric name: _TargetFieldsValues}
    self._values = {}
    self._thread_lock = threading.Lock()

  def _entry(self, name):
    """Returns the _TargetFieldsValues for a metric, creating it if needed."""
    if name not in self._values:
      self._reset(name)

    return self._values[name]

  def get(self, name, fields, target_fields, default=None):
    """Fetches the current value for the metric, or default if unset.

    Does not acquire the lock itself; incr() calls this while already holding
    the (non-reentrant) lock.
    """
    return self._entry(name).get_value(fields, target_fields, default)

  def iter_field_values(self, name):
    """Returns an iterator over (fields, value) pairs across all targets."""
    return itertools.chain.from_iterable(
        x.items() for _, x
        in self._entry(name).iter_targets(self._state.target))

  def get_all(self):
    """Yields (target, metric, start_time, end_time, field_values) tuples.

    Metrics that have stored values but are not registered in the state are
    skipped.
    """
    # Make a copy of the metric values in case another thread (or this
    # generator's consumer) modifies them while we're iterating.
    with self._thread_lock:
      values = copy.deepcopy(self._values)
    end_time = self._time_fn()

    for name, metric_values in values.items():
      if name not in self._state.metrics:
        continue
      start_time = metric_values.start_time
      for target, fields_values in metric_values.iter_targets(
          self._state.target):
        yield (target, self._state.metrics[name], start_time, end_time,
               fields_values)

  def set(self, name, fields, target_fields, value, enforce_ge=False):
    """Sets the metric's value.

    Raises:
      MonitoringDecreasingValueError: if enforce_ge is True and the new value
          is smaller than the old value (a missing old value counts as 0).
    """
    with self._thread_lock:
      if enforce_ge:
        old_value = self._entry(name).get_value(fields, target_fields, 0)
        if value < old_value:
          raise errors.MonitoringDecreasingValueError(name, old_value, value)

      self._entry(name).set_value(fields, target_fields, value)

  def incr(self, name, fields, target_fields, delta, modify_fn=None):
    """Increments the metric's value by applying modify_fn(old_value, delta).

    A missing old value is treated as 0.  When modify_fn is None the default
    additive function is used.

    Raises:
      MonitoringDecreasingValueError: if delta is negative.
    """
    if delta < 0:
      raise errors.MonitoringDecreasingValueError(name, None, delta)

    if modify_fn is None:
      modify_fn = default_modify_fn(name)

    with self._thread_lock:
      self._entry(name).set_value(fields, target_fields, modify_fn(
          self.get(name, fields, target_fields, 0), delta))

  def reset_for_unittest(self, name=None):
    """Clears one metric's values, or every metric's values if name is None."""
    if name is not None:
      self._reset(name)
    else:
      # Iterate over a snapshot of the keys: _reset writes to self._values.
      for metric_name in list(self._values):
        self._reset(metric_name)

  def _reset(self, name):
    """Replaces the metric's values with an empty set and a new start time."""
    self._values[name] = _TargetFieldsValues(self._start_time(name))