xref: /aosp_15_r20/external/autotest/utils/frozen_chromite/third_party/infra_libs/ts_mon/common/monitors.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Copyright 2015 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Classes representing the monitoring interface for tasks or devices."""
6
7import json
8import logging
9import socket
10
11import httplib2
12
13from googleapiclient import errors
14from infra_libs import httplib2_utils
15from infra_libs.ts_mon.common import pb_to_popo
16try: # pragma: no cover
17  from oauth2client import gce
18except ImportError: # pragma: no cover
19  from oauth2client.contrib import gce
20from oauth2client.client import GoogleCredentials
21from oauth2client.file import Storage
22
# Magic strings that can be passed in place of a credentials file path to
# select the default Appengine or GCE service account instead of a file.
# CredentialFactory.from_string compares its argument against these values.
APPENGINE_CREDENTIALS = ':appengine'
GCE_CREDENTIALS = ':gce'
27
28
class CredentialFactory(object):
  """Abstract factory for objects that know how to build OAuth2 credentials."""

  @classmethod
  def from_string(cls, path):
    """Selects the factory subclass matching a file path or magic string.

    Args:
      path: either APPENGINE_CREDENTIALS, GCE_CREDENTIALS, or any other value,
          which is handed to FileCredentials as the credentials file path.

    Returns:
      An AppengineCredentials, GCECredentials or FileCredentials instance.
    """
    if path == APPENGINE_CREDENTIALS:
      factory = AppengineCredentials()
    elif path == GCE_CREDENTIALS:
      factory = GCECredentials()
    else:
      # Anything that is not a magic string is treated as a file path.
      factory = FileCredentials(path)
    return factory

  def create(self, scopes):
    """Builds credentials for the given scopes; subclasses must override.

    Raises:
      NotImplementedError: always, in this base class.
    """
    raise NotImplementedError
44
45
class GCECredentials(CredentialFactory):
  """Factory backed by oauth2client's GCE AppAssertionCredentials."""

  def create(self, scopes):
    """Returns gce.AppAssertionCredentials constructed with `scopes`."""
    gce_credentials = gce.AppAssertionCredentials(scopes)
    return gce_credentials
49
50
class AppengineCredentials(CredentialFactory):
  """Factory backed by oauth2client's Appengine AppAssertionCredentials."""

  def create(self, scopes):  # pragma: no cover
    """Returns appengine.AppAssertionCredentials constructed with `scopes`."""
    # The appengine module cannot be imported outside appengine, so the import
    # is deferred until credentials are actually requested.
    from oauth2client import appengine
    appengine_credentials = appengine.AppAssertionCredentials(scopes)
    return appengine_credentials
56
57
class FileCredentials(CredentialFactory):
  """Factory that loads credentials from a JSON file on disk."""

  def __init__(self, path):
    """Remembers `path`, the location of the credentials file."""
    self.path = path

  def create(self, scopes):
    """Loads credentials from the file at self.path.

    The file is parsed as JSON first. When it has a truthy 'type' entry it is
    loaded with GoogleCredentials.from_stream and restricted to `scopes`;
    otherwise it is read through oauth2client's Storage, in which case
    `scopes` is not used.

    Args:
      scopes: scopes passed to create_scoped for 'type'-tagged files.

    Returns:
      The credentials object produced by whichever loader was used.
    """
    with open(self.path, 'r') as credentials_file:
      parsed = json.load(credentials_file)

    if not parsed.get('type'):
      # No 'type' entry: fall back to the Storage-based loader.
      return Storage(self.path).get()

    unscoped = GoogleCredentials.from_stream(self.path)
    return unscoped.create_scoped(scopes)
70
71
class DelegateServiceAccountCredentials(CredentialFactory):
  """Factory that delegates to a service account via another factory.

  The `base` factory supplies credentials with the IAM scope; those authorize
  the HTTP client handed to httplib2_utils.DelegateServiceAccountCredentials.
  """

  IAM_SCOPE = 'https://www.googleapis.com/auth/iam'

  def __init__(self, service_account_email, base):
    """Stores the target service account email and the base factory."""
    self.service_account_email = service_account_email
    self.base = base

  def create(self, scopes):
    """Builds delegated credentials for self.service_account_email.

    Args:
      scopes: scopes forwarded to DelegateServiceAccountCredentials.

    Returns:
      The httplib2_utils.DelegateServiceAccountCredentials instance.
    """
    logging.info('Delegating to service account %s', self.service_account_email)
    raw_http = httplib2_utils.InstrumentedHttp('actor-credentials')
    actor_credentials = self.base.create([self.IAM_SCOPE])
    authorized_http = actor_credentials.authorize(raw_http)
    return httplib2_utils.DelegateServiceAccountCredentials(
        authorized_http, self.service_account_email, scopes)
