xref: /aosp_15_r20/external/cronet/testing/trigger_scripts/perf_device_trigger.py (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1#!/usr/bin/env python3
2# Copyright 2017 The Chromium Authors
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5"""Custom swarming triggering script.
6
7This script does custom swarming triggering logic, to enable device affinity
8for our bots, while lumping all trigger calls under one logical step.
9
10For the perf use case of device affinity, this script now enables soft device
11affinity.  This means that it tries to smartly allocate jobs to bots based
12on what is currently alive and what bot the task was last triggered on,
13preferring that last triggered bot if available.  If the
--multiple-trigger-configs flag is specified, then this script overrides
15the soft device affinity functionality in favor of the provided ids.
16
17The algorithm is roughly the following:
18
19Find eligible bots, healthy or not.
20  * Query swarming for eligible bots based on the dimensions passed in
    on the swarming call.  A bot is considered healthy if it is not
    quarantined, not dead (is_dead), and not in maintenance because of
    device temperature.
23
24Of the eligible bots determine what bot id to run the shard on.
25(Implementation in _select_config_indices_with_soft_affinity)
26  * First query swarming for the last task that ran that shard with
    given dimensions, assuming tasks are returned most recent first.
28  * Check if the bot id that ran that task is alive, if so trigger
29    on that bot again.
30  * If that bot isn't alive, allocate to another alive bot or if no
31    other alive bots exist, trigger on the same dead one.
32
33Scripts inheriting must have roughly the same command line interface as
34swarming.py trigger. It modifies it in the following ways:
35
36 * Intercepts the dump-json argument, and creates its own by combining the
37   results from each trigger call.
 * Intercepts the dimensions from the swarming call, determines which bots
   are healthy, and triggers each shard on a bot chosen by the above device
   affinity algorithm.
40 * Adds a tag to the swarming trigger job with the shard so we know the last
41   bot that ran this shard.
42
43This script is normally called from the swarming recipe module in tools/build.
44
45"""
46
47from __future__ import print_function
48
49import argparse
50import copy
51import os
52import sys
53import logging
54import random
55
56import base_test_triggerer
57import six
58
59SRC_DIR = os.path.dirname(
60    os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
61sys.path.append(os.path.join(SRC_DIR, 'tools', 'perf'))
62
63import generate_perf_sharding
64from core import bot_platforms
65
66
class Bot(object):  # pylint: disable=useless-object-inheritance
    """A swarming bot that is eligible to run a perf shard.

    Pairs the swarming bot id with the health flag computed when swarming
    was queried for eligible bots.
    """

    def __init__(self, bot_id, is_alive):
        self._bot_id, self._is_alive = bot_id, is_alive

    def id(self):
        """Returns the swarming bot id."""
        return self._bot_id

    def is_alive(self):
        """Returns the health flag recorded when this Bot was created."""
        return self._is_alive

    def as_json_config(self):
        """Returns this bot as a trigger config dict of the form {'id': ...}."""
        return dict(id=self._bot_id)
82
83
class PerfDeviceTriggerer(base_test_triggerer.BaseTestTriggerer):
    """Swarming triggerer for perf shards with soft device affinity.

    Without --multiple-trigger-configs, the eligible bots are discovered by
    querying swarming with the dimensions found in the swarming arguments,
    and each shard is preferably re-triggered on the bot that last ran it.
    With --multiple-trigger-configs, the provided bot configs are used as-is.
    """

    def __init__(self, args, swarming_args):
        """Initializes the triggerer and, if needed, queries swarming for bots.

        Args:
          args: Parsed known command-line arguments.
          swarming_args: Remaining arguments that are forwarded to swarming;
            the --dimension and --swarming values are read from them.
        """
        # pylint: disable=super-with-arguments
        super(PerfDeviceTriggerer, self).__init__()
        # pylint: enable=super-with-arguments

        # Enable verbose logging before any swarming query so that their
        # debug output is not lost. logging.basicConfig() does nothing when
        # the root logger already has handlers (main() installs one), so the
        # root logger's level is also set directly.
        if args.multiple_dimension_script_verbose:
            logging.basicConfig(level=logging.DEBUG)
            logging.getLogger().setLevel(logging.DEBUG)

        self._sharded_query_failed = False
        # Defaults used when explicit trigger configs are given and swarming
        # is therefore not queried for eligible bots.
        self._dimensions = {}
        self._swarming_server = None
        self._eligible_bots_by_ids = {}

        if not args.multiple_trigger_configs:
            # Dimensions requested by the parent swarming job.
            self._dimensions = self._get_swarming_dimensions(swarming_args)

            # Swarming server host (protocol stripped) used for queries.
            self._swarming_server = self._get_swarming_server(swarming_args)

            # All bots in swarming that satisfy the current dimensions,
            # keyed by bot id.
            # Note: this assumes perf bot dimensions are unique between
            # configurations.
            self._eligible_bots_by_ids = (
                self._query_swarming_for_eligible_bot_configs(
                    self._dimensions))

    def generate_shard_map(self, args, buildername, selected_config):
        """Builds a dynamic shard map when --use-dynamic-shards is set.

        Args:
          args: Parsed arguments; use_dynamic_shards is read.
          buildername: Builder name used to look up its bot platform.
          selected_config: List of (shard_index, bot_config_index) tuples.

        Returns:
          The map from generate_perf_sharding.GenerateShardMap, with the bot
          id of each shard recorded under 'extra_infos' as 'bot #<shard>', or
          None if dynamic sharding is off, the builder is unknown, or no
          shards were selected.
        """
        num_of_shards = len(selected_config)
        builder = bot_platforms.find_bot_platform(buildername)
        if not (args.use_dynamic_shards and builder and num_of_shards):
            return None

        logging.info(
            'Generating dynamic shardmap for builder: %s with %d shards',
            buildername, num_of_shards)
        shard_map = generate_perf_sharding.GenerateShardMap(
            builder=builder, num_of_shards=num_of_shards)
        extra_infos = shard_map.setdefault('extra_infos', {})
        for shard_index, bot_index in selected_config:
            extra_infos['bot #%s' % shard_index] = (
                self._bot_configs[bot_index]['id'])
        return shard_map

    def append_additional_args(self, args, shard_index):
        """Returns args with a 'shard:<index>' swarming tag added.

        The tag lets later runs query swarming for the last bot that ran a
        given shard. It is inserted before the '--' separator, if present, so
        that it applies to the swarming task itself.
        """
        shard_tag = ['--tag', 'shard:%d' % shard_index]
        if '--' in args:
            dash_ind = args.index('--')
            return args[:dash_ind] + shard_tag + args[dash_ind:]
        return args + shard_tag

    def parse_bot_configs(self, args):
        """Populates self._bot_configs.

        With --multiple-trigger-configs this delegates to the base class;
        otherwise one {'id': bot_id} config is built per eligible bot.
        """
        if args.multiple_trigger_configs:
            # pylint: disable=super-with-arguments
            super(PerfDeviceTriggerer, self).parse_bot_configs(args)
            # pylint: enable=super-with-arguments
        else:
            self._bot_configs = [
                bot.as_json_config()
                for bot in self._eligible_bots_by_ids.values()
            ]

    def select_config_indices(self, args):
        """Returns the (shard_index, bot_config_index) pairs to trigger."""
        if args.multiple_trigger_configs:
            # If specific bot ids were passed in, trigger a job for every
            # valid config regardless of health status, since each config
            # represents exactly one bot in the perf swarming pool. The
            # eligible-bot state used by the other strategies is not
            # populated in this mode, so return here.
            return [(index, index)
                    for index in range(len(self.indices_to_trigger(args)))]
        if args.use_dynamic_shards:
            return self._select_config_indices_with_dynamic_sharding()
        return self._select_config_indices_with_soft_affinity(args)

    def _select_config_indices_with_dynamic_sharding(self):
        """Assigns exactly one shard to every alive eligible bot.

        Shard indices 0..N-1 (N = number of alive bots) are shuffled across
        the alive bots.

        Returns:
          A list of (shard_index, bot_config_index) tuples sorted by shard
          index.
        """
        alive_bot_ids = [
            bot_id for bot_id, b in self._eligible_bots_by_ids.items()
            if b.is_alive()
        ]

        shard_indexes = list(range(len(alive_bot_ids)))
        random.shuffle(shard_indexes)
        selected_config = sorted(
            (shard_index, self._find_bot_config_index(bot_id))
            for shard_index, bot_id in zip(shard_indexes, alive_bot_ids))

        for shard_index, bot_index in selected_config:
            logging.info('Shard %d\n\tBot: %s', shard_index,
                         self._bot_configs[bot_index]['id'])

        return selected_config

    def _select_config_indices_with_soft_affinity(self, args):
        """Maps shards to bots, preferring the bot that last ran each shard.

        Each shard first gets the bot that last ran it, if that bot is still
        eligible and not already taken. Shards left without a bot get an
        unallocated healthy bot if one remains, otherwise an unallocated dead
        bot. Finally, shards sitting on dead bots are moved to any remaining
        unallocated healthy bots.

        Returns:
          A list of (shard_index, bot_config_index) tuples in the order of
          indices_to_trigger(args).

        Raises:
          ValueError: if more shards are requested than eligible bots exist.
        """
        shard_indices = self.indices_to_trigger(args)
        trigger_count = len(shard_indices)
        # More shards than eligible bots means there is a config error
        # somewhere.
        if trigger_count > len(self._eligible_bots_by_ids):
            self._print_device_affinity_info({}, {},
                                             self._eligible_bots_by_ids,
                                             trigger_count)
            raise ValueError(
                'Not enough available machines exist in swarming '
                'pool.  Shards requested (%d) exceeds available bots '
                '(%d).' % (trigger_count, len(self._eligible_bots_by_ids)))

        shard_to_bot_assignment_map = {}
        unallocated_bots_by_ids = copy.deepcopy(self._eligible_bots_by_ids)
        for shard_index in shard_indices:
            bot_id = self._query_swarming_for_last_shard_id(shard_index)
            if bot_id and bot_id in unallocated_bots_by_ids:
                shard_to_bot_assignment_map[shard_index] = (
                    unallocated_bots_by_ids.pop(bot_id))
            else:
                shard_to_bot_assignment_map[shard_index] = None

        # Keep the affinity-only assignment for debug logging.
        existing_shard_bot_to_shard_map = copy.deepcopy(
            shard_to_bot_assignment_map)
        # Split the remaining bots into healthy and dead ones.
        unallocated_healthy_bots = {
            b for b in unallocated_bots_by_ids.values() if b.is_alive()
        }
        unallocated_bad_bots = {
            b for b in unallocated_bots_by_ids.values() if not b.is_alive()
        }

        # Give shards without a bot a healthy bot first. The trigger_count
        # check above guarantees enough unallocated bots remain overall.
        for shard_index, bot in sorted(shard_to_bot_assignment_map.items()):
            if bot:
                continue
            if unallocated_healthy_bots:
                shard_to_bot_assignment_map[shard_index] = (
                    unallocated_healthy_bots.pop())
                logging.info('First time shard %d has been triggered',
                             shard_index)
            else:
                shard_to_bot_assignment_map[shard_index] = (
                    unallocated_bad_bots.pop())

        # Move shards that were assigned dead bots onto remaining healthy bots.
        for shard_index, bot in sorted(shard_to_bot_assignment_map.items()):
            if not bot.is_alive() and unallocated_healthy_bots:
                healthy_bot = unallocated_healthy_bots.pop()
                shard_to_bot_assignment_map[shard_index] = healthy_bot
                logging.info(
                    'Device affinity broken for shard #%d. bot %s is dead,'
                    ' new mapping to bot %s', shard_index, bot.id(),
                    healthy_bot.id())

        # Translate the bot assignments into indices into self._bot_configs.
        selected_configs = [
            (shard_index,
             self._find_bot_config_index(
                 shard_to_bot_assignment_map[shard_index].id()))
            for shard_index in shard_indices
        ]
        self._print_device_affinity_info(shard_to_bot_assignment_map,
                                         existing_shard_bot_to_shard_map,
                                         self._eligible_bots_by_ids,
                                         trigger_count)
        return selected_configs

    def _print_device_affinity_info(self, new_map, existing_map, health_map,
                                    num_shards):
        """Logs the previous/new bot of each shard and a bot health summary.

        Args:
          new_map: Shard index -> Bot chosen for this run.
          existing_map: Shard index -> Bot that last ran the shard, or None.
          health_map: Bot id -> Bot for all eligible bots.
          num_shards: Number of shards; shards 0..num_shards-1 are logged.
        """
        logging.info('')
        for shard_index in range(num_shards):
            existing = existing_map.get(shard_index)
            new = new_map.get(shard_index)
            logging.info('Shard %d\n\tprevious: %s\n\tnew: %s', shard_index,
                         existing.id() if existing else '',
                         new.id() if new else '')

        healthy_bots = [b.id() for b in health_map.values() if b.is_alive()]
        dead_bots = [b.id() for b in health_map.values() if not b.is_alive()]
        logging.info('Shards needed: %d', num_shards)
        logging.info('Total bots (dead + healthy): %d',
                     len(dead_bots) + len(healthy_bots))
        logging.info('Healthy bots, %d: %s', len(healthy_bots), healthy_bots)
        logging.info('Dead Bots, %d: %s', len(dead_bots), dead_bots)
        logging.info('')

    def _query_swarming_for_eligible_bot_configs(self, dimensions):
        """Query Swarming to figure out which bots are available.

        Returns: a dictionary in which the keys are the bot id and
        the values are Bot object that indicate the health status
        of the bots.
        """
        perf_bots = {}
        for bot in self.list_bots(dimensions, server=self._swarming_server):
            # Device maintenance is usually quick, and we can wait for it to
            # finish. However, if the device is too hot, it can take a long
            # time for it to cool down, so check for 'Device temperature' in
            # maintenance_msg. Treat a null message like a missing one.
            maintenance_msg = bot.get('maintenance_msg') or ''
            alive = (not bot.get('is_dead') and not bot.get('quarantined')
                     and 'Device temperature' not in maintenance_msg)
            perf_bots[bot['bot_id']] = Bot(bot['bot_id'], alive)
        return perf_bots

    def _find_bot_config_index(self, bot_id):
        """Returns the index in self._bot_configs of bot_id, or None."""
        return next((i for i, config in enumerate(self._bot_configs)
                     if config['id'] == bot_id), None)

    def _query_swarming_for_last_shard_id(self, shard_index):
        """Per shard, query swarming for the last bot that ran the task.

        Example: swarming.py query -S server-url.com --limit 1 \\
          'tasks/list?tags=os:Windows&tags=pool:chrome.tests.perf&tags=shard:12'

        Returns:
          The bot id of the most recent matching task, or None if no task
          (or no bot id) was found.
        """
        values = sorted('%s:%s' % (k, v) for k, v in self._dimensions.items())
        # Same tags plus the shard tag.
        values_with_shard = sorted(values + ['shard:%s' % shard_index])

        # TODO(eyaich): For now we are ignoring the state of the returned
        # task (ie completed, timed_out, bot_died, etc) as we are just
        # answering the question "What bot did we last trigger this shard on?"
        # Evaluate if this is the right decision going forward.

        # Query for the last task that ran with these dimensions and this
        # shard. The sharded query sometimes times out for queries never done
        # before; once it fails, fall back to the unsharded query for this
        # and all later shards. NOTE(review): the unsharded query returns the
        # last task of any shard, so affinity is only approximate then.
        tasks = None
        if not self._sharded_query_failed:
            try:
                tasks = self.list_tasks(values_with_shard,
                                        limit='1',
                                        server=self._swarming_server)
            except Exception:  # pylint: disable=broad-except
                logging.warning(
                    'Sharded task query failed; falling back to querying '
                    'without the shard tag.', exc_info=True)
                self._sharded_query_failed = True
        if self._sharded_query_failed:
            tasks = self.list_tasks(values,
                                    limit='1',
                                    server=self._swarming_server)

        if not tasks:
            # No task has run this shard with these dimensions.
            return None
        # We queried with a limit of 1, so this is the most recent task.
        task = tasks[0]
        if 'bot_id' in task:
            return task['bot_id']
        for tag in task.get('tags', ()):
            if tag.startswith('id:'):
                return tag[len('id:'):]
        return None

    def _get_swarming_dimensions(self, args):
        """Returns a dict of every '--dimension <key> <value>' in args."""
        dimensions = {}
        for flag, key, value in zip(args, args[1:], args[2:]):
            if flag == '--dimension':
                dimensions[key] = value
        return dimensions

    def _get_swarming_server(self, args):
        """Returns the swarming server host from args, or None if absent.

        Accepts '--swarming <url>' and '--swarming=<url>'; the protocol
        prefix (e.g. 'https://') is stripped when present.
        """
        for i, arg in enumerate(args):
            if arg.startswith('--swarming='):
                server = arg[len('--swarming='):]
            elif '--swarming' in arg and i + 1 < len(args):
                server = args[i + 1]
            else:
                continue
            # Strip out the protocol, if any.
            return server.split('//', 1)[1] if '//' in server else server
        return None
378
379
def main():
    """Parses the command line and triggers the perf swarming tasks.

    Returns the result of trigger_tasks(), which is used as the process exit
    code by the __main__ guard below.
    """
    log_format = ('(%(levelname)s) %(asctime)s pid=%(process)d'
                  '  %(module)s.%(funcName)s:%(lineno)d  %(message)s')
    logging.basicConfig(level=logging.INFO, format=log_format)

    # Arguments shared by all triggerers (base class contract), followed by
    # the perf-specific flags. Unknown arguments are forwarded to swarming.
    parser = base_test_triggerer.BaseTestTriggerer.setup_parser_contract(
        argparse.ArgumentParser(description=__doc__))
    parser.add_argument('--use-dynamic-shards',
                        action='store_true',
                        required=False,
                        help='Ignore --shards and the existing shard map. Will '
                        'generate a shard map at run time and use as much '
                        'device as possible.')
    known_args, swarming_args = parser.parse_known_args()

    perf_triggerer = PerfDeviceTriggerer(known_args, swarming_args)
    return perf_triggerer.trigger_tasks(known_args, swarming_args)


if __name__ == '__main__':
    sys.exit(main())
402