xref: /aosp_15_r20/external/autotest/utils/frozen_chromite/third_party/infra_libs/ts_mon/common/interface.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Classes representing the monitoring interface for tasks or devices.

Usage:
  import argparse
  from infra_libs import ts_mon

  p = argparse.ArgumentParser()
  ts_mon.add_argparse_options(p)
  args = p.parse_args()  # Must contain info for Monitor (and optionally Target)
  ts_mon.process_argparse_options(args)

  # Will use the default Target set up via command line args:
  m = ts_mon.BooleanMetric('/my/metric/name', fields={'foo': 1, 'bar': 'baz'})
  m.set(True)

  # Use a custom Target:
  t = ts_mon.TaskTarget('service', 'job', 'region', 'host')  # or DeviceTarget
  m2 = ts_mon.GaugeMetric('/my/metric/name2', fields={'asdf': 'qwer'}, target=t)
  m2.set(5)

Library usage:
  from infra_libs.ts_mon import CounterMetric
  # No need to set up Monitor or Target, assume calling code did that.
  c = CounterMetric('/my/counter', fields={'source': 'mylibrary'})
  c.set(0)
  for x in range(100):
    c.increment()
"""
33
34import datetime
35import logging
36import random
37import threading
38import time
39
40from infra_libs.ts_mon.common import errors
41from infra_libs.ts_mon.common import metric_store
42from infra_libs.ts_mon.protos import metrics_pb2
43
# The maximum number of MetricsData messages to include in each HTTP request.
# MetricsCollections larger than this will be split into multiple requests.
# Consumed by _generate_proto() when chunking the payload.
METRICS_DATA_LENGTH_LIMIT = 500
47
48
class State(object):
  """Holds the package-level ts_mon configuration and registries.

  A single module-global instance of this class is shared by every library
  running in the same tool or service, so one configuration applies
  process-wide.
  """

  def __init__(self, store_ctor=None, target=None):
    """Initializes default state.

    Args:
      store_ctor: metric store class to instantiate; defaults to
        InProcessMetricStore (override is for unit tests).
      target: default Target paired with metrics that don't supply their own
        (override is for unit tests).
    """
    if store_ctor is None:  # pragma: no branch
      store_ctor = metric_store.InProcessMetricStore
    # The Monitor that will be used to send all metrics.
    self.global_monitor = None
    # The default Target paired with metrics that don't supply their own.
    self.target = target
    # Controls when metrics are pushed.
    self.flush_mode = None
    # Predicate consulted by flush(); metrics are only sent when it returns a
    # true value.
    self.flush_enabled_fn = lambda: True
    # Background thread that flushes every --ts-mon-flush-interval-secs
    # seconds; None when --ts-mon-flush != 'auto' or the interval is 0.
    self.flush_thread = None
    # Prefix prepended to every metric name.
    self.metric_name_prefix = '/chrome/infra/'
    # Every metric created by this application, keyed by metric name.
    self.metrics = {}
    # The MetricStore holding the actual metric values.
    self.store = store_ctor(self)
    # Time of the most recent flush; mostly useful in AppEngine apps.
    self.last_flushed = datetime.datetime.utcfromtimestamp(0)
    # Metrics registered via register_global_metrics, keyed by metric name.
    self.global_metrics = {}
    # Callbacks from register_global_metrics_callback, keyed by the arbitrary
    # string the caller supplied; each is invoked before every flush.
    self.global_metrics_callbacks = {}
    # Whether flush() should call invoke_global_callbacks(); Appengine sets
    # this to False because it drives the callbacks itself.
    self.invoke_global_callbacks_on_flush = True

  def reset_for_unittest(self):
    """Restores the registries and flush bookkeeping to their defaults."""
    self.metrics = {}
    self.global_metrics = {}
    self.global_metrics_callbacks = {}
    self.invoke_global_callbacks_on_flush = True
    self.last_flushed = datetime.datetime.utcfromtimestamp(0)
    self.store.reset_for_unittest()
98
# The shared, module-level State instance used by all of the functions below.
state = State()
100
101
def flush():
  """Send all metrics that are registered in the application.

  Raises:
    errors.MonitoringNoConfiguredMonitorError: if no monitor is configured.
    errors.MonitoringNoConfiguredTargetError: if no default target is set.
  """
  if not state.flush_enabled_fn():
    logging.debug('ts_mon: sending metrics is disabled.')
    return

  if not state.global_monitor:
    raise errors.MonitoringNoConfiguredMonitorError(None)
  if not state.target:
    raise errors.MonitoringNoConfiguredTargetError(None)

  if state.invoke_global_callbacks_on_flush:
    invoke_global_callbacks()

  # Start every send before waiting on any of them.
  pending = [state.global_monitor.send(payload)
             for payload in _generate_proto()]
  for rpc in pending:
    if rpc is not None:
      state.global_monitor.wait(rpc)
  state.last_flushed = datetime.datetime.utcnow()
123
124
def _generate_proto():
  """Generate MetricsPayload for global_monitor.send().

  Walks every (target, metric, fields, value) tuple in the metric store and
  packs them into MetricsPayload protos, yielding a new payload whenever the
  current one reaches METRICS_DATA_LENGTH_LIMIT MetricsData messages so that
  no single HTTP request grows without bound.

  Yields:
    metrics_pb2.MetricsPayload instances with at most
    METRICS_DATA_LENGTH_LIMIT MetricsData messages each.
  """
  proto = metrics_pb2.MetricsPayload()

  # Key: Target, value: MetricsCollection.
  collections = {}

  # Key: (Target, metric name) tuple, value: MetricsDataSet.
  data_sets = {}

  count = 0
  for (target, metric, start_time, end_time, fields_values
       ) in state.store.get_all():
    for fields, value in fields_values.items():
      # Current payload is full: emit it and start a fresh one.  The caches
      # must be cleared too, because their values point into the old payload.
      if count >= METRICS_DATA_LENGTH_LIMIT:
        yield proto
        proto = metrics_pb2.MetricsPayload()
        collections.clear()
        data_sets.clear()
        count = 0

      # One MetricsCollection per distinct Target in this payload.
      if target not in collections:
        collections[target] = proto.metrics_collection.add()
        target.populate_target_pb(collections[target])
      collection = collections[target]

      # One MetricsDataSet per (Target, metric name) in this payload; build
      # it detached first so a populate failure leaves the payload untouched.
      key = (target, metric.name)
      new_data_set = None
      if key not in data_sets:
        new_data_set = metrics_pb2.MetricsDataSet()
        metric.populate_data_set(new_data_set)

      data = metrics_pb2.MetricsData()
      metric.populate_data(data, start_time, end_time, fields, value)

      # All required data protos have been successfully populated. Now we can
      # insert them in serialized proto and bookkeeping data structures.
      if new_data_set is not None:
        collection.metrics_data_set.add().CopyFrom(new_data_set)
        data_sets[key] = collection.metrics_data_set[-1]
      data_sets[key].data.add().CopyFrom(data)
      count += 1

  # Emit the final, partially filled payload if it holds any data at all.
  if count > 0:
    yield proto
170
171
def register(metric):
  """Adds the metric to the list of metrics sent by flush().

  This is called automatically by Metric's constructor.

  Raises:
    errors.MonitoringDuplicateRegistrationError: if a different metric object
      is already registered under the same name.
  """
  # Re-registering an equal metric object is fine; registering two different
  # metric objects under the same name is not.
  if any(existing == metric for existing in state.metrics.values()):
    state.metrics[metric.name] = metric
    return
  if metric.name in state.metrics:
    raise errors.MonitoringDuplicateRegistrationError(metric.name)

  state.metrics[metric.name] = metric
187
188
def unregister(metric):
  """Removes the metric from the list of metrics sent by flush()."""
  # Raises KeyError, exactly like ``del``, if the metric was never registered.
  state.metrics.pop(metric.name)
192
193
def close():
  """Stops any background threads and waits for them to exit."""
  flusher = state.flush_thread
  if flusher is not None:
    flusher.stop()
198
199
def reset_for_unittest(disable=False):
  """Clears all global state; for unit tests only.

  Args:
    disable: if True, also disables sending metrics on subsequent flushes.
  """
  state.reset_for_unittest()
  enabled = not disable
  state.flush_enabled_fn = lambda: enabled
203
204
def register_global_metrics(metrics):
  """Declare metrics as global.

  Outside Appengine this has no effect.

  On Appengine, a "global" metric is simply reset every time it is sent,
  which lets any instance contribute to a shared stream, e.g. by overriding
  target fields like task_num (instance ID), host_name (version) or job_name
  (module name).

  There is no "unregister"; repeated calls accumulate.  Calling it once is
  enough, similar to gae_ts_mon.initialize().

  Args:
    metrics (iterable): a collection of Metric objects.
  """
  for metric in metrics:
    state.global_metrics[metric.name] = metric
222
223
def register_global_metrics_callback(name, callback):
  """Register a named function to compute global metrics values.

  At most one callback exists per name; registering another callback under
  the same name replaces the previous one.  Pass a falsy callback to remove
  the registration.

  Args:
    name (string): name of the callback.
    callback (function): called without arguments every minute.  On Appengine
      it is called once for the whole application from the gae_ts_mon cron
      job.  It is intended to set the values of the global metrics.
  """
  if callback:
    state.global_metrics_callbacks[name] = callback
  else:
    # A falsy callback unregisters the name; a no-op if it was never set.
    state.global_metrics_callbacks.pop(name, None)
243
244
def invoke_global_callbacks():
  """Runs every registered global-metrics callback, logging any failure.

  A callback that raises is logged and skipped; it never aborts the others.
  """
  for cb_name, cb in state.global_metrics_callbacks.items():
    logging.debug('Invoking callback %s', cb_name)
    try:
      cb()
    except Exception:
      logging.exception('Monitoring global callback "%s" failed', cb_name)
252
253
254class _FlushThread(threading.Thread):
255  """Background thread that flushes metrics on an interval."""
256
257  def __init__(self, interval_secs, stop_event=None):
258    super(_FlushThread, self).__init__(name='ts_mon')
259
260    if stop_event is None:
261      stop_event = threading.Event()
262
263    self.daemon = True
264    self.interval_secs = interval_secs
265    self.stop_event = stop_event
266
267  def _flush_and_log_exceptions(self):
268    try:
269      flush()
270    except Exception:
271      logging.exception('Automatic monitoring flush failed.')
272
273  def run(self):
274    # Jitter the first interval so tasks started at the same time (say, by cron)
275    # on different machines don't all send metrics simultaneously.
276    next_timeout = random.uniform(self.interval_secs / 2.0, self.interval_secs)
277
278    while True:
279      if self.stop_event.wait(next_timeout):
280        return
281
282      # Try to flush every N seconds exactly so rate calculations are more
283      # consistent.
284      start = time.time()
285      self._flush_and_log_exceptions()
286      flush_duration = time.time() - start
287      next_timeout = self.interval_secs - flush_duration
288
289      if next_timeout < 0:
290        logging.warning(
291            'Last monitoring flush took %f seconds (longer than '
292            '--ts-mon-flush-interval-secs = %f seconds)',
293            flush_duration, self.interval_secs)
294        next_timeout = 0
295
296  def stop(self):
297    """Stops the background thread and performs a final flush."""
298
299    self.stop_event.set()
300    self.join()
301