1# 2# Copyright 2008 Google Inc. All Rights Reserved. 3 4""" 5The job module contains the objects and methods used to 6manage jobs in Autotest. 7 8The valid actions are: 9list: lists job(s) 10create: create a job 11abort: abort job(s) 12stat: detailed listing of job(s) 13 14The common options are: 15 16See topic_common.py for a High Level Design and Algorithm. 17""" 18 19# pylint: disable=missing-docstring 20 21from __future__ import print_function 22 23import getpass, re 24from autotest_lib.cli import topic_common, action_common 25from autotest_lib.client.common_lib import control_data 26from autotest_lib.client.common_lib import priorities 27 28 29class job(topic_common.atest): 30 """Job class 31 atest job [create|clone|list|stat|abort] <options>""" 32 usage_action = '[create|clone|list|stat|abort]' 33 topic = msg_topic = 'job' 34 msg_items = '<job_ids>' 35 36 37 def _convert_status(self, results): 38 for result in results: 39 total = sum(result['status_counts'].values()) 40 status = ['%s=%s(%.1f%%)' % (key, val, 100.0*float(val)/total) 41 for key, val in result['status_counts'].iteritems()] 42 status.sort() 43 result['status_counts'] = ', '.join(status) 44 45 46 def backward_compatibility(self, action, argv): 47 """ 'job create --clone' became 'job clone --id' """ 48 if action == 'create': 49 for option in ['-l', '--clone']: 50 if option in argv: 51 argv[argv.index(option)] = '--id' 52 action = 'clone' 53 return action 54 55 56class job_help(job): 57 """Just here to get the atest logic working. 58 Usage is set by its parent""" 59 pass 60 61 62class job_list_stat(action_common.atest_list, job): 63 def __init__(self): 64 super(job_list_stat, self).__init__() 65 66 self.topic_parse_info = topic_common.item_parse_info( 67 attribute_name='jobs', 68 use_leftover=True) 69 70 71 def __split_jobs_between_ids_names(self): 72 job_ids = [] 73 job_names = [] 74 75 # Sort between job IDs and names 76 for job_id in self.jobs: 77 if job_id.isdigit(): 78 job_ids.append(job_id) 79 else: 80 job_names.append(job_id) 81 return (job_ids, job_names) 82 83 84 def execute_on_ids_and_names(self, op, filters={}, 85 check_results={'id__in': 'id', 86 'name__in': 'id'}, 87 tag_id='id__in', tag_name='name__in'): 88 if not self.jobs: 89 # Want everything 90 return super(job_list_stat, self).execute(op=op, filters=filters) 91 92 all_jobs = [] 93 (job_ids, job_names) = self.__split_jobs_between_ids_names() 94 95 for items, tag in [(job_ids, tag_id), 96 (job_names, tag_name)]: 97 if items: 98 new_filters = filters.copy() 99 new_filters[tag] = items 100 jobs = super(job_list_stat, 101 self).execute(op=op, 102 filters=new_filters, 103 check_results=check_results) 104 all_jobs.extend(jobs) 105 106 return all_jobs 107 108 109class job_list(job_list_stat): 110 """atest job list [<jobs>] [--all] [--running] [--user <username>]""" 111 def __init__(self): 112 super(job_list, self).__init__() 113 self.parser.add_option('-a', '--all', help='List jobs for all ' 114 'users.', action='store_true', default=False) 115 self.parser.add_option('-r', '--running', help='List only running ' 116 'jobs', action='store_true') 117 self.parser.add_option('-u', '--user', help='List jobs for given ' 118 'user', type='string') 119 120 121 def parse(self): 122 options, leftover = super(job_list, self).parse() 123 self.all = options.all 124 self.data['running'] = options.running 125 if options.user: 126 if options.all: 127 self.invalid_syntax('Only specify --all or --user, not both.') 128 else: 129 self.data['owner'] = options.user 130 elif not options.all and not self.jobs: 131 self.data['owner'] = getpass.getuser() 132 133 return options, leftover 134 135 136 def execute(self): 137 return self.execute_on_ids_and_names(op='get_jobs_summary', 138 filters=self.data) 139 140 141 def output(self, results): 142 keys = ['id', 'owner', 'name', 'status_counts'] 143 if self.verbose: 144 keys.extend(['priority', 'control_type', 'created_on']) 145 self._convert_status(results) 146 super(job_list, self).output(results, keys) 147 148 149 150class job_stat(job_list_stat): 151 """atest job stat <job>""" 152 usage_action = 'stat' 153 154 def __init__(self): 155 super(job_stat, self).__init__() 156 self.parser.add_option('-f', '--control-file', 157 help='Display the control file', 158 action='store_true', default=False) 159 self.parser.add_option('-N', '--list-hosts', 160 help='Display only a list of hosts', 161 action='store_true') 162 self.parser.add_option('-s', '--list-hosts-status', 163 help='Display only the hosts in these statuses ' 164 'for a job.', action='store') 165 166 167 def parse(self): 168 status_list = topic_common.item_parse_info( 169 attribute_name='status_list', 170 inline_option='list_hosts_status') 171 options, leftover = super(job_stat, self).parse([status_list], 172 req_items='jobs') 173 174 if not self.jobs: 175 self.invalid_syntax('Must specify at least one job.') 176 177 self.show_control_file = options.control_file 178 self.list_hosts = options.list_hosts 179 180 if self.list_hosts and self.status_list: 181 self.invalid_syntax('--list-hosts is implicit when using ' 182 '--list-hosts-status.') 183 if len(self.jobs) > 1 and (self.list_hosts or self.status_list): 184 self.invalid_syntax('--list-hosts and --list-hosts-status should ' 185 'only be used on a single job.') 186 187 return options, leftover 188 189 190 def _merge_results(self, summary, qes): 191 hosts_status = {} 192 for qe in qes: 193 if qe['host']: 194 job_id = qe['job']['id'] 195 hostname = qe['host']['hostname'] 196 hosts_status.setdefault(job_id, 197 {}).setdefault(qe['status'], 198 []).append(hostname) 199 200 for job in summary: 201 job_id = job['id'] 202 if hosts_status.has_key(job_id): 203 this_job = hosts_status[job_id] 204 job['hosts'] = ' '.join(' '.join(host) for host in 205 this_job.itervalues()) 206 host_per_status = ['%s="%s"' %(status, ' '.join(host)) 207 for status, host in this_job.iteritems()] 208 job['hosts_status'] = ', '.join(host_per_status) 209 if self.status_list: 210 statuses = set(s.lower() for s in self.status_list) 211 all_hosts = [s for s in host_per_status if s.split('=', 212 1)[0].lower() in statuses] 213 job['hosts_selected_status'] = '\n'.join(all_hosts) 214 else: 215 job['hosts_status'] = '' 216 217 if not job.get('hosts'): 218 self.generic_error('Job has unassigned meta-hosts, ' 219 'try again shortly.') 220 221 return summary 222 223 224 def execute(self): 225 summary = self.execute_on_ids_and_names(op='get_jobs_summary') 226 227 # Get the real hostnames 228 qes = self.execute_on_ids_and_names(op='get_host_queue_entries', 229 check_results={}, 230 tag_id='job__in', 231 tag_name='job__name__in') 232 233 self._convert_status(summary) 234 235 return self._merge_results(summary, qes) 236 237 238 def output(self, results): 239 if self.list_hosts: 240 keys = ['hosts'] 241 elif self.status_list: 242 keys = ['hosts_selected_status'] 243 elif not self.verbose: 244 keys = ['id', 'name', 'priority', 'status_counts', 'hosts_status'] 245 else: 246 keys = ['id', 'name', 'priority', 'status_counts', 'hosts_status', 247 'owner', 'control_type', 'synch_count', 'created_on', 248 'run_verify', 'reboot_before', 'reboot_after', 249 'parse_failed_repair'] 250 251 if self.show_control_file: 252 keys.append('control_file') 253 254 super(job_stat, self).output(results, keys) 255 256 257class job_create_or_clone(action_common.atest_create, job): 258 """Class containing the code common to the job create and clone actions""" 259 msg_items = 'job_name' 260 261 def __init__(self): 262 super(job_create_or_clone, self).__init__() 263 self.hosts = [] 264 self.data_item_key = 'name' 265 self.parser.add_option('-p', '--priority', 266 help='Job priority (int)', type='int', 267 default=priorities.Priority.DEFAULT) 268 self.parser.add_option('-b', '--labels', 269 help='Comma separated list of labels ' 270 'to get machine list from.', default='') 271 self.parser.add_option('-m', '--machine', help='List of machines to ' 272 'run on') 273 self.parser.add_option('-M', '--mlist', 274 help='File listing machines to use', 275 type='string', metavar='MACHINE_FLIST') 276 self.parser.add_option('--one-time-hosts', 277 help='List of one time hosts') 278 self.parser.add_option('-e', '--email', 279 help='A comma seperated list of ' 280 'email addresses to notify of job completion', 281 default='') 282 283 284 def _parse_hosts(self, args): 285 """ Parses the arguments to generate a list of hosts and meta_hosts 286 A host is a regular name, a meta_host is n*label or *label. 287 These can be mixed on the CLI, and separated by either commas or 288 spaces, e.g.: 5*Machine_Label host0 5*Machine_Label2,host2 """ 289 290 hosts = [] 291 meta_hosts = [] 292 293 for arg in args: 294 for host in arg.split(','): 295 if re.match('^[0-9]+[*]', host): 296 num, host = host.split('*', 1) 297 meta_hosts += int(num) * [host] 298 elif re.match('^[*](\w*)', host): 299 meta_hosts += [re.match('^[*](\w*)', host).group(1)] 300 elif host != '' and host not in hosts: 301 # Real hostname and not a duplicate 302 hosts.append(host) 303 304 return (hosts, meta_hosts) 305 306 307 def parse(self, parse_info=[]): 308 host_info = topic_common.item_parse_info(attribute_name='hosts', 309 inline_option='machine', 310 filename_option='mlist') 311 job_info = topic_common.item_parse_info(attribute_name='jobname', 312 use_leftover=True) 313 oth_info = topic_common.item_parse_info(attribute_name='one_time_hosts', 314 inline_option='one_time_hosts') 315 label_info = topic_common.item_parse_info(attribute_name='labels', 316 inline_option='labels') 317 318 options, leftover = super(job_create_or_clone, self).parse( 319 [host_info, job_info, oth_info, label_info] + parse_info, 320 req_items='jobname') 321 self.data = { 322 'priority': options.priority, 323 } 324 jobname = getattr(self, 'jobname') 325 if len(jobname) > 1: 326 self.invalid_syntax('Too many arguments specified, only expected ' 327 'to receive job name: %s' % jobname) 328 self.jobname = jobname[0] 329 330 if self.one_time_hosts: 331 self.data['one_time_hosts'] = self.one_time_hosts 332 333 if self.labels: 334 label_hosts = self.execute_rpc(op='get_hosts', 335 multiple_labels=self.labels) 336 for host in label_hosts: 337 self.hosts.append(host['hostname']) 338 339 self.data['name'] = self.jobname 340 341 (self.data['hosts'], 342 self.data['meta_hosts']) = self._parse_hosts(self.hosts) 343 344 self.data['email_list'] = options.email 345 346 return options, leftover 347 348 349 def create_job(self): 350 job_id = self.execute_rpc(op='create_job', **self.data) 351 return ['%s (id %s)' % (self.jobname, job_id)] 352 353 354 def get_items(self): 355 return [self.jobname] 356 357 358 359class job_create(job_create_or_clone): 360 """atest job create [--priority <int>] 361 [--synch_count] [--control-file </path/to/cfile>] 362 [--on-server] [--test <test1,test2>] 363 [--mlist </path/to/machinelist>] [--machine <host1 host2 host3>] 364 [--labels <list of labels of machines to run on>] 365 [--reboot_before <option>] [--reboot_after <option>] 366 [--noverify] [--timeout <timeout>] [--max_runtime <max runtime>] 367 [--one-time-hosts <hosts>] [--email <email>] 368 [--dependencies <labels this job is dependent on>] 369 [--parse-failed-repair <option>] 370 [--image <http://path/to/image>] [--require-ssp] 371 job_name 372 373 Creating a job is rather different from the other create operations, 374 so it only uses the __init__() and output() from its superclass. 375 """ 376 op_action = 'create' 377 378 def __init__(self): 379 super(job_create, self).__init__() 380 self.ctrl_file_data = {} 381 self.parser.add_option('-y', '--synch_count', type=int, 382 help='Number of machines to use per autoserv ' 383 'execution') 384 self.parser.add_option('-f', '--control-file', 385 help='use this control file', metavar='FILE') 386 self.parser.add_option('-s', '--server', 387 help='This is server-side job', 388 action='store_true', default=False) 389 self.parser.add_option('-t', '--test', 390 help='List of tests to run') 391 392 self.parser.add_option('-d', '--dependencies', help='Comma separated ' 393 'list of labels this job is dependent on.', 394 default='') 395 396 self.parser.add_option('-B', '--reboot_before', 397 help='Whether or not to reboot the machine ' 398 'before the job (never/if dirty/always)', 399 type='choice', 400 choices=('never', 'if dirty', 'always')) 401 self.parser.add_option('-a', '--reboot_after', 402 help='Whether or not to reboot the machine ' 403 'after the job (never/if all tests passed/' 404 'always)', 405 type='choice', 406 choices=('never', 'if all tests passed', 407 'always')) 408 409 self.parser.add_option('--parse-failed-repair', 410 help='Whether or not to parse failed repair ' 411 'results as part of the job', 412 type='choice', 413 choices=('true', 'false')) 414 self.parser.add_option('-n', '--noverify', 415 help='Do not run verify for job', 416 default=False, action='store_true') 417 self.parser.add_option('-o', '--timeout_mins', 418 help='Job timeout in minutes.', 419 metavar='TIMEOUT') 420 self.parser.add_option('--max_runtime', 421 help='Job maximum runtime in minutes') 422 423 self.parser.add_option('-i', '--image', 424 help='OS image to install before running the ' 425 'test.') 426 self.parser.add_option('--require-ssp', 427 help='Require server-side packaging', 428 default=False, action='store_true') 429 430 431 def parse(self): 432 deps_info = topic_common.item_parse_info(attribute_name='dependencies', 433 inline_option='dependencies') 434 options, leftover = super(job_create, self).parse( 435 parse_info=[deps_info]) 436 437 if (len(self.hosts) == 0 and not self.one_time_hosts 438 and not options.labels): 439 self.invalid_syntax('Must specify at least one machine.' 440 '(-m, -M, -b or --one-time-hosts).') 441 if not options.control_file and not options.test: 442 self.invalid_syntax('Must specify either --test or --control-file' 443 ' to create a job.') 444 if options.control_file and options.test: 445 self.invalid_syntax('Can only specify one of --control-file or ' 446 '--test, not both.') 447 if options.control_file: 448 try: 449 control_file_f = open(options.control_file) 450 try: 451 control_file_data = control_file_f.read() 452 finally: 453 control_file_f.close() 454 except IOError: 455 self.generic_error('Unable to read from specified ' 456 'control-file: %s' % options.control_file) 457 self.data['control_file'] = control_file_data 458 if options.test: 459 if options.server: 460 self.invalid_syntax('If you specify tests, then the ' 461 'client/server setting is implicit and ' 462 'cannot be overriden.') 463 tests = [t.strip() for t in options.test.split(',') if t.strip()] 464 self.ctrl_file_data['tests'] = tests 465 466 if options.image: 467 self.data['image'] = options.image 468 469 if options.reboot_before: 470 self.data['reboot_before'] = options.reboot_before.capitalize() 471 if options.reboot_after: 472 self.data['reboot_after'] = options.reboot_after.capitalize() 473 if options.parse_failed_repair: 474 self.data['parse_failed_repair'] = ( 475 options.parse_failed_repair == 'true') 476 if options.noverify: 477 self.data['run_verify'] = False 478 if options.timeout_mins: 479 self.data['timeout_mins'] = options.timeout_mins 480 if options.max_runtime: 481 self.data['max_runtime_mins'] = options.max_runtime 482 483 self.data['dependencies'] = self.dependencies 484 485 if options.synch_count: 486 self.data['synch_count'] = options.synch_count 487 if options.server: 488 self.data['control_type'] = control_data.CONTROL_TYPE_NAMES.SERVER 489 else: 490 self.data['control_type'] = control_data.CONTROL_TYPE_NAMES.CLIENT 491 492 self.data['require_ssp'] = options.require_ssp 493 494 return options, leftover 495 496 497 def execute(self): 498 if self.ctrl_file_data: 499 cf_info = self.execute_rpc(op='generate_control_file', 500 item=self.jobname, 501 **self.ctrl_file_data) 502 503 self.data['control_file'] = cf_info['control_file'] 504 if 'synch_count' not in self.data: 505 self.data['synch_count'] = cf_info['synch_count'] 506 if cf_info['is_server']: 507 self.data['control_type'] = control_data.CONTROL_TYPE_NAMES.SERVER 508 else: 509 self.data['control_type'] = control_data.CONTROL_TYPE_NAMES.CLIENT 510 511 # Get the union of the 2 sets of dependencies 512 deps = set(self.data['dependencies']) 513 deps = sorted(deps.union(cf_info['dependencies'])) 514 self.data['dependencies'] = list(deps) 515 516 if 'synch_count' not in self.data: 517 self.data['synch_count'] = 1 518 519 return self.create_job() 520 521 522class job_clone(job_create_or_clone): 523 """atest job clone [--priority <int>] 524 [--mlist </path/to/machinelist>] [--machine <host1 host2 host3>] 525 [--labels <list of labels of machines to run on>] 526 [--one-time-hosts <hosts>] [--email <email>] 527 job_name 528 529 Cloning a job is rather different from the other create operations, 530 so it only uses the __init__() and output() from its superclass. 531 """ 532 op_action = 'clone' 533 usage_action = 'clone' 534 535 def __init__(self): 536 super(job_clone, self).__init__() 537 self.parser.add_option('-i', '--id', help='Job id to clone', 538 default=False, 539 metavar='JOB_ID') 540 self.parser.add_option('-r', '--reuse-hosts', 541 help='Use the exact same hosts as the ' 542 'cloned job.', 543 action='store_true', default=False) 544 545 546 def parse(self): 547 options, leftover = super(job_clone, self).parse() 548 549 self.clone_id = options.id 550 self.reuse_hosts = options.reuse_hosts 551 552 host_specified = self.hosts or self.one_time_hosts or options.labels 553 if self.reuse_hosts and host_specified: 554 self.invalid_syntax('Cannot specify hosts and reuse the same ' 555 'ones as the cloned job.') 556 557 if not (self.reuse_hosts or host_specified): 558 self.invalid_syntax('Must reuse or specify at least one ' 559 'machine (-r, -m, -M, -b or ' 560 '--one-time-hosts).') 561 562 return options, leftover 563 564 565 def execute(self): 566 clone_info = self.execute_rpc(op='get_info_for_clone', 567 id=self.clone_id, 568 preserve_metahosts=self.reuse_hosts) 569 570 # Remove fields from clone data that cannot be reused 571 for field in ('name', 'created_on', 'id', 'owner'): 572 del clone_info['job'][field] 573 574 # Also remove parameterized_job field, as the feature still is 575 # incomplete, this tool does not attempt to support it for now, 576 # it uses a different API function and it breaks create_job() 577 if clone_info['job'].has_key('parameterized_job'): 578 del clone_info['job']['parameterized_job'] 579 580 # Keyword args cannot be unicode strings 581 self.data.update((str(key), val) 582 for key, val in clone_info['job'].iteritems()) 583 584 if self.reuse_hosts: 585 # Convert host list from clone info that can be used for job_create 586 for label, qty in clone_info['meta_host_counts'].iteritems(): 587 self.data['meta_hosts'].extend([label]*qty) 588 589 self.data['hosts'].extend(host['hostname'] 590 for host in clone_info['hosts']) 591 592 return self.create_job() 593 594 595class job_abort(job, action_common.atest_delete): 596 """atest job abort <job(s)>""" 597 usage_action = op_action = 'abort' 598 msg_done = 'Aborted' 599 600 def parse(self): 601 job_info = topic_common.item_parse_info(attribute_name='jobids', 602 use_leftover=True) 603 options, leftover = super(job_abort, self).parse([job_info], 604 req_items='jobids') 605 606 607 def execute(self): 608 data = {'job__id__in': self.jobids} 609 self.execute_rpc(op='abort_host_queue_entries', **data) 610 print('Aborting jobs: %s' % ', '.join(self.jobids)) 611 612 613 def get_items(self): 614 return self.jobids 615