1*6dbdd20aSAndroid Build Coastguard Worker# Copyright (C) 2019 The Android Open Source Project 2*6dbdd20aSAndroid Build Coastguard Worker# 3*6dbdd20aSAndroid Build Coastguard Worker# Licensed under the Apache License, Version 2.0 (the "License"); 4*6dbdd20aSAndroid Build Coastguard Worker# you may not use this file except in compliance with the License. 5*6dbdd20aSAndroid Build Coastguard Worker# You may obtain a copy of the License at 6*6dbdd20aSAndroid Build Coastguard Worker# 7*6dbdd20aSAndroid Build Coastguard Worker# http://www.apache.org/licenses/LICENSE-2.0 8*6dbdd20aSAndroid Build Coastguard Worker# 9*6dbdd20aSAndroid Build Coastguard Worker# Unless required by applicable law or agreed to in writing, software 10*6dbdd20aSAndroid Build Coastguard Worker# distributed under the License is distributed on an "AS IS" BASIS, 11*6dbdd20aSAndroid Build Coastguard Worker# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12*6dbdd20aSAndroid Build Coastguard Worker# See the License for the specific language governing permissions and 13*6dbdd20aSAndroid Build Coastguard Worker# limitations under the License. 
import asyncio
import flask
import logging
import re
import urllib.parse

from datetime import datetime, timedelta
from common_utils import init_logging, defer, req_async, utc_now_iso, parse_iso_time, SCOPES
from config import DB, GERRIT_HOST, GERRIT_PROJECT, PROJECT
from config import CI_SITE, GERRIT_VOTING_ENABLED, JOB_CONFIGS, LOGS_TTL_DAYS
from config import TRUSTED_EMAILS, GCS_ARTIFACTS, JOB_TIMEOUT_SEC
from config import CL_TIMEOUT_SEC
from functools import wraps
from stackdriver_metrics import STACKDRIVER_METRICS

# Root URL of the Cloud Monitoring (Stackdriver) v3 REST API for PROJECT.
STACKDRIVER_API = 'https://monitoring.googleapis.com/v3/projects/{}'.format(
    PROJECT)

# Additional OAuth scopes used by this service. They are appended, in this
# order, to the SCOPES list shared via common_utils.
for _scope in (
    'https://www.googleapis.com/auth/firebase.database',
    'https://www.googleapis.com/auth/userinfo.email',
    'https://www.googleapis.com/auth/datastore',
    'https://www.googleapis.com/auth/monitoring',
    'https://www.googleapis.com/auth/monitoring.write',
):
  SCOPES.append(_scope)
del _scope
app = flask.Flask(__name__)

# Handler name -> True while an invocation of that handler is in flight.
# Entries are created by @no_concurrency when it decorates a handler. This is a
# plain module-level dict, so the guard only covers this process.
is_handling_route = {}

# ------------------------------------------------------------------------------
# Misc utility functions
# ------------------------------------------------------------------------------


def is_trusted(email):
  '''Matches |email| against the TRUSTED_EMAILS regex.

  Returns the match object (truthy) when it matches, None otherwise.
  NOTE(review): this is re.match semantics, i.e. anchored only at the start of
  the string; TRUSTED_EMAILS presumably ends with '$' - confirm in config.py.
  '''
  return re.compile(TRUSTED_EMAILS).match(email)


def no_concurrency(f):
  '''Decorator that rejects overlapping invocations of an async handler.

  While a call to |f| is running, any further call is aborted with HTTP 423.
  The busy flag is cleared when |f| returns or raises.
  '''
  name = f.__name__
  is_handling_route[name] = False

  @wraps(f)
  async def guarded(*args, **kwargs):
    if not is_handling_route[name]:
      is_handling_route[name] = True
      try:
        return await f(*args, **kwargs)
      finally:
        is_handling_route[name] = False
    # Another invocation is in progress: refuse this one.
    return flask.abort(423, description='Handler %s already running' % name)

  return guarded
# ------------------------------------------------------------------------------
# HTTP handlers
# ------------------------------------------------------------------------------


@app.route('/_ah/start', methods=['GET', 'POST'])
async def http_start():
  '''Handler for /_ah/start: initializes logging and the Stackdriver metrics.'''
  init_logging()
  await create_stackdriver_metric_definitions()
  return 'OK ' + datetime.now().isoformat()


