xref: /aosp_15_r20/external/autotest/contrib/backfill_tko_task_references.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1*9c5db199SXin Li"""A simple script to backfill tko_task_references table with throttling."""
2*9c5db199SXin Li
3*9c5db199SXin Lifrom __future__ import absolute_import
4*9c5db199SXin Lifrom __future__ import division
5*9c5db199SXin Lifrom __future__ import print_function
6*9c5db199SXin Li
7*9c5db199SXin Liimport argparse
8*9c5db199SXin Liimport collections
9*9c5db199SXin Liimport contextlib
10*9c5db199SXin Liimport logging
11*9c5db199SXin Liimport time
12*9c5db199SXin Li
13*9c5db199SXin Liimport MySQLdb
14*9c5db199SXin Li
15*9c5db199SXin Li
16*9c5db199SXin Liclass BackfillException(Exception):
17*9c5db199SXin Li  pass
18*9c5db199SXin Li
19*9c5db199SXin Li
20*9c5db199SXin Lidef _parse_args():
21*9c5db199SXin Li  parser = argparse.ArgumentParser(
22*9c5db199SXin Li      description=__doc__)
23*9c5db199SXin Li  parser.add_argument('--host', required=True, help='mysql server host')
24*9c5db199SXin Li  parser.add_argument('--user', required=True, help='mysql server user')
25*9c5db199SXin Li  parser.add_argument('--password', required=True, help='mysql server password')
26*9c5db199SXin Li  parser.add_argument('--dryrun', action='store_true', default=False)
27*9c5db199SXin Li  parser.add_argument(
28*9c5db199SXin Li      '--num-iterations',
29*9c5db199SXin Li      default=None,
30*9c5db199SXin Li      type=int,
31*9c5db199SXin Li      help='If set, total number of iterations. Default is no limit.',
32*9c5db199SXin Li  )
33*9c5db199SXin Li  parser.add_argument(
34*9c5db199SXin Li      '--batch-size',
35*9c5db199SXin Li      default=1000,
36*9c5db199SXin Li      help='Number of tko_jobs rows to read in one iteration',
37*9c5db199SXin Li  )
38*9c5db199SXin Li  parser.add_argument(
39*9c5db199SXin Li      '--sleep-seconds',
40*9c5db199SXin Li      type=int,
41*9c5db199SXin Li      default=1,
42*9c5db199SXin Li      help='Time to sleep between iterations',
43*9c5db199SXin Li  )
44*9c5db199SXin Li
45*9c5db199SXin Li  args = parser.parse_args()
46*9c5db199SXin Li  if args.dryrun:
47*9c5db199SXin Li    if not args.num_iterations:
48*9c5db199SXin Li      logging.info('DRYRUN: Limiting to 5 iterations in dryrun mode.')
49*9c5db199SXin Li      args.num_iterations = 5
50*9c5db199SXin Li  return args
51*9c5db199SXin Li
52*9c5db199SXin Li
53*9c5db199SXin Li
54*9c5db199SXin Li@contextlib.contextmanager
55*9c5db199SXin Lidef _mysql_connection(args):
56*9c5db199SXin Li  conn = MySQLdb.connect(user=args.user, host=args.host, passwd=args.password)
57*9c5db199SXin Li  with _mysql_cursor(conn) as c:
58*9c5db199SXin Li    c.execute('USE chromeos_autotest_db;')
59*9c5db199SXin Li  try:
60*9c5db199SXin Li    yield conn
61*9c5db199SXin Li  finally:
62*9c5db199SXin Li    conn.close()
63*9c5db199SXin Li
64*9c5db199SXin Li
65*9c5db199SXin Li@contextlib.contextmanager
66*9c5db199SXin Lidef _autocommit(conn):
67*9c5db199SXin Li  try:
68*9c5db199SXin Li    yield conn
69*9c5db199SXin Li  except:
70*9c5db199SXin Li    conn.rollback()
71*9c5db199SXin Li  else:
72*9c5db199SXin Li    conn.commit()
73*9c5db199SXin Li
74*9c5db199SXin Li
75*9c5db199SXin Li@contextlib.contextmanager
76*9c5db199SXin Lidef _mysql_cursor(conn):
77*9c5db199SXin Li  c = conn.cursor()
78*9c5db199SXin Li  try:
79*9c5db199SXin Li    yield c
80*9c5db199SXin Li  finally:
81*9c5db199SXin Li    c.close()
82*9c5db199SXin Li
83*9c5db199SXin Li
84*9c5db199SXin Lidef _latest_unfilled_job_idx(conn):
85*9c5db199SXin Li  with _mysql_cursor(conn) as c:
86*9c5db199SXin Li    c.execute("""
87*9c5db199SXin LiSELECT tko_job_idx
88*9c5db199SXin LiFROM tko_task_references
89*9c5db199SXin LiORDER BY tko_job_idx
90*9c5db199SXin LiLIMIT 1
91*9c5db199SXin Li;""")
92*9c5db199SXin Li    r = c.fetchall()
93*9c5db199SXin Li    if r:
94*9c5db199SXin Li      return str(long(r[0][0]) - 1)
95*9c5db199SXin Li  logging.debug('tko_task_references is empty.'
96*9c5db199SXin Li               ' Grabbing the latest tko_job_idx to fill.')
97*9c5db199SXin Li  with _mysql_cursor(conn) as c:
98*9c5db199SXin Li    c.execute("""
99*9c5db199SXin LiSELECT job_idx
100*9c5db199SXin LiFROM tko_jobs
101*9c5db199SXin LiORDER BY job_idx DESC
102*9c5db199SXin LiLIMIT 1
103*9c5db199SXin Li;""")
104*9c5db199SXin Li    r = c.fetchall()
105*9c5db199SXin Li    if r:
106*9c5db199SXin Li      return r[0][0]
107*9c5db199SXin Li  return None
108*9c5db199SXin Li
109*9c5db199SXin Li
110*9c5db199SXin Li_TKOTaskReference = collections.namedtuple(
111*9c5db199SXin Li    '_TKOTaskReference',
112*9c5db199SXin Li    ['tko_job_idx', 'task_reference', 'parent_task_reference'],
113*9c5db199SXin Li)
114*9c5db199SXin Li
115*9c5db199SXin Li_SQL_SELECT_TASK_REFERENCES = """
116*9c5db199SXin LiSELECT job_idx, afe_job_id, afe_parent_job_id
117*9c5db199SXin LiFROM tko_jobs
118*9c5db199SXin LiWHERE job_idx <= %(latest_job_idx)s
119*9c5db199SXin LiORDER BY job_idx DESC
120*9c5db199SXin LiLIMIT %(batch_size)s
121*9c5db199SXin Li;"""
122*9c5db199SXin Li_SQL_INSERT_TASK_REFERENCES = """
123*9c5db199SXin LiINSERT INTO tko_task_references(reference_type, tko_job_idx, task_id, parent_task_id)
124*9c5db199SXin LiVALUES %(values)s
125*9c5db199SXin Li;"""
126*9c5db199SXin Li_SQL_SELECT_TASK_REFERENCE = """
127*9c5db199SXin LiSELECT tko_job_idx FROM tko_task_references WHERE tko_job_idx = %(tko_job_idx)s
128*9c5db199SXin Li;"""
129*9c5db199SXin Li
130*9c5db199SXin Li
131*9c5db199SXin Lidef _compute_task_references(conn, latest_job_idx, batch_size):
132*9c5db199SXin Li  with _mysql_cursor(conn) as c:
133*9c5db199SXin Li    sql = _SQL_SELECT_TASK_REFERENCES % {
134*9c5db199SXin Li        'latest_job_idx': latest_job_idx,
135*9c5db199SXin Li        'batch_size': batch_size,
136*9c5db199SXin Li    }
137*9c5db199SXin Li    c.execute(sql)
138*9c5db199SXin Li    rs = c.fetchall()
139*9c5db199SXin Li    if rs is None:
140*9c5db199SXin Li      return []
141*9c5db199SXin Li
142*9c5db199SXin Li    return [_TKOTaskReference(r[0], r[1], r[2]) for r in rs]
143*9c5db199SXin Li
144*9c5db199SXin Li
145*9c5db199SXin Lidef _insert_task_references(conn, task_references, dryrun):
146*9c5db199SXin Li  values = ', '.join([
147*9c5db199SXin Li      '("afe", %s, "%s", "%s")' %
148*9c5db199SXin Li      (tr.tko_job_idx, tr.task_reference, tr.parent_task_reference)
149*9c5db199SXin Li      for tr in task_references
150*9c5db199SXin Li  ])
151*9c5db199SXin Li  sql = _SQL_INSERT_TASK_REFERENCES % {'values': values}
152*9c5db199SXin Li  if dryrun:
153*9c5db199SXin Li    if len(sql) < 200:
154*9c5db199SXin Li      sql_log = sql
155*9c5db199SXin Li    else:
156*9c5db199SXin Li      sql_log = '%s... [SNIP] ...%s' % (sql[:150], sql[-49:])
157*9c5db199SXin Li    logging.debug('Would have run: %s', sql_log)
158*9c5db199SXin Li  with _autocommit(conn) as conn:
159*9c5db199SXin Li    with _mysql_cursor(conn) as c:
160*9c5db199SXin Li      c.execute(sql)
161*9c5db199SXin Li
162*9c5db199SXin Li
163*9c5db199SXin Lidef _verify_task_references(conn, task_references):
164*9c5db199SXin Li  # Just verify that the last one was inserted.
165*9c5db199SXin Li  if not task_references:
166*9c5db199SXin Li    return
167*9c5db199SXin Li  tko_job_idx = task_references[-1].tko_job_idx
168*9c5db199SXin Li  sql = _SQL_SELECT_TASK_REFERENCE % {'tko_job_idx': tko_job_idx}
169*9c5db199SXin Li  with _mysql_cursor(conn) as c:
170*9c5db199SXin Li    c.execute(sql)
171*9c5db199SXin Li    r = c.fetchall()
172*9c5db199SXin Li    if not r or r[0][0] != tko_job_idx:
173*9c5db199SXin Li      raise BackfillException(
174*9c5db199SXin Li          'Failed to insert task reference for tko_job_id %s' % tko_job_idx)
175*9c5db199SXin Li
176*9c5db199SXin Li
177*9c5db199SXin Lidef _next_job_idx(task_references):
178*9c5db199SXin Li  return str(long(task_references[-1].tko_job_idx) - 1)
179*9c5db199SXin Li
180*9c5db199SXin Lidef main():
181*9c5db199SXin Li  logging.basicConfig(level=logging.DEBUG)
182*9c5db199SXin Li  args = _parse_args()
183*9c5db199SXin Li  with _mysql_connection(args) as conn:
184*9c5db199SXin Li    tko_job_idx = _latest_unfilled_job_idx(conn)
185*9c5db199SXin Li    if tko_job_idx is None:
186*9c5db199SXin Li      raise BackfillException('Failed to get last unfilled tko_job_idx')
187*9c5db199SXin Li    logging.info('First tko_job_idx to fill: %s', tko_job_idx)
188*9c5db199SXin Li
189*9c5db199SXin Li  while True:
190*9c5db199SXin Li    logging.info('####################################')
191*9c5db199SXin Li    logging.info('Start backfilling from tko_job_idx: %s', tko_job_idx)
192*9c5db199SXin Li
193*9c5db199SXin Li    task_references = ()
194*9c5db199SXin Li    with _mysql_connection(args) as conn:
195*9c5db199SXin Li      task_references = _compute_task_references(
196*9c5db199SXin Li          conn, tko_job_idx, args.batch_size)
197*9c5db199SXin Li    if not task_references:
198*9c5db199SXin Li      logging.info('No more unfilled task references. All done!')
199*9c5db199SXin Li      break
200*9c5db199SXin Li
201*9c5db199SXin Li    logging.info(
202*9c5db199SXin Li        'Inserting %d task references. tko_job_ids: %d...%d',
203*9c5db199SXin Li        len(task_references),
204*9c5db199SXin Li        task_references[0].tko_job_idx,
205*9c5db199SXin Li        task_references[-1].tko_job_idx,
206*9c5db199SXin Li    )
207*9c5db199SXin Li    with _mysql_connection(args) as conn:
208*9c5db199SXin Li      _insert_task_references(conn, task_references, args.dryrun)
209*9c5db199SXin Li    if not args.dryrun:
210*9c5db199SXin Li      with _mysql_connection(args) as conn:
211*9c5db199SXin Li        _verify_task_references(conn, task_references)
212*9c5db199SXin Li
213*9c5db199SXin Li    tko_job_idx = _next_job_idx(task_references)
214*9c5db199SXin Li
215*9c5db199SXin Li    if args.num_iterations is not None:
216*9c5db199SXin Li      args.num_iterations -= 1
217*9c5db199SXin Li      if args.num_iterations <= 0:
218*9c5db199SXin Li        break
219*9c5db199SXin Li      logging.info('%d more iterations left', args.num_iterations)
220*9c5db199SXin Li    logging.info('Iteration done. Sleeping for %d seconds', args.sleep_seconds)
221*9c5db199SXin Li    time.sleep(args.sleep_seconds)
222*9c5db199SXin Li
223*9c5db199SXin Li
224*9c5db199SXin Liif __name__ == '__main__':
225*9c5db199SXin Li  main()
226