85
86
class Monitor(object):
  """Abstract base for objects able to collect and send metrics.

  Only one Monitor instance is expected to exist at a time (singleton usage).
  It is created and initialized by process_argparse_options and has to exist
  before any metric can be sent; Targets and Metrics may nevertheless be set up
  earlier. Sending a Metric while no Monitor exists raises an exception.

  Implementations of send() may be synchronous or asynchronous:
    * synchronous: perform the HTTP request, wait for the response, and
      return None.
    * asynchronous: start the request and return right away with some object,
      which is handed to wait() after all requests have been started.
  """

  _SCOPES = []

  def send(self, metric_pb):
    """Sends `metric_pb`; must be overridden by subclasses.

    Raises:
      NotImplementedError: always, in this base class.
    """
    raise NotImplementedError()

  def wait(self, state):  # pragma: no cover
    """Does nothing here and returns None.

    Args:
      state: the object returned by an asynchronous send().
    """
    return None
109
110
class HttpsMonitor(Monitor):
  """Monitor that POSTs JSON-encoded metrics to an HTTP(S) endpoint."""

  _SCOPES = ['https://www.googleapis.com/auth/prodxmon']

  def __init__(self, endpoint, credential_factory, http=None, ca_certs=None):
    """Prepares an authorized HTTP client for `endpoint`.

    Args:
      endpoint: URL that send() posts to.
      credential_factory: object whose create(scopes) returns credentials
          offering an authorize(http) method.
      http: optional HTTP client to authorize; when None, a RetriableHttp
          wrapping an InstrumentedHttp is built.
      ca_certs: forwarded to InstrumentedHttp; only used when `http` is None.
    """
    self._endpoint = endpoint
    credentials = credential_factory.create(self._SCOPES)
    if http is None:
      instrumented = httplib2_utils.InstrumentedHttp(
          'acq-mon-api', ca_certs=ca_certs)
      http = httplib2_utils.RetriableHttp(instrumented)
    self._http = credentials.authorize(http)

  def encode_to_json(self, metric_pb):
    """Returns a JSON string with the converted proto under 'payload'."""
    payload = pb_to_popo.convert(metric_pb)
    return json.dumps({'payload': payload})

  def send(self, metric_pb):
    """POSTs `metric_pb` to the endpoint as JSON.

    Failures are logged rather than raised: a non-200 status produces a
    warning, and the exception types listed below are logged with traceback.
    """
    request_body = self.encode_to_json(metric_pb)
    request_headers = {'Content-Type': 'application/json'}

    try:
      response, content = self._http.request(
          self._endpoint,
          method='POST',
          body=request_body,
          headers=request_headers)
      if response.status != 200:
        logging.warning(
            'HttpsMonitor.send received status %d: %s',
            response.status, content)
    except (ValueError, errors.Error,
            socket.timeout, socket.error, socket.herror, socket.gaierror,
            httplib2.HttpLib2Error):
      # Deliberately best-effort: a failed send is logged, not propagated.
      logging.exception('HttpsMonitor.send failed')
141
142
class DebugMonitor(Monitor):
  """Monitor that writes metrics to the log and optionally a local file."""

  def __init__(self, filepath=None):
    """Opens `filepath` for appending, or keeps no file when it is None."""
    self._fh = open(filepath, 'a') if filepath is not None else None

  def send(self, metric_pb):
    """Logs str(metric_pb) and appends it to the file, if one was opened."""
    rendered = str(metric_pb)
    logging.info('Flushing monitoring metrics:\n%s', rendered)
    if self._fh is None:
      return
    self._fh.write(rendered + '\n\n')
    # Flush after every write so the file is current after each send().
    self._fh.flush()
157
158
class NullMonitor(Monitor):
  """Monitor that discards every metric it is given."""

  def send(self, metric_pb):
    """Ignores `metric_pb` and returns None."""
    return None
163