@app.route('/controller/tick', methods=['GET', 'POST'])
@no_concurrency
async def http_tick():
  '''Repeats the CL polling every 15 seconds for about 55 seconds.'''
  # The tick is invoked by cron.yaml every 1 minute, it doesn't allow sub-minute
  # jobs. Here we want to poll every 15 seconds to be more responsive. So every
  # tick keeps repeating the polling for a minute.
  deadline = datetime.now() + timedelta(seconds=55)
  while datetime.now() < deadline:
    await check_new_cls()
    await check_pending_cls()
    await update_queue_metrics()
    # asyncio.sleep() returns a coroutine that must be awaited. Without the
    # await it never runs, and this loop re-polls back-to-back with no delay.
    await asyncio.sleep(15)
  return 'OK ' + datetime.now().isoformat()


@app.route('/controller/queue_postsubmit_jobs', methods=['GET', 'POST'])
@no_concurrency
async def http_queue_postsubmit_jobs():
  '''Queues postsubmit jobs for the 'main' branch.'''
  await queue_postsubmit_jobs('main')
  return 'OK ' + datetime.now().isoformat()


@app.route('/controller/delete_stale_jobs', methods=['GET', 'POST'])
@no_concurrency
async def http_delete_stale_jobs():
  '''Runs delete_stale_jobs().'''
  await delete_stale_jobs()
  return 'OK ' + datetime.now().isoformat()


@app.route('/controller/delete_stale_workers', methods=['GET', 'POST'])
@no_concurrency
async def http_delete_stale_workers():
  '''Runs delete_stale_workers().'''
  await delete_stale_workers()
  return 'OK ' + datetime.now().isoformat()


@app.route('/controller/delete_expired_logs', methods=['GET', 'POST'])
@no_concurrency
async def http_delete_expired_logs():
  '''Runs delete_expired_logs() with the configured LOGS_TTL_DAYS.'''
  await delete_expired_logs(LOGS_TTL_DAYS)
  return 'OK ' + datetime.now().isoformat()


# Endpoints below are only for manual testing & maintenance.


@app.route(
    '/controller/delete_expired_logs/<int:ttl_days>', methods=['GET', 'POST'])
async def http_delete_expired_logs_ttl(ttl_days):
  '''Runs delete_expired_logs() with a TTL (in days) taken from the URL.'''
  await delete_expired_logs(ttl_days)
  return 'OK ' + datetime.now().isoformat()


@app.route('/controller/delete_job_logs/<job_id>', methods=['GET', 'POST'])
async def http_delete_job_logs(job_id):
  '''Runs delete_job_logs() for the |job_id| taken from the URL.'''
  await delete_job_logs(job_id)
  return 'OK ' + datetime.now().isoformat()


# This is to test HTTP timeouts
@app.route('/controller/sleep/<int:sleep_sec>', methods=['GET', 'POST'])
async def http_sleep(sleep_sec):
  await asyncio.sleep(sleep_sec)
  return 'OK ' + datetime.now().isoformat()


@app.route('/controller/sleep_locked/<int:sleep_sec>', methods=['GET', 'POST'])
@no_concurrency
async def http_sleep_locked(sleep_sec):
  '''Same as http_sleep, but guarded by @no_concurrency.'''
  await asyncio.sleep(sleep_sec)
  return 'OK ' + datetime.now().isoformat()


# ------------------------------------------------------------------------------
# Deferred jobs
# ------------------------------------------------------------------------------


