xref: /aosp_15_r20/external/autotest/utils/frozen_chromite/third_party/infra_libs/ts_mon/common/metric_store.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Copyright 2015 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import collections
6import copy
7import itertools
8import threading
9import time
10
11from infra_libs.ts_mon.common import errors
12
13
14def default_modify_fn(name):
15  def _modify_fn(value, delta):
16    if delta < 0:
17      raise errors.MonitoringDecreasingValueError(name, None, delta)
18    return value + delta
19  return _modify_fn
20
21
22class MetricStore(object):
23  """A place to store values for each metric.
24
25  Several methods take "a normalized field tuple".  This is a tuple of
26  (key, value) tuples sorted by key.  (The reason this is given as a tuple
27  instead of a dict is because tuples are hashable and can be used as dict keys,
28  dicts can not).
29
30  The MetricStore is also responsible for keeping the start_time of each metric.
31  This is what goes into the start_timestamp_us field in the MetricsData proto
32  for cumulative metrics and distributions, and helps Monarch identify when a
33  counter was reset.  This is the MetricStore's job because an implementation
34  might share counter values across multiple instances of a task (like on
35  Appengine), so the start time must be associated with that value so that it
36  can be reset for all tasks at once when the value is reset.
37
38  External metric stores (like those backed by memcache) may be cleared (either
39  wholly or partially) at any time.  When this happens the MetricStore *must*
40  generate a new start_time for all the affected metrics.
41
42  Metrics can specify their own explicit start time if they are mirroring the
43  value of some external counter that started counting at a known time.
44
45  Otherwise the MetricStore's time_fn (defaults to time.time()) is called the
46  first time a metric is set or incremented, or after it is cleared externally.
47  """
48
49  def __init__(self, state, time_fn=None):
50    self._state = state
51    self._time_fn = time_fn or time.time
52
53  def get(self, name, fields, target_fields, default=None):
54    """Fetches the current value for the metric.
55
56    Args:
57      name (string): the metric's name.
58      fields (tuple): a normalized field tuple.
59      target_fields (dict or None): target fields to override.
60      default: the value to return if the metric has no value of this set of
61          field values.
62    """
63    raise NotImplementedError
64
65  def get_all(self):
66    """Returns an iterator over all the metrics present in the store.
67
68    The iterator yields 5-tuples:
69      (target, metric, start_time, end_time, field_values)
70    """
71    raise NotImplementedError
72
73  def set(self, name, fields, target_fields, value, enforce_ge=False):
74    """Sets the metric's value.
75
76    Args:
77      name: the metric's name.
78      fields: a normalized field tuple.
79      target_fields (dict or None): target fields to override.
80      value: the new value for the metric.
81      enforce_ge: if this is True, raise an exception if the new value is
82          less than the old value.
83
84    Raises:
85      MonitoringDecreasingValueError: if enforce_ge is True and the new value is
86          smaller than the old value.
87    """
88    raise NotImplementedError
89
90  def incr(self, name, fields, target_fields, delta, modify_fn=None):
91    """Increments the metric's value.
92
93    Args:
94      name: the metric's name.
95      fields: a normalized field tuple.
96      target_fields (dict or None): target fields to override.
97      delta: how much to increment the value by.
98      modify_fn: this function is called with the original value and the delta
99          as its arguments and is expected to return the new value.  The
100          function must be idempotent as it may be called multiple times.
101    """
102    raise NotImplementedError
103
104  def reset_for_unittest(self, name=None):
105    """Clears the values metrics.  Useful in unittests.
106
107    Args:
108      name: the name of an individual metric to reset, or if None resets all
109        metrics.
110    """
111    raise NotImplementedError
112
113  def _start_time(self, name):
114    if name in self._state.metrics:
115      ret = self._state.metrics[name].start_time
116      if ret is not None:
117        return ret
118
119    return self._time_fn()
120
121
122class _TargetFieldsValues(object):
123  """Holds all values for a single metric.
124
125  Values are keyed by metric fields and target fields (which override the
126  default target fields configured globally for the process).
127  """
128
129  def __init__(self, start_time):
130    self.start_time = start_time
131
132    # {normalized_target_fields: {normalized_metric_fields: value}}
133    self._values = collections.defaultdict(dict)
134
135  def _get_target_values(self, target_fields):
136    # Normalize the target fields by converting them into a hashable tuple.
137    if not target_fields:
138      target_fields = {}
139    key = tuple(sorted(target_fields.items()))
140
141    return self._values[key]
142
143  def get_value(self, fields, target_fields, default=None):
144    return self._get_target_values(target_fields).get(
145        fields, default)
146
147  def set_value(self, fields, target_fields, value):
148    self._get_target_values(target_fields)[fields] = value
149
150  def iter_targets(self, default_target):
151    for target_fields, fields_values in self._values.items():
152      if target_fields:
153        target = copy.copy(default_target)
154        target.update({k: v for k, v in target_fields})
155      else:
156        target = default_target
157      yield target, fields_values
158
159  def __deepcopy__(self, memo_dict):
160    ret = _TargetFieldsValues(self.start_time)
161    ret._values = copy.deepcopy(self._values, memo_dict)
162    return ret
163
164
165class InProcessMetricStore(MetricStore):
166  """A thread-safe metric store that keeps values in memory."""
167
168  def __init__(self, state, time_fn=None):
169    super(InProcessMetricStore, self).__init__(state, time_fn=time_fn)
170
171    self._values = {}
172    self._thread_lock = threading.Lock()
173
174  def _entry(self, name):
175    if name not in self._values:
176      self._reset(name)
177
178    return self._values[name]
179
180  def get(self, name, fields, target_fields, default=None):
181    return self._entry(name).get_value(fields, target_fields, default)
182
183  def iter_field_values(self, name):
184    return itertools.chain.from_iterable(
185        x.items() for _, x
186        in self._entry(name).iter_targets(self._state.target))
187
188  def get_all(self):
189    # Make a copy of the metric values in case another thread (or this
190    # generator's consumer) modifies them while we're iterating.
191    with self._thread_lock:
192      values = copy.deepcopy(self._values)
193    end_time = self._time_fn()
194
195    for name, metric_values in values.items():
196      if name not in self._state.metrics:
197        continue
198      start_time = metric_values.start_time
199      for target, fields_values in metric_values.iter_targets(
200          self._state.target):
201        yield (target, self._state.metrics[name], start_time, end_time,
202               fields_values)
203
204  def set(self, name, fields, target_fields, value, enforce_ge=False):
205    with self._thread_lock:
206      if enforce_ge:
207        old_value = self._entry(name).get_value(fields, target_fields, 0)
208        if value < old_value:
209          raise errors.MonitoringDecreasingValueError(name, old_value, value)
210
211      self._entry(name).set_value(fields, target_fields, value)
212
213  def incr(self, name, fields, target_fields, delta, modify_fn=None):
214    if delta < 0:
215      raise errors.MonitoringDecreasingValueError(name, None, delta)
216
217    if modify_fn is None:
218      modify_fn = default_modify_fn(name)
219
220    with self._thread_lock:
221      self._entry(name).set_value(fields, target_fields, modify_fn(
222          self.get(name, fields, target_fields, 0), delta))
223
224  def reset_for_unittest(self, name=None):
225    if name is not None:
226      self._reset(name)
227    else:
228      for name in self._values.keys():
229        self._reset(name)
230
231  def _reset(self, name):
232    self._values[name] = _TargetFieldsValues(self._start_time(name))
233