# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Classes representing the monitoring interface for tasks or devices.

Usage:
  import argparse
  from infra_libs import ts_mon

  p = argparse.ArgumentParser()
  ts_mon.add_argparse_options(p)
  args = p.parse_args()  # Must contain info for Monitor (and optionally Target)
  ts_mon.process_argparse_options(args)

  # Will use the default Target set up via command line args:
  m = ts_mon.BooleanMetric('/my/metric/name', fields={'foo': 1, 'bar': 'baz'})
  m.set(True)

  # Use a custom Target:
  t = ts_mon.TaskTarget('service', 'job', 'region', 'host')  # or DeviceTarget
  m2 = ts_mon.GaugeMetric('/my/metric/name2', fields={'asdf': 'qwer'}, target=t)
  m2.set(5)

Library usage:
  from infra_libs.ts_mon import CounterMetric
  # No need to set up Monitor or Target, assume calling code did that.
  c = CounterMetric('/my/counter', fields={'source': 'mylibrary'})
  c.set(0)
  for x in range(100):
    c.increment()
"""

import datetime
import logging
import random
import threading
import time

from infra_libs.ts_mon.common import errors
from infra_libs.ts_mon.common import metric_store
from infra_libs.ts_mon.protos import metrics_pb2

# The maximum number of MetricsData messages to include in each HTTP request.
# MetricsCollections larger than this will be split into multiple requests.
METRICS_DATA_LENGTH_LIMIT = 500


class State(object):
  """Package-level state is stored here so that it is easily accessible.

  Configuration is kept in this one object at the global level so that all
  libraries in use by the same tool or service can all take advantage of the
  same configuration.
  """

  def __init__(self, store_ctor=None, target=None):
    """Optional arguments are for unit tests."""
    if store_ctor is None:  # pragma: no branch
      store_ctor = metric_store.InProcessMetricStore
    # The Monitor object that will be used to send all metrics.
    self.global_monitor = None
    # The Target object that will be paired with all metrics that don't supply
    # their own.
    self.target = target
    # The flush mode being used to control when metrics are pushed.
    self.flush_mode = None
    # A predicate to determine if metrics should be sent.
    self.flush_enabled_fn = lambda: True
    # The background thread that flushes metrics every
    # --ts-mon-flush-interval-secs seconds.  May be None if
    # --ts-mon-flush != 'auto' or --ts-mon-flush-interval-secs == 0.
    self.flush_thread = None
    # All metrics created by this application.
    self.metrics = {}
    # The MetricStore object that holds the actual metric values.
    self.store = store_ctor(self)
    # Cached time of the last flush. Useful mostly in AppEngine apps.
    self.last_flushed = datetime.datetime.utcfromtimestamp(0)
    # Metric name prefix
    self.metric_name_prefix = '/chrome/infra/'
    # Metrics registered with register_global_metrics.  Keyed by metric name.
    self.global_metrics = {}
    # Callbacks registered with register_global_metrics_callback.  Keyed by the
    # arbitrary string provided by the user.  Called before each flush.
    self.global_metrics_callbacks = {}
    # Whether to call invoke_global_callbacks() on every flush().  Set to False
    # on Appengine because it does its own thing.
    self.invoke_global_callbacks_on_flush = True

  def reset_for_unittest(self):
    # Restore every mutable field touched by normal operation to its
    # freshly-constructed value so tests don't leak state into each other.
    self.metrics = {}
    self.global_metrics = {}
    self.global_metrics_callbacks = {}
    self.invoke_global_callbacks_on_flush = True
    self.last_flushed = datetime.datetime.utcfromtimestamp(0)
    self.store.reset_for_unittest()

state = State()


def flush():
  """Send all metrics that are registered in the application.

  Raises:
    MonitoringNoConfiguredMonitorError: if no global Monitor has been set up.
    MonitoringNoConfiguredTargetError: if no default Target has been set up.
  """
  if not state.flush_enabled_fn():
    logging.debug('ts_mon: sending metrics is disabled.')
    return

  if not state.global_monitor:
    raise errors.MonitoringNoConfiguredMonitorError(None)
  if not state.target:
    raise errors.MonitoringNoConfiguredTargetError(None)

  if state.invoke_global_callbacks_on_flush:
    invoke_global_callbacks()

  # Issue all sends first, then wait, so multiple RPCs can be in flight at
  # once.  A monitor may return None from send() to mean "nothing to wait on".
  rpcs = []
  for proto in _generate_proto():
    rpcs.append(state.global_monitor.send(proto))
  for rpc in rpcs:
    if rpc is not None:
      state.global_monitor.wait(rpc)
  state.last_flushed = datetime.datetime.utcnow()


def _generate_proto():
  """Generate MetricsPayload for global_monitor.send().

  Yields:
    metrics_pb2.MetricsPayload messages, each containing at most
    METRICS_DATA_LENGTH_LIMIT MetricsData entries.
  """
  proto = metrics_pb2.MetricsPayload()

  # Key: Target, value: MetricsCollection.
  collections = {}

  # Key: (Target, metric name) tuple, value: MetricsDataSet.
  data_sets = {}

  count = 0
  for (target, metric, start_time, end_time, fields_values
       ) in state.store.get_all():
    for fields, value in fields_values.items():
      if count >= METRICS_DATA_LENGTH_LIMIT:
        # The current payload is full; emit it and start a fresh one.  The
        # caches must be cleared because they point into the old payload.
        yield proto
        proto = metrics_pb2.MetricsPayload()
        collections.clear()
        data_sets.clear()
        count = 0

      if target not in collections:
        collections[target] = proto.metrics_collection.add()
        target.populate_target_pb(collections[target])
      collection = collections[target]

      key = (target, metric.name)
      new_data_set = None
      if key not in data_sets:
        new_data_set = metrics_pb2.MetricsDataSet()
        metric.populate_data_set(new_data_set)

      data = metrics_pb2.MetricsData()
      metric.populate_data(data, start_time, end_time, fields, value)

      # All required data protos have been successfully populated. Now we can
      # insert them in serialized proto and bookkeeping data structures.
      if new_data_set is not None:
        collection.metrics_data_set.add().CopyFrom(new_data_set)
        data_sets[key] = collection.metrics_data_set[-1]
      data_sets[key].data.add().CopyFrom(data)
      count += 1

  if count > 0:
    yield proto


def register(metric):
  """Adds the metric to the list of metrics sent by flush().

  This is called automatically by Metric's constructor.

  Raises:
    MonitoringDuplicateRegistrationError: if a different metric object is
      already registered under the same name.
  """
  # If someone is registering the same metric object twice, that's okay, but
  # registering two different metric objects with the same metric name is not.
  for m in state.metrics.values():
    if metric == m:
      state.metrics[metric.name] = metric
      return
  if metric.name in state.metrics:
    raise errors.MonitoringDuplicateRegistrationError(metric.name)

  state.metrics[metric.name] = metric


def unregister(metric):
  """Removes the metric from the list of metrics sent by flush().

  Raises:
    KeyError: if the metric was never registered.
  """
  del state.metrics[metric.name]


def close():
  """Stops any background threads and waits for them to exit."""
  if state.flush_thread is not None:
    state.flush_thread.stop()


def reset_for_unittest(disable=False):
  """Clears all module state; pass disable=True to also disable flushing."""
  state.reset_for_unittest()
  state.flush_enabled_fn = lambda: not disable


def register_global_metrics(metrics):
  """Declare metrics as global.

  Outside Appengine this has no effect.

  On Appengine, registering a metric as "global" simply means it will be reset
  every time the metric is sent. This allows any instance to send such a metric
  to a shared stream, e.g. by overriding target fields like task_num (instance
  ID), host_name (version) or job_name (module name).

  There is no "unregister". Multiple calls add up. It only needs to be called
  once, similar to gae_ts_mon.initialize().

  Args:
    metrics (iterable): a collection of Metric objects.
  """
  state.global_metrics.update({m.name: m for m in metrics})


def register_global_metrics_callback(name, callback):
  """Register a named function to compute global metrics values.

  There can only be one callback for a given name. Setting another callback
  with the same name will override the previous one. To disable a callback,
  set its function to None.

  Args:
    name (string): name of the callback.
    callback (function): this function will be called without arguments every
      minute.  On Appengine it is called once for the whole application from
      the gae_ts_mon cron job.  It is intended to set the values of the global
      metrics.
  """
  if not callback:
    if name in state.global_metrics_callbacks:
      del state.global_metrics_callbacks[name]
  else:
    state.global_metrics_callbacks[name] = callback


def invoke_global_callbacks():
  """Runs every registered global-metrics callback, logging any failure.

  One failing callback never prevents the remaining callbacks (or the flush
  that follows) from running.
  """
  # Iterate over a snapshot: a callback may itself call
  # register_global_metrics_callback(), and mutating the dict while iterating
  # it would raise RuntimeError.
  for name, callback in list(state.global_metrics_callbacks.items()):
    logging.debug('Invoking callback %s', name)
    try:
      callback()
    except Exception:
      logging.exception('Monitoring global callback "%s" failed', name)


class _FlushThread(threading.Thread):
  """Background thread that flushes metrics on an interval."""

  def __init__(self, interval_secs, stop_event=None):
    """Args are seconds between flushes and an optional Event (for tests)."""
    super(_FlushThread, self).__init__(name='ts_mon')

    if stop_event is None:
      stop_event = threading.Event()

    self.daemon = True
    self.interval_secs = interval_secs
    self.stop_event = stop_event

  def _flush_and_log_exceptions(self):
    # Never let a flush failure kill the background thread.
    try:
      flush()
    except Exception:
      logging.exception('Automatic monitoring flush failed.')

  def run(self):
    # Jitter the first interval so tasks started at the same time (say, by
    # cron) on different machines don't all send metrics simultaneously.
    next_timeout = random.uniform(self.interval_secs / 2.0, self.interval_secs)

    while True:
      if self.stop_event.wait(next_timeout):
        # Always flush on the way out so metrics recorded since the last
        # periodic flush aren't dropped (stop() documents a final flush).
        self._flush_and_log_exceptions()
        return

      # Try to flush every N seconds exactly so rate calculations are more
      # consistent.
      start = time.time()
      self._flush_and_log_exceptions()
      flush_duration = time.time() - start
      next_timeout = self.interval_secs - flush_duration

      if next_timeout < 0:
        logging.warning(
            'Last monitoring flush took %f seconds (longer than '
            '--ts-mon-flush-interval-secs = %f seconds)',
            flush_duration, self.interval_secs)
        next_timeout = 0

  def stop(self):
    """Stops the background thread and performs a final flush."""
    self.stop_event.set()
    self.join()