async def check_new_cls():
  '''Poll for new CLs and asynchronously enqueue jobs for them.

  Queries Gerrit for open changes on the main branch of GERRIT_PROJECT matching
  `after:<yesterday's date>` (n=32 per request) and defers a check_new_cl()
  call for each one that is uploaded or Presubmit-Ready'd by a trusted account.
  '''
  logging.info('Polling for new Gerrit CLs')
  date_limit = (datetime.utcnow() - timedelta(days=1)).strftime('%Y-%m-%d')
  url = 'https://%s/a/changes/' % GERRIT_HOST
  url += '?o=CURRENT_REVISION&o=DETAILED_ACCOUNTS&o=LABELS&n=32'
  url += '&q=branch:main+project:%s' % GERRIT_PROJECT
  url += '+is:open+after:%s' % date_limit
  resp = await req_async('GET', url, gerrit=True) or []
  tasks = []
  for change in resp:
    if 'revisions' not in change:
      continue
    # With o=CURRENT_REVISION the map holds only the current patchset.
    rev_hash = next(iter(change['revisions']))
    rev = change['revisions'][rev_hash]
    # An account may have no email; treat that as untrusted instead of
    # aborting the whole poll with a KeyError.
    owner = rev['uploader'].get('email', '')
    prs_ready = change['labels'].get('Presubmit-Ready', {}).get('approved', {})
    prs_owner = prs_ready.get('email', '')
    # Only submit jobs for patchsets that are either uploaded by a trusted
    # account or are marked as Presubmit-Ready by a trusted account.
    if not is_trusted(owner) and not is_trusted(prs_owner):
      continue
    tasks.append(
        defer(
            check_new_cl(
                cl=str(change['_number']),
                patchset=str(rev['_number']),
                change_id=change['id'],
                rev_hash=rev_hash,
                ref=rev['ref'],
                wants_vote=bool(prs_ready))))
  await asyncio.gather(*tasks)


def append_jobs(patch_obj, src, git_ref, now=None):
  '''Creates the worker jobs (defined in config.py) for the given CL.

  Jobs are keyed by timestamp-cl-patchset-config to get a fair schedule (workers
  pull jobs ordered by the key above).
  It doesn't directly write into the DB, it just appends keys to the passed
  |patch_obj|, so the whole set of CL descriptor + jobs can be added atomically
  to the datastore.

  Args:
    patch_obj: dict of DB path -> value, mutated in place. It must already
      contain |src| with a 'jobs' dict; one key per job is added to it.
    src: DB path of the CL or branch entry, e.g. 'cls/1234-1' (cl and patchset
      number) or 'branches/main-20190101000000'.
    git_ref: git ref to test, exported to each job as PERFETTO_TEST_GIT_REF.
    now: datetime used for the job-id timestamp; defaults to utcnow().
  '''
  logging.info('Enqueueing jobs for cl %s', src)
  timestamp = (now or datetime.utcnow()).strftime('%Y%m%d%H%M%S')
  for cfg_name, env in JOB_CONFIGS.items():
    job_id = '%s--%s--%s' % (timestamp, src.replace('/', '-'), cfg_name)
    logging.info('Enqueueing job %s', job_id)
    patch_obj['jobs/' + job_id] = {
        'src': src,
        'type': cfg_name,
        'env': dict(env, PERFETTO_TEST_GIT_REF=git_ref),
        'status': 'QUEUED',
        'time_queued': utc_now_iso(),
    }
    patch_obj['jobs_queued/' + job_id] = 0
    patch_obj[src]['jobs'][job_id] = 0


async def check_new_cl(change_id: str, rev_hash: str, cl: str, patchset: str,
                       ref: str, wants_vote: bool):
  '''Creates DB entries for the CL and its jobs, unless they already exist.

  If the CL already exists, checks whether a Presubmit-Ready label has been
  added since, and if so sets its |wants_vote| flag and re-runs
  check_pending_cl() so the vote is posted if all jobs are already done.
  '''
  # We want to do two things here:
  # 1) If the CL doesn't exist (hence vote_prop is None) carry on below and
  #    enqueue jobs for it.
  # 2) If the CL exists, we don't need to kick new jobs. However, the user
  #    might have added a Presubmit-Ready label after we created the CL. In
  #    this case update the |wants_vote| flag and return.
  logging.debug('check_new_cl(%s-%s)', cl, patchset)
  vote_prop = await req_async(
      'GET', '%s/cls/%s-%s/wants_vote.json' % (DB, cl, patchset))
  if vote_prop is not None:
    if vote_prop != wants_vote and wants_vote:
      logging.info('Updating wants_vote flag on %s-%s', cl, patchset)
      await req_async(
          'PUT', '%s/cls/%s-%s/wants_vote.json' % (DB, cl, patchset), body=True)
      # If the label is applied after we have finished running all the jobs just
      # jump straight to the voting.
      await check_pending_cl(cl_and_ps='%s-%s' % (cl, patchset))
    logging.debug('check_new_cl(%s-%s): already queued', cl, patchset)
    return

  # This is the first time we see this patchset, enqueue jobs for it.

  # Dequeue jobs for older patchsets, if any.
  await cancel_older_jobs(cl=cl, patchset=patchset)

  src = 'cls/%s-%s' % (cl, patchset)
  # Enqueue jobs for the latest patchset.
  patch_obj = {}
  patch_obj['cls_pending/%s-%s' % (cl, patchset)] = 0
  patch_obj[src] = {
      'change_id': change_id,
      'revision_id': rev_hash,
      'time_queued': utc_now_iso(),
      'jobs': {},
      'wants_vote': wants_vote,
  }
  append_jobs(patch_obj, src, ref)
  logging.debug('check_new_cl(%s-%s): queueing jobs', cl, patchset)
  await req_async('PATCH', DB + '.json', body=patch_obj)


