xref: /aosp_15_r20/external/perfetto/infra/ci/controller/main.py (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1*6dbdd20aSAndroid Build Coastguard Worker# Copyright (C) 2019 The Android Open Source Project
2*6dbdd20aSAndroid Build Coastguard Worker#
3*6dbdd20aSAndroid Build Coastguard Worker# Licensed under the Apache License, Version 2.0 (the "License");
4*6dbdd20aSAndroid Build Coastguard Worker# you may not use this file except in compliance with the License.
5*6dbdd20aSAndroid Build Coastguard Worker# You may obtain a copy of the License at
6*6dbdd20aSAndroid Build Coastguard Worker#
7*6dbdd20aSAndroid Build Coastguard Worker#      http://www.apache.org/licenses/LICENSE-2.0
8*6dbdd20aSAndroid Build Coastguard Worker#
9*6dbdd20aSAndroid Build Coastguard Worker# Unless required by applicable law or agreed to in writing, software
10*6dbdd20aSAndroid Build Coastguard Worker# distributed under the License is distributed on an "AS IS" BASIS,
11*6dbdd20aSAndroid Build Coastguard Worker# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12*6dbdd20aSAndroid Build Coastguard Worker# See the License for the specific language governing permissions and
13*6dbdd20aSAndroid Build Coastguard Worker# limitations under the License.
14*6dbdd20aSAndroid Build Coastguard Worker
15*6dbdd20aSAndroid Build Coastguard Workerimport asyncio
16*6dbdd20aSAndroid Build Coastguard Workerimport flask
17*6dbdd20aSAndroid Build Coastguard Workerimport logging
18*6dbdd20aSAndroid Build Coastguard Workerimport re
19*6dbdd20aSAndroid Build Coastguard Workerimport urllib.parse
20*6dbdd20aSAndroid Build Coastguard Worker
21*6dbdd20aSAndroid Build Coastguard Workerfrom datetime import datetime, timedelta
22*6dbdd20aSAndroid Build Coastguard Workerfrom common_utils import init_logging, defer, req_async, utc_now_iso, parse_iso_time, SCOPES
23*6dbdd20aSAndroid Build Coastguard Workerfrom config import DB, GERRIT_HOST, GERRIT_PROJECT, PROJECT
24*6dbdd20aSAndroid Build Coastguard Workerfrom config import CI_SITE, GERRIT_VOTING_ENABLED, JOB_CONFIGS, LOGS_TTL_DAYS
25*6dbdd20aSAndroid Build Coastguard Workerfrom config import TRUSTED_EMAILS, GCS_ARTIFACTS, JOB_TIMEOUT_SEC
26*6dbdd20aSAndroid Build Coastguard Workerfrom config import CL_TIMEOUT_SEC
27*6dbdd20aSAndroid Build Coastguard Workerfrom functools import wraps
28*6dbdd20aSAndroid Build Coastguard Workerfrom stackdriver_metrics import STACKDRIVER_METRICS
29*6dbdd20aSAndroid Build Coastguard Worker
STACKDRIVER_API = 'https://monitoring.googleapis.com/v3/projects/%s' % PROJECT

# Extra OAuth scopes requested by the controller, appended (in this order) to
# the SCOPES list imported from common_utils: Firebase RTDB, user email,
# Datastore and Cloud Monitoring (read + write).
SCOPES.extend([
    'https://www.googleapis.com/auth/firebase.database',
    'https://www.googleapis.com/auth/userinfo.email',
    'https://www.googleapis.com/auth/datastore',
    'https://www.googleapis.com/auth/monitoring',
    'https://www.googleapis.com/auth/monitoring.write',
])

app = flask.Flask(__name__)

# Handler function name -> True while that handler is running. Entries are
# created and toggled by the no_concurrency() decorator.
is_handling_route = {}
41*6dbdd20aSAndroid Build Coastguard Worker
42*6dbdd20aSAndroid Build Coastguard Worker# ------------------------------------------------------------------------------
43*6dbdd20aSAndroid Build Coastguard Worker# Misc utility functions
44*6dbdd20aSAndroid Build Coastguard Worker# ------------------------------------------------------------------------------
45*6dbdd20aSAndroid Build Coastguard Worker
46*6dbdd20aSAndroid Build Coastguard Worker
def is_trusted(email, pattern=None):
  '''Checks whether |email| belongs to a trusted account.

  The whole address must match the pattern. re.match() alone only anchors at
  the start of the string, so an unanchored pattern such as '.*@google.com'
  would also accept 'x@google.com.example.org'; re.fullmatch() does not.

  Args:
    email: the account email to check (may be '' when unknown).
    pattern: regex of trusted emails; defaults to config.TRUSTED_EMAILS.

  Returns:
    A re.Match object (truthy) if the email is trusted, None otherwise.
  '''
  if pattern is None:
    pattern = TRUSTED_EMAILS
  return re.fullmatch(pattern, email)
49*6dbdd20aSAndroid Build Coastguard Worker
50*6dbdd20aSAndroid Build Coastguard Worker
def no_concurrency(f):
  '''Decorator that refuses overlapping invocations of an async handler.

  While a call to |f| is in progress, further calls are rejected with HTTP 423
  via flask.abort(). The in-progress flag lives in the module-level
  is_handling_route dict, keyed by |f|'s name, and is cleared when the call
  returns or raises.
  '''
  name = f.__name__
  is_handling_route[name] = False

  @wraps(f)
  async def decorated_function(*args, **kwargs):
    if not is_handling_route[name]:
      is_handling_route[name] = True
      try:
        return await f(*args, **kwargs)
      finally:
        is_handling_route[name] = False
    # Another invocation is still running.
    return flask.abort(423, description='Handler %s already running' % name)

  return decorated_function
67*6dbdd20aSAndroid Build Coastguard Worker
68*6dbdd20aSAndroid Build Coastguard Worker
69*6dbdd20aSAndroid Build Coastguard Worker# ------------------------------------------------------------------------------
70*6dbdd20aSAndroid Build Coastguard Worker# HTTP handlers
71*6dbdd20aSAndroid Build Coastguard Worker# ------------------------------------------------------------------------------
72*6dbdd20aSAndroid Build Coastguard Worker
73*6dbdd20aSAndroid Build Coastguard Worker
@app.route('/_ah/start', methods=['GET', 'POST'])
async def http_start():
  '''Handler for /_ah/start: sets up logging and the Stackdriver metrics.'''
  init_logging()
  await create_stackdriver_metric_definitions()
  now_iso = datetime.now().isoformat()
  return 'OK ' + now_iso
79*6dbdd20aSAndroid Build Coastguard Worker
80*6dbdd20aSAndroid Build Coastguard Worker
@app.route('/controller/tick', methods=['GET', 'POST'])
@no_concurrency
async def http_tick():
  '''Cron entry point: polls for new/pending CLs repeatedly for ~55 seconds.

  The tick is invoked by cron.yaml every 1 minute, which doesn't allow
  sub-minute jobs. To be more responsive, each tick repeats the polling every
  15 seconds. It stops as soon as the next poll would start after the 55 s
  deadline, so that it normally finishes before the next tick arrives
  (a still-running tick would make @no_concurrency reject the next one).
  '''
  poll_interval_sec = 15
  deadline = datetime.now() + timedelta(seconds=55)
  while True:
    await check_new_cls()
    await check_pending_cls()
    await update_queue_metrics()
    remaining_sec = (deadline - datetime.now()).total_seconds()
    if remaining_sec <= poll_interval_sec:
      break
    # asyncio.sleep() returns a coroutine: it only pauses when awaited.
    await asyncio.sleep(poll_interval_sec)
  return 'OK ' + datetime.now().isoformat()
94*6dbdd20aSAndroid Build Coastguard Worker
95*6dbdd20aSAndroid Build Coastguard Worker
@app.route('/controller/queue_postsubmit_jobs', methods=['GET', 'POST'])
@no_concurrency
async def http_queue_postsubmit_jobs():
  '''Queues the postsubmit jobs for the head of the main branch.'''
  await queue_postsubmit_jobs('main')
  finished_at = datetime.now().isoformat()
  return 'OK ' + finished_at
101*6dbdd20aSAndroid Build Coastguard Worker
102*6dbdd20aSAndroid Build Coastguard Worker
@app.route('/controller/delete_stale_jobs', methods=['GET', 'POST'])
@no_concurrency
async def http_delete_stale_jobs():
  '''Runs delete_stale_jobs(), at most one invocation at a time.'''
  await delete_stale_jobs()
  finished_at = datetime.now().isoformat()
  return 'OK ' + finished_at
108*6dbdd20aSAndroid Build Coastguard Worker
109*6dbdd20aSAndroid Build Coastguard Worker
@app.route('/controller/delete_stale_workers', methods=['GET', 'POST'])
@no_concurrency
async def http_delete_stale_workers():
  '''Runs delete_stale_workers(), at most one invocation at a time.'''
  await delete_stale_workers()
  finished_at = datetime.now().isoformat()
  return 'OK ' + finished_at
115*6dbdd20aSAndroid Build Coastguard Worker
116*6dbdd20aSAndroid Build Coastguard Worker
@app.route('/controller/delete_expired_logs', methods=['GET', 'POST'])
@no_concurrency
async def http_delete_expired_logs():
  '''Runs delete_expired_logs() with the configured LOGS_TTL_DAYS.'''
  await delete_expired_logs(LOGS_TTL_DAYS)
  finished_at = datetime.now().isoformat()
  return 'OK ' + finished_at
122*6dbdd20aSAndroid Build Coastguard Worker
123*6dbdd20aSAndroid Build Coastguard Worker
# Endpoints below are only for manual testing & maintenance.
125*6dbdd20aSAndroid Build Coastguard Worker
126*6dbdd20aSAndroid Build Coastguard Worker
@app.route(
    '/controller/delete_expired_logs/<int:ttl_days>', methods=['GET', 'POST'])
async def http_delete_expired_logs_ttl(ttl_days):
  '''Runs delete_expired_logs() with a TTL (in days) taken from the URL.'''
  await delete_expired_logs(ttl_days)
  finished_at = datetime.now().isoformat()
  return 'OK ' + finished_at
132*6dbdd20aSAndroid Build Coastguard Worker
133*6dbdd20aSAndroid Build Coastguard Worker
@app.route('/controller/delete_job_logs/<job_id>', methods=['GET', 'POST'])
async def http_delete_job_logs(job_id):
  '''Runs delete_job_logs() for the job id given in the URL.'''
  await delete_job_logs(job_id)
  finished_at = datetime.now().isoformat()
  return 'OK ' + finished_at
138*6dbdd20aSAndroid Build Coastguard Worker
139*6dbdd20aSAndroid Build Coastguard Worker
# This is to test HTTP timeouts
@app.route('/controller/sleep/<int:sleep_sec>', methods=['GET', 'POST'])
async def http_sleep(sleep_sec):
  '''Waits |sleep_sec| seconds before replying.'''
  await asyncio.sleep(sleep_sec)
  finished_at = datetime.now().isoformat()
  return 'OK ' + finished_at
145*6dbdd20aSAndroid Build Coastguard Worker
146*6dbdd20aSAndroid Build Coastguard Worker
@app.route('/controller/sleep_locked/<int:sleep_sec>', methods=['GET', 'POST'])
@no_concurrency
async def http_sleep_locked(sleep_sec):
  '''Waits |sleep_sec| seconds under @no_concurrency before replying.

  Presumably meant for manually exercising the @no_concurrency lock.
  '''
  await asyncio.sleep(sleep_sec)
  finished_at = datetime.now().isoformat()
  return 'OK ' + finished_at
152*6dbdd20aSAndroid Build Coastguard Worker
153*6dbdd20aSAndroid Build Coastguard Worker
154*6dbdd20aSAndroid Build Coastguard Worker# ------------------------------------------------------------------------------
155*6dbdd20aSAndroid Build Coastguard Worker# Deferred jobs
156*6dbdd20aSAndroid Build Coastguard Worker# ------------------------------------------------------------------------------
157*6dbdd20aSAndroid Build Coastguard Worker
158*6dbdd20aSAndroid Build Coastguard Worker
async def check_new_cls():
  '''Polls Gerrit for new CLs and asynchronously enqueues jobs for them.

  Queries up to 32 open changes on the main branch of GERRIT_PROJECT with an
  `after:` filter of one day ago. For each change that has a current revision
  and was either uploaded by a trusted account or approved as Presubmit-Ready
  by a trusted account, it schedules check_new_cl() via defer(). It then waits
  for all of the scheduled calls.
  '''
  logging.info('Polling for new Gerrit CLs')
  date_limit = (datetime.utcnow() - timedelta(days=1)).strftime('%Y-%m-%d')
  url = 'https://%s/a/changes/' % GERRIT_HOST
  url += '?o=CURRENT_REVISION&o=DETAILED_ACCOUNTS&o=LABELS&n=32'
  url += '&q=branch:main+project:%s' % GERRIT_PROJECT
  url += '+is:open+after:%s' % date_limit
  resp = await req_async('GET', url, gerrit=True)
  tasks = []
  for change in resp or []:
    if not change.get('revisions'):
      continue
    # With o=CURRENT_REVISION only the current revision is returned.
    rev_hash = next(iter(change['revisions']))
    rev = change['revisions'][rev_hash]
    # A missing email or label is treated as "not trusted" instead of raising
    # and aborting the poll for every other CL.
    owner = rev.get('uploader', {}).get('email', '')
    labels = change.get('labels', {})
    prs_ready = labels.get('Presubmit-Ready', {}).get('approved', {})
    prs_owner = prs_ready.get('email', '')
    # Only submit jobs for patchsets that are either uploaded by a trusted
    # account or are marked as Presubmit-Ready by a trusted account.
    if not is_trusted(owner) and not is_trusted(prs_owner):
      continue
    tasks.append(
        defer(
            check_new_cl(
                cl=str(change['_number']),
                patchset=str(rev['_number']),
                change_id=change['id'],
                rev_hash=rev_hash,
                ref=rev['ref'],
                wants_vote=bool(prs_ready))))
  await asyncio.gather(*tasks)
189*6dbdd20aSAndroid Build Coastguard Worker
190*6dbdd20aSAndroid Build Coastguard Worker
def append_jobs(patch_obj, src, git_ref, now=None):
  '''Adds one worker job per entry of JOB_CONFIGS (config.py) for a CL.

  Nothing is written to the DB here. The entries are only added to
  |patch_obj|, so the caller can write the CL descriptor and all its jobs
  atomically with a single PATCH. For every job config this adds:
    jobs/<job_id>         the job descriptor, with status 'QUEUED';
    jobs_queued/<job_id>  set to 0;
  and sets patch_obj[src]['jobs'][<job_id>] = 0 (that dict must already
  exist).

  Job ids are '<timestamp>--<src with "/" replaced by "-">--<config name>'.
  Workers pull jobs ordered by key, so the timestamp prefix gives a fair
  schedule.

  Args:
    patch_obj: dict of DB path -> value, later sent as a PATCH body.
    src: the CL path, e.g. 'cls/1234-1' (cl and patchset number).
    git_ref: stored in each job's env as PERFETTO_TEST_GIT_REF.
    now: datetime for the job id timestamp; defaults to datetime.utcnow().
  '''
  logging.info('Enqueueing jobs fos cl %s', src)
  when = now or datetime.utcnow()
  stamp = when.strftime('%Y%m%d%H%M%S')
  for cfg_name, env in JOB_CONFIGS.items():
    job_id = '%s--%s--%s' % (stamp, src.replace('/', '-'), cfg_name)
    logging.info('Enqueueing job %s', job_id)
    job = {
        'src': src,
        'type': cfg_name,
        'env': dict(env, PERFETTO_TEST_GIT_REF=git_ref),
        'status': 'QUEUED',
        'time_queued': utc_now_iso(),
    }
    patch_obj['jobs/' + job_id] = job
    patch_obj['jobs_queued/' + job_id] = 0
    patch_obj[src]['jobs'][job_id] = 0
215*6dbdd20aSAndroid Build Coastguard Worker
216*6dbdd20aSAndroid Build Coastguard Worker
async def check_new_cl(change_id: str, rev_hash: str, cl: str, patchset: str,
                       ref: str, wants_vote: bool):
  '''Creates the CL + jobs entries in the DB for a patchset seen for the first time.

  If cls/<cl>-<patchset> already exists, no jobs are queued. The only thing
  that can change is that a Presubmit-Ready label was added after the CL entry
  was created. In that case wants_vote is set to true in the DB and
  check_pending_cl() runs immediately, so a CL whose jobs have already
  finished still gets voted on.

  Otherwise, jobs of older patchsets of the same CL are cancelled. Then the CL
  entry, its cls_pending marker and its jobs are written with a single PATCH.
  '''
  logging.debug('check_new_cl(%s-%s)', cl, patchset)
  vote_url = '%s/cls/%s-%s/wants_vote.json' % (DB, cl, patchset)
  # None means the CL entry doesn't exist yet.
  vote_prop = await req_async('GET', vote_url)

  if vote_prop is not None:
    label_just_added = wants_vote and vote_prop != wants_vote
    if label_just_added:
      logging.info('Updating wants_vote flag on %s-%s', cl, patchset)
      await req_async('PUT', vote_url, body=True)
      # If the jobs have all finished already, this goes straight to voting.
      await check_pending_cl(cl_and_ps='%s-%s' % (cl, patchset))
    logging.debug('check_new_cl(%s-%s): already queued', cl, patchset)
    return

  # First time this patchset is seen: drop the older patchsets' jobs, then
  # enqueue jobs for this one.
  await cancel_older_jobs(cl=cl, patchset=patchset)

  src = 'cls/%s-%s' % (cl, patchset)
  patch_obj = {
      'cls_pending/%s-%s' % (cl, patchset): 0,
      src: {
          'change_id': change_id,
          'revision_id': rev_hash,
          'time_queued': utc_now_iso(),
          'jobs': {},
          'wants_vote': wants_vote,
      },
  }
  append_jobs(patch_obj, src, ref)
  logging.debug('check_new_cl(%s-%s): queueing jobs', cl, patchset)
  await req_async('PATCH', DB + '.json', body=patch_obj)
263*6dbdd20aSAndroid Build Coastguard Worker
264*6dbdd20aSAndroid Build Coastguard Worker
async def cancel_older_jobs(cl: str, patchset: str):
  '''Cancels the jobs of older, still-running patchsets of |cl|.

  Fetches the cls/<cl>-<ps> entries with a key-range query. For each entry
  whose patchset number is lower than |patchset| and that has no time_ended
  yet, it schedules cancel_job() for its jobs via defer(). It then waits for
  all of the scheduled cancellations.
  '''
  # Keys look like '<cl>-<ps>'. Assuming lexicographic key ordering, '-0' and
  # '-z' bracket every numeric patchset, and a longer CL number such as
  # '1234-1' falls outside the range for cl '123' ('-' sorts before digits).
  first_key = '%s-0' % cl
  last_key = '%s-z' % cl
  filt = 'orderBy="$key"&startAt="%s"&endAt="%s"' % (first_key, last_key)
  cl_objs = await req_async('GET', '%s/cls.json?%s' % (DB, filt)) or {}
  current_ps = int(patchset)
  tasks = []
  for cl_and_ps, cl_obj in cl_objs.items():
    ps = int(cl_and_ps.split('-')[-1])
    if cl_obj.get('time_ended') or ps >= current_ps:
      continue
    logging.info('Cancelling jobs for previous patchset %s', cl_and_ps)
    # The 'jobs' key may be absent (e.g. no jobs were created), so use .get()
    # like check_pending_cl() does instead of raising KeyError.
    for job_id in cl_obj.get('jobs', {}):
      tasks.append(defer(cancel_job(job_id=job_id)))
  await asyncio.gather(*tasks)
279*6dbdd20aSAndroid Build Coastguard Worker
280*6dbdd20aSAndroid Build Coastguard Worker
async def check_pending_cls():
  '''Re-checks every CL listed under cls_pending.

  Schedules check_pending_cl() via defer() for each entry, so that CLs whose
  jobs have all completed get finalized (comment + vote), and waits for all
  of them.
  '''
  pending_cls = await req_async('GET', '%s/cls_pending.json' % DB) or {}
  await asyncio.gather(*[
      defer(check_pending_cl(cl_and_ps=cl_and_ps))
      for cl_and_ps, _ in pending_cls.items()
  ])
289*6dbdd20aSAndroid Build Coastguard Worker
290*6dbdd20aSAndroid Build Coastguard Worker
async def check_pending_cl(cl_and_ps: str):
  '''Finalizes a pending CL once all of its jobs have finished.

  Reads cls/<cl_and_ps> and the status of each of its jobs, and re-queues
  any INTERRUPTED job. If a job is still QUEUED/STARTED or was INTERRUPTED,
  it returns without finalizing. Still QUEUED/STARTED jobs are first
  cancelled if the CL has been pending for more than CL_TIMEOUT_SEC seconds.

  Otherwise it removes the CL from cls_pending and sets time_ended (keeping
  any existing value). It then updates the CL and job metrics and, if
  wants_vote is set, posts the comment + vote.

  This function can be called twice on the same CL, e.g., in the case when the
  Presubmit-Ready label is applied after we have finished running all the
  jobs (we run presubmit regardless, only the voting is conditioned by PR).
  '''
  cl_obj = await req_async('GET', '%s/cls/%s.json' % (DB, cl_and_ps))
  if cl_obj is None:
    logging.warning('check_pending_cl: no DB entry for CL %s', cl_and_ps)
    return
  all_jobs = cl_obj.get('jobs', {}).keys()
  pending_jobs = []
  interrupted_jobs = []
  for job_id in all_jobs:
    job_status = await req_async('GET', '%s/jobs/%s/status.json' % (DB, job_id))
    if job_status in ('QUEUED', 'STARTED'):
      pending_jobs.append(job_id)
    elif job_status == 'INTERRUPTED':
      # Exact comparison: `in ('INTERRUPTED')` would be a substring test on a
      # plain str (true for '', TypeError for a missing job's None).
      interrupted_jobs.append(job_id)

  # Interrupted jobs are due to VMs being shutdown (usually due to a scale-down)
  # Automatically re-queue them so they get picked up by some other vm.
  await asyncio.gather(*[requeue_job(job_id) for job_id in interrupted_jobs])

  if pending_jobs:
    # If the CL has been pending for too long cancel all its jobs. Upon the next
    # scan it will be deleted and optionally voted on.
    t_queued = parse_iso_time(cl_obj['time_queued'])
    age_sec = (datetime.utcnow() - t_queued).total_seconds()
    if age_sec > CL_TIMEOUT_SEC:
      logging.warning('Canceling %s, it has been pending for too long (%s sec)',
                      cl_and_ps, int(age_sec))
      tasks = [defer(cancel_job(job_id)) for job_id in pending_jobs]
      await asyncio.gather(*tasks)

  if pending_jobs or interrupted_jobs:
    return
  logging.info('All jobs completed for CL %s', cl_and_ps)

  # Remove the CL from the pending queue and update end time.
  patch_obj = {
      'cls_pending/%s' % cl_and_ps: {},  # = DELETE
      'cls/%s/time_ended' % cl_and_ps: cl_obj.get('time_ended', utc_now_iso()),
  }
  await req_async('PATCH', '%s.json' % DB, body=patch_obj)
  await update_cl_metrics(src='cls/' + cl_and_ps)
  tasks = [defer(update_job_metrics(job_id)) for job_id in all_jobs]
  await asyncio.gather(*tasks)
  if cl_obj.get('wants_vote'):
    await comment_and_vote_cl(cl_and_ps=cl_and_ps)
334*6dbdd20aSAndroid Build Coastguard Worker
335*6dbdd20aSAndroid Build Coastguard Worker
async def comment_and_vote_cl(cl_and_ps: str):
  '''Posts the CI results of CL |cl_and_ps| ('<cl>-<patchset>') to Gerrit.

  Does nothing if the CL was already voted on, doesn't want a vote, or
  GERRIT_VOTING_ENABLED is off. Otherwise it posts a review message to the
  CL's revision and marks the CL as voted in the DB. The message lists the
  failed jobs (with their status), the passed jobs, the UI artifact links
  and the CI page of the CL.

  The Code-Review vote is +1 if every job either COMPLETED or has SKIP_VOTING
  set in its JOB_CONFIGS entry, and -1 otherwise. If any job was CANCELLED,
  the message is still posted but no vote is cast.
  '''
  cl_obj = await req_async('GET', '%s/cls/%s.json' % (DB, cl_and_ps))

  if cl_obj.get('voted'):
    logging.error('Already voted on CL %s', cl_and_ps)
    return

  if not cl_obj.get('wants_vote') or not GERRIT_VOTING_ENABLED:
    logging.info('Skipping voting on CL %s', cl_and_ps)
    return

  cl_vote = 1
  passed_jobs = []
  failed_jobs = {}
  ui_links = []
  cancelled = False
  for job_id in cl_obj.get('jobs', {}):
    job_obj = await req_async('GET', '%s/jobs/%s.json' % (DB, job_id))
    job_config = JOB_CONFIGS.get(job_obj['type'], {})
    if job_obj['status'] == 'CANCELLED':
      cancelled = True
    if '-ui-' in job_id:
      ui_links.append('https://storage.googleapis.com/%s/%s/ui/index.html' %
                      (GCS_ARTIFACTS, job_id))
      ui_links.append(
          'https://storage.googleapis.com/%s/%s/ui-test-artifacts/index.html' %
          (GCS_ARTIFACTS, job_id))
    if job_obj['status'] == 'COMPLETED':
      passed_jobs.append(job_id)
    elif not job_config.get('SKIP_VOTING', False):
      cl_vote = -1
      failed_jobs[job_id] = job_obj['status']

  msg = ''
  if cancelled:
    msg += 'Some jobs in this CI run were cancelled. This likely happened '
    msg += 'because a new patchset has been uploaded. Skipping vote.\n'
  log_url = CI_SITE + '/#!/logs'
  if failed_jobs:
    msg += 'FAIL:\n'
    msg += ''.join([
        '- %s/%s (%s)\n' % (log_url, job_id, status)
        for (job_id, status) in failed_jobs.items()
    ])
  if passed_jobs:
    msg += '#\nPASS:\n'
    msg += ''.join(['- %s/%s\n' % (log_url, job_id) for job_id in passed_jobs])
  if ui_links:
    msg += '\nArtifacts:\n' + ''.join('- %s\n' % link for link in ui_links)
  msg += 'CI page for this CL:\n'
  # Use CI_SITE, like the log links above, instead of a hard-coded host.
  msg += '- %s/#!/cls/%s\n' % (CI_SITE, cl_and_ps.split('-')[0])
  body = {'labels': {}, 'message': msg}
  if not cancelled:
    body['labels']['Code-Review'] = cl_vote
  logging.info('Posting results for CL %s', cl_and_ps)
  url = 'https://%s/a/changes/%s/revisions/%s/review' % (
      GERRIT_HOST, cl_obj['change_id'], cl_obj['revision_id'])
  await req_async('POST', url, body=body, gerrit=True)
  await req_async('PUT', '%s/cls/%s/voted.json' % (DB, cl_and_ps), body=True)
395*6dbdd20aSAndroid Build Coastguard Worker
396*6dbdd20aSAndroid Build Coastguard Worker
async def queue_postsubmit_jobs(branch: str, revision: str = None):
  '''Queues the postsubmit jobs for |branch|, optionally pinned to |revision|.

  Two calling conventions are supported:
    1. ?branch=main: resolves the SHA1 at the head of |branch| via Gerrit and
       queues jobs for it, unless the newest entry under branches/ already
       refers to that same SHA1.
    2. ?branch=main&rev=deadbeef1234: queues jobs for the given revision.

  The new entry is written, together with the job entries produced by
  append_jobs(), under branches/<branch>-<committer time as YYYYmmddHHMMSS>.
  '''
  quoted_project = urllib.parse.quote(GERRIT_PROJECT, '')
  assert (branch)

  if not revision:
    head_url = 'https://%s/a/projects/%s/branches/%s' % (
        GERRIT_HOST, quoted_project, branch)
    revision = (await req_async('GET', head_url, gerrit=True))['revision']
    assert (revision)
    # Avoid rebuilding the same revision over and over (e.g. every hour over
    # a weekend): stop if the newest branches/ entry already has this SHA1.
    # NOTE(review): ordering is by key ("<branch>-<timestamp>") across ALL
    # branches, so with several branches this compares against the
    # lexicographically-last one — confirm only one branch is polled.
    latest_query = 'orderBy="$key"&limitToLast=1'
    latest = await req_async('GET',
                             '%s/branches.json?%s' % (DB, latest_query)) or {}
    if latest and next(iter(latest.values())).get('rev') == revision:
      logging.debug('Skipping postsubmits for %s: already run', revision)
      return

  # The committer timestamp (fractional seconds dropped) becomes part of the
  # entry key.
  commit_url = 'https://%s/a/projects/%s/commits/%s' % (
      GERRIT_HOST, quoted_project, revision)
  commit_info = await req_async('GET', commit_url, gerrit=True)
  committer_date = commit_info['committer']['date'].split('.')[0]
  time_committed = datetime.strptime(committer_date, '%Y-%m-%d %H:%M:%S')

  src = 'branches/%s-%s' % (branch, time_committed.strftime('%Y%m%d%H%M%S'))
  queued_at = datetime.utcnow()
  entry = {
      'rev': revision,
      'subject': commit_info['subject'][:100],
      'author': commit_info['author'].get('email', 'N/A'),
      'time_committed': utc_now_iso(time_committed),
      'time_queued': utc_now_iso(),
      'jobs': {},
  }
  patch_obj = {src: entry}
  append_jobs(patch_obj, src, 'refs/heads/' + branch, queued_at)
  await req_async('PATCH', DB + '.json', body=patch_obj)
445*6dbdd20aSAndroid Build Coastguard Worker
446*6dbdd20aSAndroid Build Coastguard Worker
async def delete_expired_logs(ttl_days=LOGS_TTL_DAYS):
  '''Deletes the logs of jobs that are more than |ttl_days| days old.

  Shallow-lists up to 1000 keys under logs/. Each key is expected to be a job
  id whose first 8 characters are a YYYYMMDD date. Logs older than |ttl_days|
  are deleted concurrently via delete_job_logs().

  Keys that do not start with a valid date are logged and skipped. Raising
  there would abort the whole sweep, and keep aborting it on every later run.
  '''
  url = '%s/logs.json?limitToFirst=1000&shallow=true' % (DB)
  logs = await req_async('GET', url) or {}
  logging.debug('delete_expired_logs: got %d keys', len(logs))
  # The rest of this module timestamps in UTC (datetime.utcnow / utc_now_iso).
  # Use UTC here too so the computed age does not depend on the host timezone.
  now = datetime.utcnow()
  tasks = []
  for job_id in logs:
    try:
      job_date = datetime.strptime(job_id[:8], '%Y%m%d')
    except ValueError:
      logging.warning('delete_expired_logs: skipping malformed key %s', job_id)
      continue
    age_days = (now - job_date).days
    if age_days > ttl_days:
      logging.debug('Delete log %s', job_id)
      tasks.append(defer(delete_job_logs(job_id=job_id)))
  await asyncio.gather(*tasks)
458*6dbdd20aSAndroid Build Coastguard Worker
459*6dbdd20aSAndroid Build Coastguard Worker
async def delete_stale_jobs():
  '''Cancels jobs that have been left in the running queue for too long.

  A job listed under jobs_running/ is considered stale when its time_started
  is more than 2 * JOB_TIMEOUT_SEC seconds in the past. This is usually due to
  a crash in the VM that was handling it. Stale jobs are cancelled
  concurrently via cancel_job().
  '''
  running_jobs = await req_async('GET', '%s/jobs_running.json?shallow=true' %
                                 (DB)) or {}
  tasks = []
  for job_id in running_jobs:
    # Fetch only the field we need (cancel_job reads jobs/<id>/status the
    # same way). This is falsy when the job has no time_started, or no longer
    # exists, e.g. it was removed after the listing above. Skip those entries
    # instead of crashing the sweep on a missing job.
    time_started = await req_async(
        'GET', '%s/jobs/%s/time_started.json' % (DB, job_id))
    if not time_started:
      continue
    # time_started is stored via utc_now_iso(), so compare against UTC rather
    # than the host's local time.
    age = (datetime.utcnow() - parse_iso_time(time_started)).total_seconds()
    if age > JOB_TIMEOUT_SEC * 2:
      tasks.append(defer(cancel_job(job_id=job_id)))
  await asyncio.gather(*tasks)
475*6dbdd20aSAndroid Build Coastguard Worker
476*6dbdd20aSAndroid Build Coastguard Worker
async def delete_stale_workers(max_age_sec=60 * 60 * 12):
  '''Deletes workers that have been inactive for too long.

  This is usually due to a crash in the VM that handles them. A worker is
  stale when its last_update is more than |max_age_sec| seconds old. All stale
  workers are removed with a single multi-path PATCH.

  Args:
    max_age_sec: inactivity threshold in seconds (default: 12 hours).
  '''
  workers = await req_async('GET', '%s/workers.json' % (DB)) or {}
  # last_update is parsed by parse_iso_time() and defaults to utc_now_iso(),
  # so compare against UTC rather than the host's local time.
  now = datetime.utcnow()
  patch_obj = {}
  for worker_id, worker in workers.items():
    last_update = parse_iso_time(worker.get('last_update', utc_now_iso()))
    if (now - last_update).total_seconds() > max_age_sec:
      patch_obj['workers/' + worker_id] = {}  # DELETE
  if not patch_obj:
    return
  logging.info('Purging %d inactive workers', len(patch_obj))
  await req_async('PATCH', DB + '.json', body=patch_obj)
493*6dbdd20aSAndroid Build Coastguard Worker
494*6dbdd20aSAndroid Build Coastguard Worker
async def cancel_job(job_id: str):
  '''Dequeues a job and marks it CANCELLED unless it already finished.

  The job is always removed from both jobs_running/ and jobs_queued/. Its
  status becomes CANCELLED, with time_ended set to now, only if it was QUEUED
  or STARTED; jobs in any other state keep their status.

  This function is racy: workers can complete the queued jobs while we mark
  them as cancelled. The result of such race is still acceptable.
  '''
  job_path = 'jobs/%s' % job_id
  status = await req_async('GET', '%s/%s/status.json' % (DB, job_path))
  updates = {
      'jobs_running/%s' % job_id: {},  # = DELETE
      'jobs_queued/%s' % job_id: {},  # = DELETE
  }
  still_pending = status in ('QUEUED', 'STARTED')
  if still_pending:
    updates.update({
        job_path + '/status': 'CANCELLED',
        job_path + '/time_ended': utc_now_iso(),
    })
  await req_async('PATCH', DB + '.json', body=updates)
509*6dbdd20aSAndroid Build Coastguard Worker
510*6dbdd20aSAndroid Build Coastguard Worker
async def requeue_job(job_id: str):
  '''Puts a job that was interrupted by a VM shutdown back in the queue.

  Moves the job from jobs_running/ to jobs_queued/ and resets its status to
  QUEUED with a fresh time_queued. It also clears time_started, time_ended
  and worker left over from the interrupted attempt.
  '''
  logging.info('Requeuing interrupted job %s', job_id)
  job_prefix = 'jobs/%s/' % job_id
  patch_obj = {
      'jobs_running/%s' % job_id: {},  # = DELETE
      'jobs_queued/%s' % job_id: 0,
      job_prefix + 'status': 'QUEUED',
      job_prefix + 'time_queued': utc_now_iso(),
  }
  # Writing {} deletes the node: drop fields from the previous attempt.
  for stale_field in ('time_started', 'time_ended', 'worker'):
    patch_obj[job_prefix + stale_field] = {}
  await req_async('PATCH', DB + '.json', body=patch_obj)
524*6dbdd20aSAndroid Build Coastguard Worker
525*6dbdd20aSAndroid Build Coastguard Worker
async def delete_job_logs(job_id: str):
  '''Deletes the logs/<job_id> subtree from the DB.

  writeSizeLimit=unlimited is set on the request, presumably so that large log
  trees are not rejected by the database's write-size estimate (TODO confirm).
  '''
  logs_url = '%s/logs/%s.json?writeSizeLimit=unlimited' % (DB, job_id)
  await req_async('DELETE', logs_url)
529*6dbdd20aSAndroid Build Coastguard Worker
530*6dbdd20aSAndroid Build Coastguard Worker
async def update_cl_metrics(src: str):
  '''Reports how long the CL entry at |src| took from queueing to completion.

  Reads time_queued and time_ended from the DB entry at |src| and writes their
  difference, truncated to whole seconds, as ci_cl_completion_time.
  '''
  cl_obj = await req_async('GET', '%s/%s.json' % (DB, src))
  queued_at = parse_iso_time(cl_obj['time_queued'])
  ended_at = parse_iso_time(cl_obj['time_ended'])
  elapsed_sec = int((ended_at - queued_at).total_seconds())
  await write_metrics({'ci_cl_completion_time': {'l': {}, 'v': elapsed_sec}})
541*6dbdd20aSAndroid Build Coastguard Worker
542*6dbdd20aSAndroid Build Coastguard Worker
async def update_job_metrics(job_id: str):
  '''Reports queueing and run durations for the job |job_id|.

  ci_job_queue_time (time_queued -> time_started) and ci_job_run_time
  (time_started -> time_ended) are each emitted only when both timestamps are
  present on the job. Both are labelled with the job's type and truncated to
  whole seconds. Nothing is written when neither duration is available.
  '''
  job = await req_async('GET', '%s/jobs/%s.json' % (DB, job_id))
  # (metric name, start timestamp field, end timestamp field)
  durations = (
      ('ci_job_queue_time', 'time_queued', 'time_started'),
      ('ci_job_run_time', 'time_started', 'time_ended'),
  )
  metrics = {}
  for metric_name, start_field, end_field in durations:
    if start_field not in job or end_field not in job:
      continue
    start = parse_iso_time(job[start_field])
    end = parse_iso_time(job[end_field])
    metrics[metric_name] = {
        'l': {
            'job_type': job['type']
        },
        'v': int((end - start).total_seconds()),
    }
  if metrics:
    await write_metrics(metrics)
566*6dbdd20aSAndroid Build Coastguard Worker
567*6dbdd20aSAndroid Build Coastguard Worker
async def update_queue_metrics():
  '''Publishes the number of queued plus running jobs as ci_job_queue_len.

  This is the Stackdriver metric that drives the autoscaler.
  '''
  sizes = []
  for queue_name in ('jobs_queued', 'jobs_running'):
    entries = await req_async(
        'GET', DB + '/%s.json?shallow=true' % queue_name) or {}
    sizes.append(len(entries))
  n_queued, n_running = sizes
  logging.debug('ci_job_queue_len: %d + %d', n_queued, n_running)
  await write_metrics({'ci_job_queue_len': {'v': n_queued + n_running}})
574*6dbdd20aSAndroid Build Coastguard Worker
575*6dbdd20aSAndroid Build Coastguard Worker
async def create_stackdriver_metric_definitions():
  '''Registers every descriptor in STACKDRIVER_METRICS with Stackdriver.

  Descriptors are POSTed sequentially, one request per metric.
  '''
  logging.info('Creating Stackdriver metric definitions')
  descriptors_url = STACKDRIVER_API + '/metricDescriptors'
  for metric_name in STACKDRIVER_METRICS:
    logging.info('Creating metric %s', metric_name)
    descriptor = STACKDRIVER_METRICS[metric_name]
    await req_async('POST', descriptors_url, body=descriptor)
581*6dbdd20aSAndroid Build Coastguard Worker
582*6dbdd20aSAndroid Build Coastguard Worker
async def write_metrics(metric_dict):
  '''Writes one data point per entry of |metric_dict| to Stackdriver.

  Args:
    metric_dict: maps a key of STACKDRIVER_METRICS to a spec dict holding 'v'
      (the value, sent as int64Value) and optionally 'l' (the metric labels).

  All points share the same endTime and go out in a single timeSeries request.
  Request failures are logged rather than raised. Errors about exceeding the
  maximum sampling rate are expected and silently ignored.
  '''
  end_time = utc_now_iso()

  def to_series(key, spec):
    # One global-resource time series holding a single point at end_time.
    return {
        'metric': {
            'type': STACKDRIVER_METRICS[key]['type'],
            'labels': spec.get('l', {}),
        },
        'resource': {
            'type': 'global'
        },
        'points': [{
            'interval': {
                'endTime': end_time
            },
            'value': {
                'int64Value': str(spec['v'])
            },
        }],
    }

  payload = {
      'timeSeries': [to_series(k, s) for k, s in metric_dict.items()]
  }
  try:
    await req_async('POST', STACKDRIVER_API + '/timeSeries', body=payload)
  except Exception as e:
    # Metric updates can easily fail due to Stackdriver API limitations; only
    # failures other than the sampling-rate one are worth reporting.
    err_text = str(e)
    if 'written more frequently than the maximum sampling' not in err_text:
      logging.error('Metrics update failed: %s', err_text)
610*6dbdd20aSAndroid Build Coastguard Worker      logging.error('Metrics update failed: %s', msg)
611