#!/usr/bin/env python3
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
''' Worker main loop. Pulls jobs from the DB and runs them in the sandbox.

It also handles timeouts and graceful container termination.
'''

import logging
import os
import signal
import socket
import subprocess
import threading
import time
import traceback

from config import DB, JOB_TIMEOUT_SEC
from common_utils import req, utc_now_iso, init_logging
from common_utils import ConcurrentModificationError, SCOPES

CUR_DIR = os.path.dirname(__file__)
SCOPES.append('https://www.googleapis.com/auth/firebase.database')
SCOPES.append('https://www.googleapis.com/auth/userinfo.email')
WORKER_NAME = '%s-%s' % (os.getenv(
    'WORKER_HOST', 'local').split('-')[-1], socket.gethostname())
sigterm = threading.Event()


def try_acquire_job(job_id):
  ''' Transactionally acquire the given job.

  Returns the job JSON object if it managed to acquire and put it into the
  STARTED state, None if another worker got there first.
  '''
  logging.debug('Trying to acquire job %s', job_id)

  uri = '%s/jobs/%s.json' % (DB, job_id)
  job, etag = req('GET', uri, req_etag=True)
  if job['status'] != 'QUEUED':
    return None  # Somebody else took it or the job is CANCELLED/INTERRUPTED.
  try:
    job['status'] = 'STARTED'
    job['time_started'] = utc_now_iso()
    job['worker'] = WORKER_NAME
    req('PUT', uri, body=job, etag=etag)
    return job
  except ConcurrentModificationError:
    return None


def make_worker_obj(status, job_id=None):
  return {
      'job_id': job_id,
      'status': status,
      'last_update': utc_now_iso(),
      'host': os.getenv('WORKER_HOST', '')
  }


def worker_loop():
  ''' Pulls a job from the queue and runs it by invoking run_job.py. '''
  uri = '%s/jobs_queued.json?orderBy="$key"&limitToLast=100' % DB
  jobs = req('GET', uri)
  if not jobs:
    return

  # Work out the worker number from the hostname. We try to distribute the
  # load (via the time.sleep below) so that the worker-1 of each VM fills up
  # first, then worker-2 and so on. This is designed so that if there is only
  # one CL (hence N jobs) in the queue, each VM gets only one job, maximizing
  # the CPU efficiency of each VM.
  try:
    worker_num = int(socket.gethostname().split('-')[-1])
  except ValueError:
    worker_num = 1

  # Transactionally acquire a job. Deal with races (two workers trying to
  # acquire the same job).
  job = None
  job_id = None
  for job_id in sorted(jobs.keys(), reverse=True):
    job = try_acquire_job(job_id)
    if job is not None:
      break
    time.sleep(worker_num)
  if job is None:
    logging.error('Failed to acquire a job')
    return

  logging.info('Starting job %s', job_id)

  # Update the DB, move the job to the running queue.
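  # Note: this is a single multi-path PATCH against the DB root, so the queue
  # removal, the move to jobs_running and the worker status update travel in
  # one request. An empty dict value deletes the corresponding node (hence the
  # "= DELETE" markers).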
  patch_obj = {
      'jobs_queued/' + job_id: {},  # = DELETE
      'jobs_running/' + job_id: {
          'worker': WORKER_NAME
      },
      'workers/' + WORKER_NAME: make_worker_obj('RUNNING', job_id=job_id)
  }
  req('PATCH', '%s.json' % DB, body=patch_obj)

  cmd = [os.path.join(CUR_DIR, 'run_job.py'), job_id]

  # Propagate the worker's PERFETTO_ vars and merge with the job-specific vars.
  env = dict(os.environ, **{k: str(v) for (k, v) in job['env'].items()})

  # Run the job in a python subprocess, to isolate the main loop from logs
  # uploader failures.
  job_runner = subprocess.Popen(cmd, env=env)

  res = None
  cancelled = False
  timed_out = False
  time_started = time.time()
  time_last_db_poll = time_started
  polled_status = 'STARTED'
  while res is None:
    time.sleep(0.25)
    res = job_runner.poll()
    now = time.time()
    if now - time_last_db_poll > 10:  # Throttle DB polling.
      polled_status = req('GET', '%s/jobs/%s/status.json' % (DB, job_id))
      time_last_db_poll = now
    if now - time_started > JOB_TIMEOUT_SEC:
      logging.info('Job %s timed out, terminating', job_id)
      timed_out = True
      job_runner.terminate()
    if (sigterm.is_set() or polled_status != 'STARTED') and not cancelled:
      logging.info('Job %s cancelled, terminating', job_id)
      cancelled = True
      job_runner.terminate()

  status = ('INTERRUPTED' if sigterm.is_set() else 'CANCELLED' if cancelled else
            'TIMED_OUT' if timed_out else 'COMPLETED' if res == 0 else 'FAILED')
  logging.info('Job %s %s with code %s', job_id, status, res)

  # Update the DB, unless the job has been cancelled. The "is not None"
  # condition deals with a very niche case, that is, avoid creating a partial
  # job entry after doing a full clear of the DB (which is super rare, happens
  # only when re-deploying the CI).
  if polled_status is not None:
    patch = {
        'jobs/%s/status' % job_id: status,
        'jobs/%s/exit_code' % job_id: {} if res is None else res,
        'jobs/%s/time_ended' % job_id: utc_now_iso(),
        'jobs_running/%s' % job_id: {},  # = DELETE
    }
    req('PATCH', '%s.json' % DB, body=patch)


def sig_handler(_, __):
  logging.warning('Interrupted by signal, exiting worker')
  sigterm.set()


def main():
  init_logging()
  logging.info('Worker started')
  signal.signal(signal.SIGTERM, sig_handler)
  signal.signal(signal.SIGINT, sig_handler)

  while not sigterm.is_set():
    logging.debug('Starting poll cycle')
    try:
      worker_loop()
      req('PUT',
          '%s/workers/%s.json' % (DB, WORKER_NAME),
          body=make_worker_obj('IDLE'))
    except:
      logging.error('Exception in worker loop:\n%s', traceback.format_exc())
    if sigterm.is_set():
      break

    # Synchronize sleeping with the wall clock. This is so all VMs wake up at
    # the same time. See comment on distributing load above in this file.
    poll_time_sec = 5
    time.sleep(poll_time_sec - (time.time() % poll_time_sec))

  # The use case here is the VM being terminated by the GCE infrastructure.
  # We mark the worker as terminated and the job as cancelled so we don't wait
  # forever for it.
  logging.warning('Exiting the worker loop, got signal: %s', sigterm.is_set())
  req('PUT',
      '%s/workers/%s.json' % (DB, WORKER_NAME),
      body=make_worker_obj('TERMINATED'))


if __name__ == '__main__':
  main()