async def cancel_older_jobs(cl: str, patchset: str):
  '''Cancels the jobs of unfinished patchsets of |cl| older than |patchset|.'''
  # CL entries are keyed '<cl>-<patchset>'; the key range '<cl>-0'..'<cl>-z'
  # limits the query to this CL's entries.
  first_key = '%s-0' % cl
  last_key = '%s-z' % cl
  filt = 'orderBy="$key"&startAt="%s"&endAt="%s"' % (first_key, last_key)
  cl_objs = await req_async('GET', '%s/cls.json?%s' % (DB, filt)) or {}
  tasks = []
  for cl_and_ps, cl_obj in cl_objs.items():
    ps = int(cl_and_ps.split('-')[-1])
    if cl_obj.get('time_ended') or ps >= int(patchset):
      continue
    logging.info('Cancelling jobs for previous patchset %s', cl_and_ps)
    # 'jobs' may be absent from the stored entry (check_pending_cl() below
    # guards the same way), so don't index it directly.
    for job_id in cl_obj.get('jobs', {}):
      tasks.append(defer(cancel_job(job_id=job_id)))
  await asyncio.gather(*tasks)


async def check_pending_cls():
  # Check if any pending CL has completed (all jobs are done). If so publish
  # the comment and vote on the CL.
  pending_cls = await req_async('GET', '%s/cls_pending.json' % DB) or {}
  tasks = [
      defer(check_pending_cl(cl_and_ps=cl_and_ps)) for cl_and_ps in pending_cls
  ]
  await asyncio.gather(*tasks)


async def check_pending_cl(cl_and_ps: str):
  '''Finalizes the CL |cl_and_ps| ('<cl>-<patchset>') once its jobs are done.

  Re-queues INTERRUPTED jobs and cancels still-pending jobs once the CL is older
  than CL_TIMEOUT_SEC. When no job is pending or interrupted, it removes the CL
  from cls_pending, records time_ended, updates the CL/job metrics and, if
  |wants_vote| is set, posts the results via comment_and_vote_cl().
  '''
  # This function can be called twice on the same CL, e.g., in the case when the
  # Presubmit-Ready label is applied after we have finished running all the
  # jobs (we run presubmit regardless, only the voting is conditioned by PR).
  cl_obj = await req_async('GET', '%s/cls/%s.json' % (DB, cl_and_ps))
  if cl_obj is None:
    logging.warning('CL %s not found in the DB, skipping', cl_and_ps)
    return
  all_jobs = cl_obj.get('jobs', {}).keys()
  pending_jobs = []
  interrupted_jobs = []
  for job_id in all_jobs:
    job_status = await req_async('GET', '%s/jobs/%s/status.json' % (DB, job_id))
    if job_status in ('QUEUED', 'STARTED'):
      pending_jobs.append(job_id)
    elif job_status == 'INTERRUPTED':
      # Use ==, not `in ('INTERRUPTED')`: without a trailing comma that is a
      # plain string, so `in` would do a substring test and raise TypeError
      # for a missing (None) status.
      interrupted_jobs.append(job_id)

  # Interrupted jobs are due to VMs being shutdown (usually due to a scale-down)
  # Automatically re-queue them so they get picked up by some other vm.
  await asyncio.gather(*[requeue_job(job_id) for job_id in interrupted_jobs])

  if pending_jobs:
    # If the CL has been pending for too long cancel all its jobs. Upon the next
    # scan it will be deleted and optionally voted on.
    t_queued = parse_iso_time(cl_obj['time_queued'])
    age_sec = (datetime.utcnow() - t_queued).total_seconds()
    if age_sec > CL_TIMEOUT_SEC:
      logging.warning('Canceling %s, it has been pending for too long (%s sec)',
                      cl_and_ps, int(age_sec))
      tasks = [defer(cancel_job(job_id)) for job_id in pending_jobs]
      await asyncio.gather(*tasks)

  if pending_jobs or interrupted_jobs:
    return
  logging.info('All jobs completed for CL %s', cl_and_ps)

  # Remove the CL from the pending queue and update end time.
  patch_obj = {
      'cls_pending/%s' % cl_and_ps: {},  # = DELETE
      'cls/%s/time_ended' % cl_and_ps: cl_obj.get('time_ended', utc_now_iso()),
  }
  await req_async('PATCH', '%s.json' % DB, body=patch_obj)
  await update_cl_metrics(src='cls/' + cl_and_ps)
  tasks = [defer(update_job_metrics(job_id)) for job_id in all_jobs]
  await asyncio.gather(*tasks)
  if cl_obj.get('wants_vote'):
    await comment_and_vote_cl(cl_and_ps=cl_and_ps)


