# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Classes representing the monitoring interface for tasks or devices."""

import json
import logging
import socket

import httplib2

from googleapiclient import errors
from infra_libs import httplib2_utils
from infra_libs.ts_mon.common import pb_to_popo
try: # pragma: no cover
  from oauth2client import gce
except ImportError: # pragma: no cover
  from oauth2client.contrib import gce
from oauth2client.client import GoogleCredentials
from oauth2client.file import Storage

# Special string that can be passed through as the credentials path to use the
# default Appengine or GCE service account.
APPENGINE_CREDENTIALS = ':appengine'
GCE_CREDENTIALS = ':gce'


class CredentialFactory(object):
  """Common interface for objects that know how to build credentials.

  Subclasses override create(); from_string() selects the subclass that
  matches a credentials path or one of the magic strings defined above.
  """

  @classmethod
  def from_string(cls, path):
    """Returns the factory matching a file path or magic string.

    Args:
      path: APPENGINE_CREDENTIALS, GCE_CREDENTIALS, or anything else, which is
          treated as the path of a credentials file.

    Returns:
      An AppengineCredentials, GCECredentials or FileCredentials instance.
    """
    if path == APPENGINE_CREDENTIALS:
      factory = AppengineCredentials()
    elif path == GCE_CREDENTIALS:
      factory = GCECredentials()
    else:
      # Not a magic string: treat it as a credentials file on disk.
      factory = FileCredentials(path)
    return factory

  def create(self, scopes):
    """Builds credentials for the given scopes; subclasses must override."""
    raise NotImplementedError()


class GCECredentials(CredentialFactory):
  """Factory producing oauth2client GCE AppAssertionCredentials."""

  def create(self, scopes):
    """Returns gce.AppAssertionCredentials built from the given scopes."""
    gce_credentials = gce.AppAssertionCredentials(scopes)
    return gce_credentials


class AppengineCredentials(CredentialFactory):
  """Factory producing oauth2client Appengine AppAssertionCredentials."""

  def create(self, scopes):  # pragma: no cover
    """Returns appengine.AppAssertionCredentials built from the scopes."""
    # Imported lazily: this module cannot be imported outside appengine, so
    # the import is postponed until credentials are actually requested.
    from oauth2client import appengine
    appengine_credentials = appengine.AppAssertionCredentials(scopes)
    return appengine_credentials


class FileCredentials(CredentialFactory):
  """Factory that reads credentials from a JSON file on disk."""

  def __init__(self, path):
    """Remembers the path of the credentials file; nothing is read yet."""
    self.path = path

  def create(self, scopes):
    """Loads credentials from the file at self.path.

    The file is parsed as JSON first. When the parsed object has a truthy
    'type' entry the file is loaded with GoogleCredentials.from_stream and
    the result is scoped with the given scopes. Otherwise the file is read
    through oauth2client's file Storage and scopes is not used.

    Args:
      scopes: scopes applied to credentials loaded via GoogleCredentials.

    Returns:
      The loaded credentials object.
    """
    with open(self.path, 'r') as credentials_file:
      parsed = json.load(credentials_file)

    if not parsed.get('type', None):
      # No 'type' entry: read the file through oauth2client's Storage.
      return Storage(self.path).get()

    unscoped = GoogleCredentials.from_stream(self.path)
    return unscoped.create_scoped(scopes)


class DelegateServiceAccountCredentials(CredentialFactory):
  """Factory that delegates to a service account via a base factory.

  The base factory supplies credentials with the IAM scope; those authorize
  the HTTP client that is handed to
  httplib2_utils.DelegateServiceAccountCredentials.
  """

  IAM_SCOPE = 'https://www.googleapis.com/auth/iam'

  def __init__(self, service_account_email, base):
    """Stores the target service account email and the base factory.

    Args:
      service_account_email: email of the service account to delegate to.
      base: CredentialFactory used to obtain the IAM-scoped credentials.
    """
    self.service_account_email = service_account_email
    self.base = base

  def create(self, scopes):
    """Returns delegated credentials for the service account.

    Args:
      scopes: scopes passed to httplib2_utils.DelegateServiceAccountCredentials.
    """
    logging.info('Delegating to service account %s', self.service_account_email)
    plain_http = httplib2_utils.InstrumentedHttp('actor-credentials')
    actor_credentials = self.base.create([self.IAM_SCOPE])
    authorized_http = actor_credentials.authorize(plain_http)
    return httplib2_utils.DelegateServiceAccountCredentials(
        authorized_http, self.service_account_email, scopes)


