#!/usr/bin/env python3
# Copyright 2017 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Custom swarming triggering script.

This script does custom swarming triggering logic, to enable device affinity
for our bots, while lumping all trigger calls under one logical step.

For the perf use case of device affinity, this script now enables soft device
affinity.  This means that it tries to smartly allocate jobs to bots based
on what is currently alive and what bot the task was last triggered on,
preferring that last triggered bot if available.  If the
--multiple-trigger-configs flag is specified then this script overrides
the soft device affinity functionality in favor of the provided ids.

The algorithm is roughly the following:

Find eligible bots, healthy or not.
  * Query swarming for eligible bots based on the dimensions passed in
    on the swarming call.  Determine their health status based on
    whether they are not quarantined and not is_dead.

Of the eligible bots determine what bot id to run the shard on.
(Implementation in _select_config_indices_with_soft_affinity)
  * First query swarming for the last task that ran that shard with
    given dimensions.  Assuming they are returned with most recent first.
  * Check if the bot id that ran that task is alive, if so trigger
    on that bot again.
  * If that bot isn't alive, allocate to another alive bot or if no
    other alive bots exist, trigger on the same dead one.

Scripts inheriting must have roughly the same command line interface as
swarming.py trigger.  It modifies it in the following ways:

 * Intercepts the dump-json argument, and creates its own by combining the
   results from each trigger call.
 * Intercepts the dimensions from the swarming call and determines what bots
   are healthy based on the above device affinity algorithm, and triggers
   the shards on them.
 * Adds a tag to the swarming trigger job with the shard so we know the last
   bot that ran this shard.

This script is normally called from the swarming recipe module in tools/build.