async def comment_and_vote_cl(cl_and_ps: str):
  '''Posts the CI results of |cl_and_ps| to Gerrit and marks the CL as voted.

  Does nothing if the CL was already voted on, doesn't want a vote, or
  GERRIT_VOTING_ENABLED is false. Otherwise it votes Code-Review +1 when every
  job that is not marked SKIP_VOTING in its job config COMPLETED, -1 otherwise.
  If any job was CANCELLED, the message is posted without a vote.
  '''
  cl_obj = await req_async('GET', '%s/cls/%s.json' % (DB, cl_and_ps))

  if cl_obj.get('voted'):
    logging.error('Already voted on CL %s', cl_and_ps)
    return

  if not cl_obj['wants_vote'] or not GERRIT_VOTING_ENABLED:
    logging.info('Skipping voting on CL %s', cl_and_ps)
    return

  cl_vote = 1
  passed_jobs = []
  failed_jobs = {}
  ui_links = []
  cancelled = False
  for job_id in cl_obj.get('jobs', {}):
    job_obj = await req_async('GET', '%s/jobs/%s.json' % (DB, job_id))
    job_config = JOB_CONFIGS.get(job_obj['type'], {})
    if job_obj['status'] == 'CANCELLED':
      cancelled = True
    if '-ui-' in job_id:
      ui_links.append('https://storage.googleapis.com/%s/%s/ui/index.html' %
                      (GCS_ARTIFACTS, job_id))
      ui_links.append(
          'https://storage.googleapis.com/%s/%s/ui-test-artifacts/index.html' %
          (GCS_ARTIFACTS, job_id))
    if job_obj['status'] == 'COMPLETED':
      passed_jobs.append(job_id)
    elif not job_config.get('SKIP_VOTING', False):
      cl_vote = -1
      failed_jobs[job_id] = job_obj['status']

  msg = ''
  if cancelled:
    msg += 'Some jobs in this CI run were cancelled. This likely happened '
    msg += 'because a new patchset has been uploaded. Skipping vote.\n'
  log_url = CI_SITE + '/#!/logs'
  if failed_jobs:
    msg += 'FAIL:\n'
    msg += ''.join([
        '- %s/%s (%s)\n' % (log_url, job_id, status)
        for (job_id, status) in failed_jobs.items()
    ])
  if passed_jobs:
    msg += '#\nPASS:\n'
    msg += ''.join(['- %s/%s\n' % (log_url, job_id) for job_id in passed_jobs])
  if ui_links:
    msg += '\nArtifacts:\n' + ''.join('- %s\n' % link for link in ui_links)
  msg += 'CI page for this CL:\n'
  msg += '- https://ci.perfetto.dev/#!/cls/%s\n' % cl_and_ps.split('-')[0]
  body = {'labels': {}, 'message': msg}
  if not cancelled:
    body['labels']['Code-Review'] = cl_vote
  logging.info('Posting results for CL %s', cl_and_ps)
  url = 'https://%s/a/changes/%s/revisions/%s/review' % (
      GERRIT_HOST, cl_obj['change_id'], cl_obj['revision_id'])
  await req_async('POST', url, body=body, gerrit=True)
  await req_async('PUT', '%s/cls/%s/voted.json' % (DB, cl_and_ps), body=True)