class Monitor(object):
  """Abstract base class encapsulating the ability to collect and send metrics.

  This is a singleton class: only one Monitor instance should exist at a
  time. It will be created and initialized by process_argparse_options. A
  Monitor must exist for any metrics to be sent, although both Targets and
  Metrics may be initialized before the underlying Monitor. Sending a Metric
  while no Monitor exists raises an exception.

  send() may be synchronous or asynchronous. A synchronous send() makes the
  HTTP request, waits for the response and returns None. An asynchronous
  send() starts the request and immediately returns some object, which is
  later passed to wait() once all requests have been started.
  """

  _SCOPES = []

  def send(self, metric_pb):
    """Sends the given metrics; subclasses must override."""
    raise NotImplementedError()

  def wait(self, state):  # pragma: no cover
    """Waits on the object returned by send(); a no-op in this base class."""
    pass


class HttpsMonitor(Monitor):
  """Monitor that POSTs metrics as JSON to an HTTPS endpoint."""

  _SCOPES = ['https://www.googleapis.com/auth/prodxmon']

  def __init__(self, endpoint, credential_factory, http=None, ca_certs=None):
    """Prepares an authorized HTTP client for the endpoint.

    Args:
      endpoint: URL that send() posts to.
      credential_factory: CredentialFactory whose create() is called with
          this class's _SCOPES.
      http: optional HTTP client to authorize. When None, a RetriableHttp
          wrapping an InstrumentedHttp is built.
      ca_certs: passed to InstrumentedHttp when the default client is built;
          unused when http is supplied.
    """
    self._endpoint = endpoint
    credentials = credential_factory.create(self._SCOPES)
    if http is None:
      instrumented = httplib2_utils.InstrumentedHttp(
          'acq-mon-api', ca_certs=ca_certs)
      http = httplib2_utils.RetriableHttp(instrumented)
    self._http = credentials.authorize(http)

  def encode_to_json(self, metric_pb):
    """Returns the JSON request body wrapping the converted metrics."""
    envelope = {'payload': pb_to_popo.convert(metric_pb)}
    return json.dumps(envelope)

  def send(self, metric_pb):
    """POSTs the metrics to the endpoint.

    A response status other than 200 is logged as a warning. The network,
    HTTP-library and API errors listed in the handler below are logged with
    their traceback and swallowed rather than raised to the caller.
    """
    body = self.encode_to_json(metric_pb)
    json_headers = {'Content-Type': 'application/json'}

    try:
      resp, content = self._http.request(
          self._endpoint, method='POST', body=body, headers=json_headers)
      if resp.status == 200:
        return
      logging.warning(
          'HttpsMonitor.send received status %d: %s', resp.status, content)
    except (ValueError, errors.Error, httplib2.HttpLib2Error,
            socket.error, socket.timeout, socket.herror, socket.gaierror):
      logging.exception('HttpsMonitor.send failed')


class DebugMonitor(Monitor):
  """Monitor that writes metrics to the log and, optionally, a local file."""

  def __init__(self, filepath=None):
    """Opens filepath for appending when one is given.

    Args:
      filepath: file to append metrics to, or None to only log them.
    """
    self._fh = None if filepath is None else open(filepath, 'a')

  def send(self, metric_pb):
    """Logs the metrics as text and appends them to the file, if any."""
    rendered = str(metric_pb)
    logging.info('Flushing monitoring metrics:\n%s', rendered)
    if self._fh is None:
      return
    self._fh.write(rendered + '\n\n')
    # Flush after every write so the file is current while the process runs.
    self._fh.flush()


class NullMonitor(Monitor):
  """Monitor that discards every metric it is given."""

  def send(self, metric_pb):
    """Does nothing with the metrics."""
    pass