xref: /aosp_15_r20/external/perfetto/infra/ci/worker/worker.py (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1#!/usr/bin/env python3
2# Copyright (C) 2019 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15''' Worker main loop. Pulls jobs from the DB and runs them in the sandbox
16
17It also handles timeouts and graceful container termination.
18'''
19
20import logging
21import os
22import signal
23import socket
24import subprocess
25import threading
26import time
27import traceback
28
29from config import DB, JOB_TIMEOUT_SEC
30from common_utils import req, utc_now_iso, init_logging
31from common_utils import ConcurrentModificationError, SCOPES
32
# Directory containing this script; used to locate the sibling run_job.py.
CUR_DIR = os.path.dirname(__file__)
# Extend the OAuth scopes (shared with common_utils' auth machinery) so this
# worker can read/write the Firebase realtime DB and report its identity.
SCOPES.append('https://www.googleapis.com/auth/firebase.database')
SCOPES.append('https://www.googleapis.com/auth/userinfo.email')
# Unique-ish worker id: "<last dash-token of WORKER_HOST>-<container hostname>",
# e.g. "1-abcdef123456". Falls back to "local-<hostname>" outside the CI.
WORKER_NAME = '%s-%s' % (os.getenv(
    'WORKER_HOST', 'local').split('-')[-1], socket.gethostname())
# Set by sig_handler() on SIGTERM/SIGINT; checked by the main loop and by
# worker_loop() to wind down gracefully.
sigterm = threading.Event()
40
def try_acquire_job(job_id):
  ''' Transactionally acquire the given job.

  Relies on the DB's ETag-based compare-and-swap: the PUT below raises
  ConcurrentModificationError if another worker modified the job entry after
  our GET, so at most one worker wins the race.

  Args:
    job_id: key of the job entry in the DB.

  Returns:
    The job JSON object if it managed to acquire and put it into the
    STARTED state, None if another worker got there first (or the job is no
    longer QUEUED, e.g. CANCELLED/INTERRUPTED or deleted).
  '''
  logging.debug('Trying to acquire job %s', job_id)

  uri = '%s/jobs/%s.json' % (DB, job_id)
  job, etag = req('GET', uri, req_etag=True)
  # |job| can be None if the entry was deleted (e.g. a full DB wipe on
  # re-deploy) between the queue listing and this GET. Treat that the same as
  # losing the race.
  if job is None or job['status'] != 'QUEUED':
    return None  # Somebody else took it or the job is CANCELLED/INTERRUPTED
  try:
    job['status'] = 'STARTED'
    job['time_started'] = utc_now_iso()
    job['worker'] = WORKER_NAME
    req('PUT', uri, body=job, etag=etag)
    return job
  except ConcurrentModificationError:
    # Another worker won the CAS race; not an error.
    return None
61
62
def make_worker_obj(status, job_id=None):
  ''' Builds the JSON record describing this worker's current state.

  Written under /workers/<WORKER_NAME> in the DB on every state transition
  (RUNNING / IDLE / TERMINATED).
  '''
  worker_obj = {
      'job_id': job_id,
      'status': status,
      'last_update': utc_now_iso(),
      'host': os.getenv('WORKER_HOST', ''),
  }
  return worker_obj
70
71
def worker_loop():
  ''' Pulls a job from the queue and runs it invoking run_job.py.

  One invocation handles at most one job: acquire it transactionally, move it
  to the running queue, spawn run_job.py as a subprocess, babysit it (timeout,
  cancellation, SIGTERM) and finally record the outcome in the DB.
  '''
  uri = '%s/jobs_queued.json?orderBy="$key"&limitToLast=100' % DB
  jobs = req('GET', uri)
  if not jobs:
    return

  # Work out the worker number from the hostname. We try to distribute the load
  # (via the time.sleep below) so that we fill first all the worker-1 of each
  # vm, then worker-2 and so on. This is designed so that if there is only one
  # CL (hence N jobs) in the queue, each VM gets only one job, maximizing the
  # cpu efficiency of each VM.
  try:
    worker_num = int(socket.gethostname().split('-')[-1])
  except ValueError:
    worker_num = 1

  # Transactionally acquire a job. Deal with races (two workers trying to
  # acquire the same job).
  job = None
  job_id = None
  for job_id in sorted(jobs.keys(), reverse=True):
    job = try_acquire_job(job_id)
    if job is not None:
      break
    # Back off proportionally to the worker number, see comment above.
    time.sleep(worker_num)
  if job is None:
    logging.error('Failed to acquire a job')
    return

  logging.info('Starting job %s', job_id)

  # Update the db, move the job to the running queue.
  patch_obj = {
      'jobs_queued/' + job_id: {},  # = DELETE
      'jobs_running/' + job_id: {
          'worker': WORKER_NAME
      },
      'workers/' + WORKER_NAME: make_worker_obj('RUNNING', job_id=job_id)
  }
  req('PATCH', '%s.json' % DB, body=patch_obj)

  cmd = [os.path.join(CUR_DIR, 'run_job.py'), job_id]

  # Propagate the worker's PERFETTO_  vars and merge with the job-specific vars.
  env = dict(os.environ, **{k: str(v) for (k, v) in job['env'].items()})
  job_runner = subprocess.Popen(cmd, env=env)

  # Run the job in a python subprocess, to isolate the main loop from logs
  # uploader failures.
  res = None  # Child exit code; None while it is still running.
  cancelled = False
  timed_out = False
  time_started = time.time()
  time_last_db_poll = time_started
  polled_status = 'STARTED'
  while res is None:
    time.sleep(0.25)
    res = job_runner.poll()
    now = time.time()
    if now - time_last_db_poll > 10:  # Throttle DB polling.
      polled_status = req('GET', '%s/jobs/%s/status.json' % (DB, job_id))
      time_last_db_poll = now
    # The "not timed_out" guard ensures we log and SIGTERM the child only
    # once, rather than on every 0.25 s poll tick until it exits (the
    # cancellation branch below already had the equivalent guard).
    if now - time_started > JOB_TIMEOUT_SEC and not timed_out:
      logging.info('Job %s timed out, terminating', job_id)
      timed_out = True
      job_runner.terminate()
    if (sigterm.is_set() or polled_status != 'STARTED') and not cancelled:
      logging.info('Job %s cancelled, terminating', job_id)
      cancelled = True
      job_runner.terminate()

  # Precedence: worker shutdown > external cancellation > timeout > exit code.
  status = ('INTERRUPTED' if sigterm.is_set() else 'CANCELLED' if cancelled else
            'TIMED_OUT' if timed_out else 'COMPLETED' if res == 0 else 'FAILED')
  logging.info('Job %s %s with code %s', job_id, status, res)

  # Update the DB, unless the job has been cancelled. The "is not None"
  # condition deals with a very niche case, that is, avoid creating a partial
  # job entry after doing a full clear of the DB (which is super rare, happens
  # only when re-deploying the CI).
  if polled_status is not None:
    patch = {
        'jobs/%s/status' % job_id: status,
        'jobs/%s/exit_code' % job_id: {} if res is None else res,
        'jobs/%s/time_ended' % job_id: utc_now_iso(),
        'jobs_running/%s' % job_id: {},  # = DELETE
    }
    req('PATCH', '%s.json' % (DB), body=patch)
160
161
def sig_handler(_, __):
  ''' SIGTERM/SIGINT handler: requests a graceful worker shutdown.

  Sets the module-level |sigterm| event; the main loop and worker_loop()
  observe it and wind down.
  '''
  sigterm.set()
  logging.warning('Interrupted by signal, exiting worker')
165
166
def main():
  ''' Worker entry point.

  Registers signal handlers, then polls the queue until a SIGTERM/SIGINT is
  received, finally marks the worker as TERMINATED in the DB.
  '''
  init_logging()
  logging.info('Worker started')
  signal.signal(signal.SIGTERM, sig_handler)
  signal.signal(signal.SIGINT, sig_handler)

  while not sigterm.is_set():
    logging.debug('Starting poll cycle')
    try:
      worker_loop()
      req('PUT',
          '%s/workers/%s.json' % (DB, WORKER_NAME),
          body=make_worker_obj('IDLE'))
    except Exception:
      # Narrowed from a bare "except:" so SystemExit (and KeyboardInterrupt,
      # were SIGINT not re-bound above) still propagate. Log and keep the
      # worker alive: a transient DB/network error must not kill the loop.
      logging.error('Exception in worker loop:\n%s', traceback.format_exc())
    if sigterm.is_set():
      break

    # Synchronize sleeping with the wall clock. This is so all VMs wake up at
    # the same time. See comment on distributing load above in this file.
    poll_time_sec = 5
    time.sleep(poll_time_sec - (time.time() % poll_time_sec))

  # The use case here is the VM being terminated by the GCE infrastructure.
  # We mark the worker as terminated and the job as cancelled so we don't wait
  # forever for it.
  logging.warning('Exiting the worker loop, got signal: %s', sigterm.is_set())
  req('PUT',
      '%s/workers/%s.json' % (DB, WORKER_NAME),
      body=make_worker_obj('TERMINATED'))
197
198
# Script entry point: run the worker loop until signalled to stop.
if __name__ == '__main__':
  main()
201