async def queue_postsubmit_jobs(branch: str, revision: str = None):
  '''Creates the jobs entries in the DB for the given branch or revision

  Can be called in two modes:
    1. ?branch=main: Will retrieve the SHA1 of main and call the one below.
    2. ?branch=main&rev=deadbeef1234: queues jobs for the given revision.

  Args:
    branch: Gerrit branch name (e.g. 'main'). Must be non-empty.
    revision: optional commit SHA1. When omitted, the branch head is looked up
      on Gerrit and this function recurses with it, unless the most recent
      entry under branches/ already refers to that same revision.
  '''
  prj = urllib.parse.quote(GERRIT_PROJECT, '')
  assert branch

  if not revision:
    # Get the commit SHA1 of the head of the branch.
    url = 'https://%s/a/projects/%s/branches/%s' % (GERRIT_HOST, prj, branch)
    revision = (await req_async('GET', url, gerrit=True))['revision']
    assert revision
    # If the latest entry matches the revision, quit without queueing another
    # set of jobs for the same CL. This is an optimization to avoid wasting
    # compute over the weekend to rebuild the same revision every hour.
    filt = 'orderBy="$key"&limitToLast=1'
    cl_objs = await req_async('GET', '%s/branches.json?%s' % (DB, filt)) or {}
    if cl_objs and next(iter(cl_objs.values())).get('rev') == revision:
      logging.debug('Skipping postsubmits for %s: already run', revision)
      return
    await queue_postsubmit_jobs(branch=branch, revision=revision)
    return

  # Get the committer datetime for the given revision.
  url = 'https://%s/a/projects/%s/commits/%s' % (GERRIT_HOST, prj, revision)
  commit_info = await req_async('GET', url, gerrit=True)
  # Gerrit REST timestamps look like 'yyyy-mm-dd hh:mm:ss.fffffffff' (UTC);
  # the fractional part is dropped before parsing with second precision.
  time_committed = commit_info['committer']['date'].split('.')[0]
  time_committed = datetime.strptime(time_committed, '%Y-%m-%d %H:%M:%S')

  # Enqueue jobs.
  src = 'branches/%s-%s' % (branch, time_committed.strftime('%Y%m%d%H%M%S'))
  now = datetime.utcnow()
  patch_obj = {
      src: {
          'rev': revision,
          'subject': commit_info['subject'][:100],
          'author': commit_info['author'].get('email', 'N/A'),
          'time_committed': utc_now_iso(time_committed),
          'time_queued': utc_now_iso(),
          'jobs': {},
      }
  }
  ref = 'refs/heads/' + branch
  append_jobs(patch_obj, src, ref, now)
  await req_async('PATCH', DB + '.json', body=patch_obj)


async def delete_expired_logs(ttl_days=LOGS_TTL_DAYS):
  '''Deletes logs of jobs older than |ttl_days| days.

  Examines at most the first 1000 keys under logs/ per call. The age of a log
  is derived from the first 8 characters of its job id, parsed as YYYYMMDD.
  NOTE(review): that date stamp is presumably produced from the UTC `now`
  handed to append_jobs(), hence the comparison against utcnow() below.
  Keys that do not start with a valid date are skipped (and logged) rather
  than aborting the whole sweep.
  '''
  url = '%s/logs.json?limitToFirst=1000&shallow=true' % (DB)
  logs = await req_async('GET', url) or {}
  tasks = []
  logging.debug('delete_expired_logs: got %d keys', len(logs))
  now = datetime.utcnow()
  for job_id in logs:
    try:
      job_date = datetime.strptime(job_id[:8], '%Y%m%d')
    except ValueError:
      logging.warning('delete_expired_logs: skipping malformed key %s', job_id)
      continue
    age_days = (now - job_date).days
    if age_days > ttl_days:
      logging.debug('Delete log %s', job_id)
      tasks.append(defer(delete_job_logs(job_id=job_id)))
  await asyncio.gather(*tasks)