"""

from __future__ import print_function

import argparse
import copy
import os
import sys
import logging
import random

import base_test_triggerer
import six

SRC_DIR = os.path.dirname(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(os.path.join(SRC_DIR, 'tools', 'perf'))

import generate_perf_sharding
from core import bot_platforms


class Bot(object):  # pylint: disable=useless-object-inheritance
    """Eligible bots to run the task."""

    def __init__(self, bot_id, is_alive):
        """Records the swarming bot id and whether the bot is healthy."""
        self._bot_id = bot_id
        self._is_alive = is_alive

    def id(self):
        """Returns the bot id this object was created with."""
        return self._bot_id

    def is_alive(self):
        """Returns the health flag this object was created with."""
        return self._is_alive

    def as_json_config(self):
        """Returns the bot as a bot-config dict of the form {'id': bot_id}."""
        return {'id': self._bot_id}


class PerfDeviceTriggerer(base_test_triggerer.BaseTestTriggerer):
    """Triggerer that pins perf shards to bots using soft device affinity."""

    def __init__(self, args, swarming_args):
        """Collects the swarming state needed to pick a bot per shard.

        Args:
          args: parsed arguments; reads multiple_trigger_configs and
            multiple_dimension_script_verbose.
          swarming_args: the remaining (unparsed) swarming command line, from
            which the dimensions and the swarming server are extracted.
        """
        # pylint: disable=super-with-arguments
        super(PerfDeviceTriggerer, self).__init__()
        # pylint: enable=super-with-arguments
        self._sharded_query_failed = False

        if not args.multiple_trigger_configs:
            # Represents the list of current dimensions requested
            # by the parent swarming job.
            self._dimensions = self._get_swarming_dimensions(swarming_args)

            # Store what swarming server we need and whether or not we need
            # to send down authentication with it
            self._swarming_server = self._get_swarming_server(swarming_args)

            # Map of all existing bots in swarming that satisfy the current
            # set of dimensions indexed by bot id.
            # Note: this assumes perf bot dimensions are unique between
            # configurations.
            self._eligible_bots_by_ids = (
                self._query_swarming_for_eligible_bot_configs(
                    self._dimensions))

        if args.multiple_dimension_script_verbose:
            # basicConfig() does nothing once the root logger already has a
            # handler (main() installs one before constructing this class),
            # so the level is also set on the root logger directly.
            logging.basicConfig(level=logging.DEBUG)
            logging.getLogger().setLevel(logging.DEBUG)

    def generate_shard_map(self, args, buildername, selected_config):
        """Builds a dynamic shard map, or returns None when not applicable.

        Args:
          args: parsed arguments; reads use_dynamic_shards.
          buildername: builder name looked up via
            bot_platforms.find_bot_platform.
          selected_config: list of (shard_index, bot_config_index) pairs.

        Returns:
          The map produced by generate_perf_sharding.GenerateShardMap with a
          'bot #<shard>' entry per shard added under 'extra_infos', or None
          if dynamic sharding is off, the builder is unknown, or there are
          no shards.
        """
        shard_map = None
        num_of_shards = len(selected_config)
        builder = bot_platforms.find_bot_platform(buildername)
        if args.use_dynamic_shards and builder and num_of_shards:
            logging.info(
                'Generating dynamic shardmap for builder: %s with %d shards',
                buildername, num_of_shards)
            shard_map = generate_perf_sharding.GenerateShardMap(
                builder=builder, num_of_shards=num_of_shards)
            for shard_index, bot_index in selected_config:
                bot_id = self._bot_configs[bot_index]['id']
                shard_map['extra_infos']['bot #%s' % shard_index] = bot_id
        return shard_map

    def append_additional_args(self, args, shard_index):
        """Returns a copy of args with a '--tag shard:<index>' pair added.

        The tag lets later runs query for the last bot that ran a specific
        shard.
        """
        tag = 'shard:%d' % shard_index
        shard_tag = ['--tag', tag]
        # Need to append this before the dash if present so it gets fed to
        # the swarming task itself.
        if '--' in args:
            dash_ind = args.index('--')
            return args[:dash_ind] + shard_tag + args[dash_ind:]
        return args + shard_tag

    def parse_bot_configs(self, args):
        """Populates self._bot_configs.

        With --multiple-trigger-configs the base class parses the configs;
        otherwise one {'id': ...} config is created per eligible bot.
        """
        if args.multiple_trigger_configs:
            # pylint: disable=super-with-arguments
            super(PerfDeviceTriggerer, self).parse_bot_configs(args)
            # pylint: enable=super-with-arguments
        else:
            # For each eligible bot, append the dimension
            # to the eligible bot_configs
            self._bot_configs = [
                bot.as_json_config()
                for bot in self._eligible_bots_by_ids.values()
            ]

    def select_config_indices(self, args):
        """Returns the list of (shard_index, bot_config_index) to trigger.

        Args:
          args: parsed arguments; reads multiple_trigger_configs and
            use_dynamic_shards.
        """
        if args.multiple_trigger_configs:
            # If specific bot ids were passed in, we want to trigger a job for
            # every valid config regardless of health status since
            # each config represents exactly one bot in the perf swarming
            # pool.  Return here: the affinity paths below rely on
            # self._eligible_bots_by_ids, which __init__ does not populate in
            # this mode.
            return [(index, index)
                    for index in range(len(self.indices_to_trigger(args)))]
        if args.use_dynamic_shards:
            return self._select_config_indices_with_dynamic_sharding()
        return self._select_config_indices_with_soft_affinity(args)

    def _select_config_indices_with_dynamic_sharding(self):
        """Assigns one shard to every alive bot, in random order.

        Returns:
          A list of (shard_index, bot_config_index) sorted by shard index.
        """
        alive_bot_ids = [
            bot_id for bot_id, b in self._eligible_bots_by_ids.items()
            if b.is_alive()
        ]
        trigger_count = len(alive_bot_ids)

        # Shuffle the shard numbers so the shard -> bot pairing is random.
        indexes = list(range(trigger_count))
        random.shuffle(indexes)
        selected_config = [(indexes[i],
                            self._find_bot_config_index(alive_bot_ids[i]))
                           for i in range(trigger_count)]
        selected_config.sort()

        for shard_index, bot_index in selected_config:
            logging.info('Shard %d\n\tBot: %s', shard_index,
                         self._bot_configs[bot_index]['id'])

        return selected_config

    def _select_config_indices_with_soft_affinity(self, args):
        """Assigns each shard to a bot, preferring the bot it last ran on.

        Returns:
          A list of (shard_index, bot_config_index), one per shard to
          trigger.

        Raises:
          ValueError: if more shards are requested than eligible bots exist.
        """
        shard_indices = self.indices_to_trigger(args)
        trigger_count = len(shard_indices)
        # First make sure the number of shards doesn't exceed the
        # number of eligible bots. This means there is a config error
        # somewhere.
        if trigger_count > len(self._eligible_bots_by_ids):
            self._print_device_affinity_info({}, {},
                                             self._eligible_bots_by_ids,
                                             trigger_count)
            raise ValueError(
                'Not enough available machines exist in swarming '
                'pool.  Shards requested (%d) exceeds available bots '
                '(%d).' % (trigger_count, len(self._eligible_bots_by_ids)))

        # Pass 1: give every shard the bot it last ran on, if that bot is
        # still eligible and not already taken by another shard.
        shard_to_bot_assignment_map = {}
        unallocated_bots_by_ids = copy.deepcopy(self._eligible_bots_by_ids)
        for shard_index in shard_indices:
            bot_id = self._query_swarming_for_last_shard_id(shard_index)
            if bot_id and bot_id in unallocated_bots_by_ids:
                bot = unallocated_bots_by_ids[bot_id]
                shard_to_bot_assignment_map[shard_index] = bot
                unallocated_bots_by_ids.pop(bot_id)
            else:
                shard_to_bot_assignment_map[shard_index] = None

        # Maintain the current map for debugging purposes
        existing_shard_bot_to_shard_map = copy.deepcopy(
            shard_to_bot_assignment_map)
        # Now create sets of remaining healthy and bad bots
        unallocated_healthy_bots = {
            b
            for b in unallocated_bots_by_ids.values() if b.is_alive()
        }
        unallocated_bad_bots = {
            b
            for b in unallocated_bots_by_ids.values() if not b.is_alive()
        }

        # Try assigning healthy bots for new shards first.
        for shard_index, bot in sorted(shard_to_bot_assignment_map.items()):
            if not bot and unallocated_healthy_bots:
                shard_to_bot_assignment_map[shard_index] = \
                    unallocated_healthy_bots.pop()
                logging.info('First time shard %d has been triggered',
                             shard_index)
            elif not bot:
                # The bot-count check above guarantees a bot is left here.
                shard_to_bot_assignment_map[
                    shard_index] = unallocated_bad_bots.pop()

        # Handle the rest of shards that were assigned dead bots:
        for shard_index, bot in sorted(shard_to_bot_assignment_map.items()):
            if not bot.is_alive() and unallocated_healthy_bots:
                dead_bot = bot
                healthy_bot = unallocated_healthy_bots.pop()
                shard_to_bot_assignment_map[shard_index] = healthy_bot
                logging.info(
                    'Device affinity broken for shard #%d. bot %s is dead,'
                    ' new mapping to bot %s', shard_index, dead_bot.id(),
                    healthy_bot.id())

        # Now populate the indices into the bot_configs array
        selected_configs = []
        for shard_index in shard_indices:
            selected_configs.append(
                (shard_index,
                 self._find_bot_config_index(
                     shard_to_bot_assignment_map[shard_index].id())))
        self._print_device_affinity_info(shard_to_bot_assignment_map,
                                         existing_shard_bot_to_shard_map,
                                         self._eligible_bots_by_ids,
                                         trigger_count)
        return selected_configs

    def _print_device_affinity_info(self, new_map, existing_map, health_map,
                                    num_shards):
        """Logs the previous/new bot per shard and the pool health summary.

        Args:
          new_map: shard index -> Bot (or None) after assignment.
          existing_map: shard index -> Bot (or None) found from history.
          health_map: bot id -> Bot for every eligible bot.
          num_shards: number of shards; shards 0..num_shards-1 are logged.
        """
        logging.info('')
        for shard_index in range(num_shards):
            existing = existing_map.get(shard_index, None)
            new = new_map.get(shard_index, None)
            existing_id = ''
            if existing:
                existing_id = existing.id()
            new_id = ''
            if new:
                new_id = new.id()
            logging.info('Shard %d\n\tprevious: %s\n\tnew: %s', shard_index,
                         existing_id, new_id)

        healthy_bots = []
        dead_bots = []
        for b in health_map.values():
            if b.is_alive():
                healthy_bots.append(b.id())
            else:
                dead_bots.append(b.id())
        logging.info('Shards needed: %d', num_shards)
        logging.info('Total bots (dead + healthy): %d',
                     len(dead_bots) + len(healthy_bots))
        logging.info('Healthy bots, %d: %s', len(healthy_bots), healthy_bots)
        logging.info('Dead Bots, %d: %s', len(dead_bots), dead_bots)
        logging.info('')

    def _query_swarming_for_eligible_bot_configs(self, dimensions):
        """Query Swarming to figure out which bots are available.

        Returns: a dictionary in which the keys are the bot id and
        the values are Bot object that indicate the health status
        of the bots.
        """

        query_result = self.list_bots(dimensions,
                                      server=self._swarming_server)
        perf_bots = {}
        for bot in query_result:
            # Device maintenance is usually quick, and we can wait for it to
            # finish. However, if the device is too hot, it can take a long
            # time for it to cool down, so check for 'Device temperature' in
            # maintenance_msg.  A missing or null message counts as empty.
            maintenance_msg = bot.get('maintenance_msg') or ''
            alive = (not bot.get('is_dead') and not bot.get('quarantined')
                     and 'Device temperature' not in maintenance_msg)
            perf_bots[bot['bot_id']] = Bot(bot['bot_id'], alive)
        return perf_bots

    def _find_bot_config_index(self, bot_id):
        """Returns the index in self._bot_configs of bot_id, or None."""
        # Find the index into the bot_config map that
        # maps to the bot id in question
        for i, dimensions in enumerate(self._bot_configs):
            if dimensions['id'] == bot_id:
                return i
        return None

    def _query_swarming_for_last_shard_id(self, shard_index):
        """Per shard, query swarming for the last bot that ran the task.

        Example: swarming.py query -S server-url.com --limit 1 \\
          'tasks/list?tags=os:Windows&tags=pool:chrome.tests.perf&tags=shard:12'

        Returns:
          The bot id of the most recent matching task, or None if no task
          (or no bot id on it) was found.
        """
        values = ['%s:%s' % (k, v) for k, v in self._dimensions.items()]
        values.sort()

        # Append the shard as a tag
        values_with_shard = list(values)
        values_with_shard.append('%s:%s' % ('shard', str(shard_index)))
        values_with_shard.sort()

        # TODO(eyaich): For now we are ignoring the state of the returned
        # task (ie completed, timed_out, bot_died, etc) as we are just
        # answering the question "What bot did we last trigger this shard on?"
        # Evaluate if this is the right decision going forward.

        # Query for the last task that ran with these dimensions and this
        # shard.  Query with the shard param first. This will sometimes time
        # out for queries we've never done before, so try querying without it
        # if that happens.  Once it has failed, the sharded query is skipped
        # for all later shards.
        tasks = None
        if not self._sharded_query_failed:
            try:
                tasks = self.list_tasks(values_with_shard,
                                        limit='1',
                                        server=self._swarming_server)
            except Exception:
                logging.warning(
                    'Sharded task query failed for shard %d; falling back '
                    'to the query without the shard tag.',
                    shard_index,
                    exc_info=True)
                self._sharded_query_failed = True
        if self._sharded_query_failed:
            tasks = self.list_tasks(values,
                                    limit='1',
                                    server=self._swarming_server)

        if tasks:
            # We queried with a limit of 1 so we could only get back
            # the most recent which is what we care about.
            task = tasks[0]
            if 'bot_id' in task:
                return task['bot_id']
            # Fall back to the 'id:<bot>' tag; tolerate a task without tags.
            for tag in task.get('tags') or []:
                if tag.startswith('id:'):
                    return tag[len('id:'):]
        # No eligible shard for this bot
        return None

    def _get_swarming_dimensions(self, args):
        """Collects '--dimension KEY VALUE' triples from args into a dict."""
        dimensions = {}
        # Stop two short of the end so that KEY and VALUE always exist.
        for i, arg in enumerate(args[:-2]):
            if arg == '--dimension':
                dimensions[args[i + 1]] = args[i + 2]
        return dimensions

    def _get_swarming_server(self, args):
        """Returns the swarming server host with the protocol stripped.

        The value following the first argument containing '--swarming' is
        used.  Returns None if no such argument (with a value) is present.
        """
        for flag, server in zip(args, args[1:]):
            if '--swarming' in flag:
                # Strip out the protocol, if there is one.
                _, separator, host = server.partition('//')
                return host if separator else server
        return None


def main():
    """Parses the command line and triggers the swarming tasks."""
    logging.basicConfig(level=logging.INFO,
                        format='(%(levelname)s) %(asctime)s pid=%(process)d'
                        '  %(module)s.%(funcName)s:%(lineno)d  %(message)s')
    # Setup args for common contract of base class
    parser = base_test_triggerer.BaseTestTriggerer.setup_parser_contract(
        argparse.ArgumentParser(description=__doc__))
    parser.add_argument(
        '--use-dynamic-shards',
        action='store_true',
        required=False,
        help='Ignore --shards and the existing shard map. Will '
        'generate a shard map at run time and use as much '
        'device as possible.')
    args, remaining = parser.parse_known_args()

    triggerer = PerfDeviceTriggerer(args, remaining)
    return triggerer.trigger_tasks(args, remaining)


if __name__ == '__main__':
    sys.exit(main())