1*9c5db199SXin Li"""A simple script to backfill tko_task_references table with throttling.""" 2*9c5db199SXin Li 3*9c5db199SXin Lifrom __future__ import absolute_import 4*9c5db199SXin Lifrom __future__ import division 5*9c5db199SXin Lifrom __future__ import print_function 6*9c5db199SXin Li 7*9c5db199SXin Liimport argparse 8*9c5db199SXin Liimport collections 9*9c5db199SXin Liimport contextlib 10*9c5db199SXin Liimport logging 11*9c5db199SXin Liimport time 12*9c5db199SXin Li 13*9c5db199SXin Liimport MySQLdb 14*9c5db199SXin Li 15*9c5db199SXin Li 16*9c5db199SXin Liclass BackfillException(Exception): 17*9c5db199SXin Li pass 18*9c5db199SXin Li 19*9c5db199SXin Li 20*9c5db199SXin Lidef _parse_args(): 21*9c5db199SXin Li parser = argparse.ArgumentParser( 22*9c5db199SXin Li description=__doc__) 23*9c5db199SXin Li parser.add_argument('--host', required=True, help='mysql server host') 24*9c5db199SXin Li parser.add_argument('--user', required=True, help='mysql server user') 25*9c5db199SXin Li parser.add_argument('--password', required=True, help='mysql server password') 26*9c5db199SXin Li parser.add_argument('--dryrun', action='store_true', default=False) 27*9c5db199SXin Li parser.add_argument( 28*9c5db199SXin Li '--num-iterations', 29*9c5db199SXin Li default=None, 30*9c5db199SXin Li type=int, 31*9c5db199SXin Li help='If set, total number of iterations. Default is no limit.', 32*9c5db199SXin Li ) 33*9c5db199SXin Li parser.add_argument( 34*9c5db199SXin Li '--batch-size', 35*9c5db199SXin Li default=1000, 36*9c5db199SXin Li help='Number of tko_jobs rows to read in one iteration', 37*9c5db199SXin Li ) 38*9c5db199SXin Li parser.add_argument( 39*9c5db199SXin Li '--sleep-seconds', 40*9c5db199SXin Li type=int, 41*9c5db199SXin Li default=1, 42*9c5db199SXin Li help='Time to sleep between iterations', 43*9c5db199SXin Li ) 44*9c5db199SXin Li 45*9c5db199SXin Li args = parser.parse_args() 46*9c5db199SXin Li if args.dryrun: 47*9c5db199SXin Li if not args.num_iterations: 48*9c5db199SXin Li logging.info('DRYRUN: Limiting to 5 iterations in dryrun mode.') 49*9c5db199SXin Li args.num_iterations = 5 50*9c5db199SXin Li return args 51*9c5db199SXin Li 52*9c5db199SXin Li 53*9c5db199SXin Li 54*9c5db199SXin Li@contextlib.contextmanager 55*9c5db199SXin Lidef _mysql_connection(args): 56*9c5db199SXin Li conn = MySQLdb.connect(user=args.user, host=args.host, passwd=args.password) 57*9c5db199SXin Li with _mysql_cursor(conn) as c: 58*9c5db199SXin Li c.execute('USE chromeos_autotest_db;') 59*9c5db199SXin Li try: 60*9c5db199SXin Li yield conn 61*9c5db199SXin Li finally: 62*9c5db199SXin Li conn.close() 63*9c5db199SXin Li 64*9c5db199SXin Li 65*9c5db199SXin Li@contextlib.contextmanager 66*9c5db199SXin Lidef _autocommit(conn): 67*9c5db199SXin Li try: 68*9c5db199SXin Li yield conn 69*9c5db199SXin Li except: 70*9c5db199SXin Li conn.rollback() 71*9c5db199SXin Li else: 72*9c5db199SXin Li conn.commit() 73*9c5db199SXin Li 74*9c5db199SXin Li 75*9c5db199SXin Li@contextlib.contextmanager 76*9c5db199SXin Lidef _mysql_cursor(conn): 77*9c5db199SXin Li c = conn.cursor() 78*9c5db199SXin Li try: 79*9c5db199SXin Li yield c 80*9c5db199SXin Li finally: 81*9c5db199SXin Li c.close() 82*9c5db199SXin Li 83*9c5db199SXin Li 84*9c5db199SXin Lidef _latest_unfilled_job_idx(conn): 85*9c5db199SXin Li with _mysql_cursor(conn) as c: 86*9c5db199SXin Li c.execute(""" 87*9c5db199SXin LiSELECT tko_job_idx 88*9c5db199SXin LiFROM tko_task_references 89*9c5db199SXin LiORDER BY tko_job_idx 90*9c5db199SXin LiLIMIT 1 91*9c5db199SXin Li;""") 92*9c5db199SXin Li r = c.fetchall() 93*9c5db199SXin Li if r: 94*9c5db199SXin Li return str(long(r[0][0]) - 1) 95*9c5db199SXin Li logging.debug('tko_task_references is empty.' 96*9c5db199SXin Li ' Grabbing the latest tko_job_idx to fill.') 97*9c5db199SXin Li with _mysql_cursor(conn) as c: 98*9c5db199SXin Li c.execute(""" 99*9c5db199SXin LiSELECT job_idx 100*9c5db199SXin LiFROM tko_jobs 101*9c5db199SXin LiORDER BY job_idx DESC 102*9c5db199SXin LiLIMIT 1 103*9c5db199SXin Li;""") 104*9c5db199SXin Li r = c.fetchall() 105*9c5db199SXin Li if r: 106*9c5db199SXin Li return r[0][0] 107*9c5db199SXin Li return None 108*9c5db199SXin Li 109*9c5db199SXin Li 110*9c5db199SXin Li_TKOTaskReference = collections.namedtuple( 111*9c5db199SXin Li '_TKOTaskReference', 112*9c5db199SXin Li ['tko_job_idx', 'task_reference', 'parent_task_reference'], 113*9c5db199SXin Li) 114*9c5db199SXin Li 115*9c5db199SXin Li_SQL_SELECT_TASK_REFERENCES = """ 116*9c5db199SXin LiSELECT job_idx, afe_job_id, afe_parent_job_id 117*9c5db199SXin LiFROM tko_jobs 118*9c5db199SXin LiWHERE job_idx <= %(latest_job_idx)s 119*9c5db199SXin LiORDER BY job_idx DESC 120*9c5db199SXin LiLIMIT %(batch_size)s 121*9c5db199SXin Li;""" 122*9c5db199SXin Li_SQL_INSERT_TASK_REFERENCES = """ 123*9c5db199SXin LiINSERT INTO tko_task_references(reference_type, tko_job_idx, task_id, parent_task_id) 124*9c5db199SXin LiVALUES %(values)s 125*9c5db199SXin Li;""" 126*9c5db199SXin Li_SQL_SELECT_TASK_REFERENCE = """ 127*9c5db199SXin LiSELECT tko_job_idx FROM tko_task_references WHERE tko_job_idx = %(tko_job_idx)s 128*9c5db199SXin Li;""" 129*9c5db199SXin Li 130*9c5db199SXin Li 131*9c5db199SXin Lidef _compute_task_references(conn, latest_job_idx, batch_size): 132*9c5db199SXin Li with _mysql_cursor(conn) as c: 133*9c5db199SXin Li sql = _SQL_SELECT_TASK_REFERENCES % { 134*9c5db199SXin Li 'latest_job_idx': latest_job_idx, 135*9c5db199SXin Li 'batch_size': batch_size, 136*9c5db199SXin Li } 137*9c5db199SXin Li c.execute(sql) 138*9c5db199SXin Li rs = c.fetchall() 139*9c5db199SXin Li if rs is None: 140*9c5db199SXin Li return [] 141*9c5db199SXin Li 142*9c5db199SXin Li return [_TKOTaskReference(r[0], r[1], r[2]) for r in rs] 143*9c5db199SXin Li 144*9c5db199SXin Li 145*9c5db199SXin Lidef _insert_task_references(conn, task_references, dryrun): 146*9c5db199SXin Li values = ', '.join([ 147*9c5db199SXin Li '("afe", %s, "%s", "%s")' % 148*9c5db199SXin Li (tr.tko_job_idx, tr.task_reference, tr.parent_task_reference) 149*9c5db199SXin Li for tr in task_references 150*9c5db199SXin Li ]) 151*9c5db199SXin Li sql = _SQL_INSERT_TASK_REFERENCES % {'values': values} 152*9c5db199SXin Li if dryrun: 153*9c5db199SXin Li if len(sql) < 200: 154*9c5db199SXin Li sql_log = sql 155*9c5db199SXin Li else: 156*9c5db199SXin Li sql_log = '%s... [SNIP] ...%s' % (sql[:150], sql[-49:]) 157*9c5db199SXin Li logging.debug('Would have run: %s', sql_log) 158*9c5db199SXin Li with _autocommit(conn) as conn: 159*9c5db199SXin Li with _mysql_cursor(conn) as c: 160*9c5db199SXin Li c.execute(sql) 161*9c5db199SXin Li 162*9c5db199SXin Li 163*9c5db199SXin Lidef _verify_task_references(conn, task_references): 164*9c5db199SXin Li # Just verify that the last one was inserted. 165*9c5db199SXin Li if not task_references: 166*9c5db199SXin Li return 167*9c5db199SXin Li tko_job_idx = task_references[-1].tko_job_idx 168*9c5db199SXin Li sql = _SQL_SELECT_TASK_REFERENCE % {'tko_job_idx': tko_job_idx} 169*9c5db199SXin Li with _mysql_cursor(conn) as c: 170*9c5db199SXin Li c.execute(sql) 171*9c5db199SXin Li r = c.fetchall() 172*9c5db199SXin Li if not r or r[0][0] != tko_job_idx: 173*9c5db199SXin Li raise BackfillException( 174*9c5db199SXin Li 'Failed to insert task reference for tko_job_id %s' % tko_job_idx) 175*9c5db199SXin Li 176*9c5db199SXin Li 177*9c5db199SXin Lidef _next_job_idx(task_references): 178*9c5db199SXin Li return str(long(task_references[-1].tko_job_idx) - 1) 179*9c5db199SXin Li 180*9c5db199SXin Lidef main(): 181*9c5db199SXin Li logging.basicConfig(level=logging.DEBUG) 182*9c5db199SXin Li args = _parse_args() 183*9c5db199SXin Li with _mysql_connection(args) as conn: 184*9c5db199SXin Li tko_job_idx = _latest_unfilled_job_idx(conn) 185*9c5db199SXin Li if tko_job_idx is None: 186*9c5db199SXin Li raise BackfillException('Failed to get last unfilled tko_job_idx') 187*9c5db199SXin Li logging.info('First tko_job_idx to fill: %s', tko_job_idx) 188*9c5db199SXin Li 189*9c5db199SXin Li while True: 190*9c5db199SXin Li logging.info('####################################') 191*9c5db199SXin Li logging.info('Start backfilling from tko_job_idx: %s', tko_job_idx) 192*9c5db199SXin Li 193*9c5db199SXin Li task_references = () 194*9c5db199SXin Li with _mysql_connection(args) as conn: 195*9c5db199SXin Li task_references = _compute_task_references( 196*9c5db199SXin Li conn, tko_job_idx, args.batch_size) 197*9c5db199SXin Li if not task_references: 198*9c5db199SXin Li logging.info('No more unfilled task references. All done!') 199*9c5db199SXin Li break 200*9c5db199SXin Li 201*9c5db199SXin Li logging.info( 202*9c5db199SXin Li 'Inserting %d task references. tko_job_ids: %d...%d', 203*9c5db199SXin Li len(task_references), 204*9c5db199SXin Li task_references[0].tko_job_idx, 205*9c5db199SXin Li task_references[-1].tko_job_idx, 206*9c5db199SXin Li ) 207*9c5db199SXin Li with _mysql_connection(args) as conn: 208*9c5db199SXin Li _insert_task_references(conn, task_references, args.dryrun) 209*9c5db199SXin Li if not args.dryrun: 210*9c5db199SXin Li with _mysql_connection(args) as conn: 211*9c5db199SXin Li _verify_task_references(conn, task_references) 212*9c5db199SXin Li 213*9c5db199SXin Li tko_job_idx = _next_job_idx(task_references) 214*9c5db199SXin Li 215*9c5db199SXin Li if args.num_iterations is not None: 216*9c5db199SXin Li args.num_iterations -= 1 217*9c5db199SXin Li if args.num_iterations <= 0: 218*9c5db199SXin Li break 219*9c5db199SXin Li logging.info('%d more iterations left', args.num_iterations) 220*9c5db199SXin Li logging.info('Iteration done. Sleeping for %d seconds', args.sleep_seconds) 221*9c5db199SXin Li time.sleep(args.sleep_seconds) 222*9c5db199SXin Li 223*9c5db199SXin Li 224*9c5db199SXin Liif __name__ == '__main__': 225*9c5db199SXin Li main() 226