async def delete_stale_jobs():
  '''Deletes jobs that are left in the running queue for too long

  This is usually due to a crash in the VM that handles them. A job is
  cancelled once it has been running for more than 2 * JOB_TIMEOUT_SEC.
  '''
  running_jobs = await req_async(
      'GET', '%s/jobs_running.json?shallow=true' % (DB)) or {}
  tasks = []
  for job_id in running_jobs:
    # The job record may be missing (e.g. removed after the shallow listing
    # above). Treat it like a job without time_started (i.e. not stale) instead
    # of crashing and leaving every other stale job uncancelled.
    job = await req_async('GET', '%s/jobs/%s.json' % (DB, job_id)) or {}
    time_started = parse_iso_time(job.get('time_started', utc_now_iso()))
    # Timestamps are written by utc_now_iso(), so compare against UTC now.
    age = (datetime.utcnow() - time_started).total_seconds()
    if age > JOB_TIMEOUT_SEC * 2:
      tasks.append(defer(cancel_job(job_id=job_id)))
  await asyncio.gather(*tasks)


async def delete_stale_workers():
  '''Deletes workers that have been inactive for too long

  This is usually due to a crash in the VM that handles them. A worker is
  purged when its last_update is more than 12 hours old.
  '''
  workers = await req_async('GET', '%s/workers.json' % (DB)) or {}
  max_idle_sec = 60 * 60 * 12
  patch_obj = {}
  for worker_id, worker in workers.items():
    last_update = parse_iso_time(worker.get('last_update', utc_now_iso()))
    # last_update is written by utc_now_iso(), so compare against UTC now.
    age = (datetime.utcnow() - last_update).total_seconds()
    if age > max_idle_sec:
      patch_obj['workers/' + worker_id] = {}  # = DELETE
  if not patch_obj:
    return
  logging.info('Purging %d inactive workers', len(patch_obj))
  await req_async('PATCH', DB + '.json', body=patch_obj)


async def cancel_job(job_id: str):
  '''Cancels a job if not completed or failed.

  This function is racy: workers can complete the queued jobs while we mark them
  as cancelled. The result of such race is still acceptable.

  The job is always removed from jobs_running/ and jobs_queued/. Its status is
  set to CANCELLED (with a time_ended stamp) only if it is QUEUED or STARTED.
  '''
  status = await req_async('GET', '%s/jobs/%s/status.json' % (DB, job_id))
  patch_obj = {
      'jobs_running/%s' % job_id: {},  # = DELETE
      'jobs_queued/%s' % job_id: {},  # = DELETE
  }
  if status in ('QUEUED', 'STARTED'):
    patch_obj['jobs/%s/status' % job_id] = 'CANCELLED'
    patch_obj['jobs/%s/time_ended' % job_id] = utc_now_iso()
  await req_async('PATCH', DB + '.json', body=patch_obj)


async def requeue_job(job_id: str):
  '''Re-queues a job that was previously interrupted due to a VM shutdown.

  Moves the job from jobs_running/ back to jobs_queued/, resets its status to
  QUEUED with a fresh time_queued, and clears time_started, time_ended and
  worker.
  '''
  logging.info('Requeuing interrupted job %s', job_id)
  patch_obj = {
      'jobs_running/%s' % job_id: {},  # = DELETE
      'jobs_queued/%s' % job_id: 0,
      'jobs/%s/status' % job_id: 'QUEUED',
      'jobs/%s/time_queued' % job_id: utc_now_iso(),
      'jobs/%s/time_started' % job_id: {},  # = DELETE
      'jobs/%s/time_ended' % job_id: {},  # = DELETE
      'jobs/%s/worker' % job_id: {},  # = DELETE
  }
  await req_async('PATCH', DB + '.json', body=patch_obj)


async def delete_job_logs(job_id: str):
  '''Deletes the whole logs/<job_id> subtree.

  writeSizeLimit=unlimited is passed so that the DB accepts deleting a
  potentially large subtree in one request.
  '''
  await req_async('DELETE',
                  '%s/logs/%s.json?writeSizeLimit=unlimited' % (DB, job_id))


async def update_cl_metrics(src: str):
  '''Reports the queue-to-completion time of the DB entry at |src|.

  |src| is a DB path such as 'branches/<branch>-<timestamp>'. The entry must
  carry both time_queued and time_ended. Otherwise nothing is reported and a
  warning is logged, mirroring the key checks in update_job_metrics().
  '''
  cl_obj = await req_async('GET', '%s/%s.json' % (DB, src))
  if not cl_obj or 'time_queued' not in cl_obj or 'time_ended' not in cl_obj:
    logging.warning('update_cl_metrics: %s lacks time_queued/time_ended', src)
    return
  t_queued = parse_iso_time(cl_obj['time_queued'])
  t_ended = parse_iso_time(cl_obj['time_ended'])
  await write_metrics({
      'ci_cl_completion_time': {
          'l': {},
          'v': int((t_ended - t_queued).total_seconds())
      }
  })


async def update_job_metrics(job_id: str):
  '''Reports queue time and run time of a job, for whichever are computable.

  ci_job_queue_time needs time_queued and time_started. ci_job_run_time needs
  time_started and time_ended. Both are labelled with the job type. A missing
  job record reports nothing.
  '''
  job = await req_async('GET', '%s/jobs/%s.json' % (DB, job_id)) or {}
  metrics = {}
  if 'time_queued' in job and 'time_started' in job:
    t_queued = parse_iso_time(job['time_queued'])
    t_started = parse_iso_time(job['time_started'])
    metrics['ci_job_queue_time'] = {
        'l': {
            'job_type': job['type']
        },
        'v': int((t_started - t_queued).total_seconds())
    }
  if 'time_ended' in job and 'time_started' in job:
    t_started = parse_iso_time(job['time_started'])
    t_ended = parse_iso_time(job['time_ended'])
    metrics['ci_job_run_time'] = {
        'l': {
            'job_type': job['type']
        },
        'v': int((t_ended - t_started).total_seconds())
    }
  if metrics:
    await write_metrics(metrics)


async def update_queue_metrics():
  '''Reports the number of queued plus running jobs as ci_job_queue_len.'''
  # Update the stackdriver metric that will drive the autoscaler.
  queued = await req_async('GET', DB + '/jobs_queued.json?shallow=true') or {}
  running = await req_async('GET', DB + '/jobs_running.json?shallow=true') or {}
  logging.debug('ci_job_queue_len: %d + %d', len(queued), len(running))
  await write_metrics({'ci_job_queue_len': {'v': len(queued) + len(running)}})


async def create_stackdriver_metric_definitions():
  '''POSTs every descriptor in STACKDRIVER_METRICS to the Monitoring API.'''
  logging.info('Creating Stackdriver metric definitions')
  for name, metric in STACKDRIVER_METRICS.items():
    logging.info('Creating metric %s', name)
    await req_async('POST', STACKDRIVER_API + '/metricDescriptors', body=metric)


async def write_metrics(metric_dict):
  '''Writes one int64 data point per metric to Stackdriver, timestamped now.

  Args:
    metric_dict: maps a key of STACKDRIVER_METRICS to a spec dict with 'v'
      (the value, converted with str() for int64Value) and an optional 'l'
      (metric labels dict).

  Failures are best-effort: they are logged, except the "written more
  frequently than the maximum sampling" rate-limit error, which is ignored.
  '''
  now = utc_now_iso()
  desc = {'timeSeries': []}
  for key, spec in metric_dict.items():
    desc['timeSeries'] += [{
        'metric': {
            'type': STACKDRIVER_METRICS[key]['type'],
            'labels': spec.get('l', {})
        },
        'resource': {
            'type': 'global'
        },
        'points': [{
            'interval': {
                'endTime': now
            },
            'value': {
                'int64Value': str(spec['v'])
            }
        }]
    }]
  try:
    await req_async('POST', STACKDRIVER_API + '/timeSeries', body=desc)
  except Exception as e:
    # Metric updates can easily fail due to Stackdriver API limitations.
    msg = str(e)
    if 'written more frequently than the maximum sampling' not in msg:
      logging.error('Metrics update failed: %s', msg)