xref: /aosp_15_r20/external/autotest/server/cros/cfm/configurable_test/actions.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1"""
2This module contains the actions that a configurable CFM test can execute.
3"""
4import abc
5import logging
6import random
7import re
8import sys
9import time
10
class Action(object):
    """
    Common base type for every action a configurable CFM test can run.

    Callers invoke execute(); concrete actions supply the real work by
    implementing do_execute().
    """
    # NOTE: the __metaclass__ class attribute is only honored by Python 2.
    # Under Python 3 it is an ordinary attribute.
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def do_execute(self, context):
        """
        Hook holding the action specific logic.

        Every concrete action must provide its own implementation.

        @param context ActionContext instance providing dependencies to the
                action.
        """
        pass

    def execute(self, context):
        """
        Runs the action, logging before and after the actual work.

        @param context ActionContext instance providing dependencies to the
                action. It is handed on unchanged to do_execute().
        """
        logging.info('Executing action "%s"', self)
        self.do_execute(context)
        logging.info('Done executing action "%s"', self)

    def __repr__(self):
        # By default an action is identified by the name of its class.
        return type(self).__name__
42
class MuteMicrophone(Action):
    """
    Action that mutes the microphone while in a call.
    """
    def do_execute(self, context):
        """
        Asks the CFM facade to mute the microphone.

        @param context ActionContext instance; its cfm_facade is used.
        """
        facade = context.cfm_facade
        facade.mute_mic()
49
class UnmuteMicrophone(Action):
    """
    Action that unmutes the microphone while in a call.
    """
    def do_execute(self, context):
        """
        Asks the CFM facade to unmute the microphone.

        @param context ActionContext instance; its cfm_facade is used.
        """
        facade = context.cfm_facade
        facade.unmute_mic()
56
class WaitForMeetingsLandingPage(Action):
    """
    Action that waits for the meetings landing page to load, e.g. after a
    reboot.
    """
    def do_execute(self, context):
        """
        Delegates the waiting to the CFM facade.

        @param context ActionContext instance; its cfm_facade is used.
        """
        facade = context.cfm_facade
        facade.wait_for_meetings_landing_page()
64
65
class JoinMeeting(Action):
    """
    Action that joins the meeting identified by a meeting code.
    """
    def __init__(self, meeting_code):
        """
        Initializes.

        @param meeting_code Code of the meeting this action will join.
        """
        super(JoinMeeting, self).__init__()
        self.meeting_code = meeting_code

    def do_execute(self, context):
        """
        Joins the configured meeting through the CFM facade.

        @param context ActionContext instance; its cfm_facade is used.
        """
        facade = context.cfm_facade
        facade.join_meeting_session(self.meeting_code)

    def __repr__(self):
        # Include the meeting code so log lines tell the meetings apart.
        return 'JoinMeeting "%s"' % self.meeting_code
84
class CreateMeeting(Action):
    """
    Action that starts a new meeting from the landing page.
    """
    def do_execute(self, context):
        """
        Asks the CFM facade to start a meeting session.

        @param context ActionContext instance; its cfm_facade is used.
        """
        facade = context.cfm_facade
        facade.start_meeting_session()
91
class LeaveMeeting(Action):
    """
    Action that leaves the meeting currently in progress.
    """
    def do_execute(self, context):
        """
        Asks the CFM facade to end the meeting session.

        @param context ActionContext instance; its cfm_facade is used.
        """
        facade = context.cfm_facade
        facade.end_meeting_session()
98
class RebootDut(Action):
    """
    Reboots the DUT.
    """
    def __init__(self, restart_chrome_for_cfm=False):
        """Initializes.

        To enable the cfm_facade to interact with the CFM, Chrome needs an extra
        restart. Setting restart_chrome_for_cfm toggles this extra restart.

        @param restart_chrome_for_cfm If True, restarts chrome to enable
                the cfm_facade and waits for the telemetry commands to become
                available. If false, does not do an extra restart of Chrome.
        """
        # Call the base initializer like the other actions in this module do.
        super(RebootDut, self).__init__()
        self._restart_chrome_for_cfm = restart_chrome_for_cfm

    def do_execute(self, context):
        """
        Reboots the host and optionally restarts Chrome for CFM.

        @param context ActionContext instance; its host and cfm_facade are
                used.
        """
        context.host.reboot()
        if self._restart_chrome_for_cfm:
            context.cfm_facade.restart_chrome_for_cfm()
            context.cfm_facade.wait_for_telemetry_commands()
120
class RepeatTimes(Action):
    """
    Action that executes one scenario a fixed number of times in a row.
    """
    def __init__(self, times, scenario):
        """
        Initializes.

        @param times How many times the scenario is to be executed.
        @param scenario The scenario that gets executed repeatedly.
        """
        super(RepeatTimes, self).__init__()
        self.scenario = scenario
        self.times = times

    def do_execute(self, context):
        """
        Executes the scenario self.times times with the same context.

        @param context ActionContext instance passed on to the scenario.
        """
        for _iteration in range(self.times):
            current = self.scenario
            current.execute(context)

    def __str__(self):
        description = 'Repeat[scenario=%s, times=%s]'
        return description % (self.scenario, self.times)
142
class AssertFileDoesNotContain(Action):
    """
    Asserts that a file on the DUT does not contain specified regexes.
    """
    def __init__(self, path, forbidden_regex_list):
        """
        Initializes.

        @param path The file path on the DUT to check.
        @param forbidden_regex_list a list with regular expressions that should
                not appear in the file.
        """
        super(AssertFileDoesNotContain, self).__init__()
        self.path = path
        self.forbidden_regex_list = forbidden_regex_list

    def __repr__(self):
        # The closing bracket was missing from the original format string.
        return ('AssertFileDoesNotContain[path=%s, forbidden_regex_list=%s]'
                % (self.path, self.forbidden_regex_list))

    def do_execute(self, context):
        """
        Collects the file contents and searches them for each regex.

        @param context ActionContext instance; its file_contents_collector is
                used to fetch the contents of self.path.

        @raises AssertionError for the first forbidden regex that matches.
        """
        contents = context.file_contents_collector.collect_file_contents(
                self.path)
        for forbidden_regex in self.forbidden_regex_list:
            match = re.search(forbidden_regex, contents)
            if match:
                raise AssertionError(
                        'Regex "%s" matched "%s" in "%s"'
                        % (forbidden_regex, match.group(), self.path))
172
class AssertUsbDevices(Action):
    """
    Action asserting that the USB devices matching some specs satisfy a
    predicate.
    """
    def __init__(
            self,
            usb_device_specs,
            predicate=lambda usb_device_list: len(usb_device_list) == 1):
        """
        Initializes with the specs to look up and the predicate to apply.

        @param usb_device_specs a list of UsbDeviceSpecs for the devices to
                check.
        @param predicate A function taking the list of found UsbDevices and
                returning true when that list is as expected and false
                otherwise. A false result makes the action raise an
                AssertionError. By default the list must contain exactly
                one item.
        """
        super(AssertUsbDevices, self).__init__()
        self._predicate = predicate
        self._usb_device_specs = usb_device_specs

    def __str__(self):
        return 'AssertUsbDevices for specs %s' % str(self._usb_device_specs)

    def do_execute(self, context):
        """
        Looks up the devices by spec and applies the predicate to them.

        @param context ActionContext instance; its usb_device_collector is
                used.

        @raises AssertionError if the predicate rejects the found devices.
        """
        collector = context.usb_device_collector
        found_devices = collector.get_devices_by_spec(*self._usb_device_specs)
        if self._predicate(found_devices):
            return
        raise AssertionError(
                'Assertion failed for usb device specs %s. '
                'Usb devices were: %s'
                % (self._usb_device_specs, found_devices))
207
class SelectScenarioAtRandom(Action):
    """
    Executes a randomly selected scenario a number of times.

    Note that there is no validation performed - you have to take care
    so that it makes sense to execute the supplied scenarios in any order
    any number of times.
    """
    def __init__(
            self,
            scenarios,
            run_times,
            random_seed=None):
        """
        Initializes.

        @param scenarios The scenarios to choose from. They are handed to
            random.Random.choice(), so they must form a non-empty sequence.
        @param run_times The number of scenarios to run. I.e. the number of
            times a random scenario is selected.
        @param random_seed The seed to use for the random generator. Providing
            the same seed as an earlier run will execute the scenarios in the
            same order. Optional, by default (None) a fresh random seed is
            generated for each instance.
        """
        super(SelectScenarioAtRandom, self).__init__()
        if random_seed is None:
            # Draw the seed here rather than in the signature: a default
            # argument is evaluated only once, when the function is defined,
            # which made all default-constructed instances share one seed.
            random_seed = random.randint(0, sys.maxsize)
        self._scenarios = scenarios
        self._run_times = run_times
        # Kept so that __repr__ can report the seed needed to reproduce a run.
        self._random_seed = random_seed
        self._random = random.Random(random_seed)

    def do_execute(self, context):
        """
        Picks and executes a random scenario, self._run_times times.

        @param context ActionContext instance passed on to the scenarios.
        """
        for _ in range(self._run_times):
            self._random.choice(self._scenarios).execute(context)

    def __repr__(self):
        return ('SelectScenarioAtRandom [seed=%s, run_times=%s, scenarios=%s]'
                % (self._random_seed, self._run_times, self._scenarios))
244
245
class PowerCycleUsbPort(Action):
    """
    Power cycle USB ports that a specific peripheral type is attached to.
    """
    def __init__(
            self,
            usb_device_specs,
            wait_for_change_timeout=10,
            filter_function=lambda x: x):
        """
        Initializes.

        Executing the action raises TimeoutError if the devices do not turn
        off/on within wait_for_change_timeout seconds.

        @param usb_device_specs List of UsbDeviceSpecs of the devices to power
            cycle the port for.
        @param wait_for_change_timeout The timeout in seconds for waiting
            for devices to disappear/appear after turning power off/on.
            If the devices do not disappear/appear within the timeout an
            error is raised.
        @param filter_function Function accepting a list of UsbDevices and
            returning a list of UsbDevices that should be power cycled. The
            default is to return the original list, i.e. power cycle all
            devices matching the usb_device_specs.
        """
        # Call the base initializer like the other actions in this module do.
        super(PowerCycleUsbPort, self).__init__()
        self._usb_device_specs = usb_device_specs
        self._filter_function = filter_function
        self._wait_for_change_timeout = wait_for_change_timeout

    def do_execute(self, context):
        """
        Turns the port power off and on again, waiting after each change.

        @param context ActionContext instance; its usb_device_collector and
            usb_port_manager are used.

        @raises TimeoutError if the number of visible devices does not reach
            the expected value within the timeout after a power change.
        """
        def _get_devices():
            return context.usb_device_collector.get_devices_by_spec(
                    *self._usb_device_specs)
        devices = _get_devices()
        devices_to_cycle = self._filter_function(devices)
        # If we are asked to power cycle a device connected to a USB hub (for
        # example a Mimo which has an internal hub) the device's bus and port
        # cannot be used. Those values represent the bus and port of the hub.
        # Instead we must locate the device that is actually connected to the
        # physical USB port. This device is the parent at level 1 of the current
        # device. If the device is not connected to a hub, device.get_parent(1)
        # will return the device itself.
        devices_to_cycle = [device.get_parent(1) for device in devices_to_cycle]
        logging.debug('Power cycling devices: %s', devices_to_cycle)
        port_ids = [(d.bus, d.port) for d in devices_to_cycle]
        context.usb_port_manager.set_port_power(port_ids, False)
        # TODO(kerl): We should do a better check than counting devices.
        # Possibly implementing __eq__() in UsbDevice and doing a proper
        # intersection to see which devices are running or not.
        expected_devices_after_power_off = len(devices) - len(devices_to_cycle)
        _wait_for_condition(
                lambda: len(_get_devices()) == expected_devices_after_power_off,
                self._wait_for_change_timeout)
        context.usb_port_manager.set_port_power(port_ids, True)
        # Power is back on: wait until the original device count is restored.
        _wait_for_condition(
                lambda: len(_get_devices()) == len(devices),
                self._wait_for_change_timeout)

    def __repr__(self):
        return ('PowerCycleUsbPort[usb_device_specs=%s, '
                'wait_for_change_timeout=%s]'
                % (str(self._usb_device_specs), self._wait_for_change_timeout))
309
310
class Sleep(Action):
    """
    Action that sleeps for a number of seconds.
    """
    def __init__(self, num_seconds):
        """
        Initializes.

        @param num_seconds The number of seconds to sleep.
        """
        # Call the base initializer like the other actions in this module do.
        super(Sleep, self).__init__()
        self._num_seconds = num_seconds

    def do_execute(self, context):
        """
        Blocks for the configured number of seconds.

        @param context ActionContext instance. Not used by this action.
        """
        time.sleep(self._num_seconds)

    def __repr__(self):
        return 'Sleep[num_seconds=%s]' % self._num_seconds
328
329
class RetryAssertAction(Action):
    """
    Action that retries an assertion action a number of times if it fails.

    An example use case for this action is to verify that a peripheral device
    appears after power cycling. E.g.:
        PowerCycleUsbPort(ATRUS),
        RetryAssertAction(AssertUsbDevices(ATRUS), 10)
    """
    def __init__(self, action, num_tries, retry_delay_seconds=1):
        """
        Initializes.

        @param action The action to execute.
        @param num_tries The number of times to try the action before failing
            for real. Must be more than 0.
        @param retry_delay_seconds The number of seconds to sleep between
            retries.

        @raises ValueError if num_tries is below 1.
        """
        super(RetryAssertAction, self).__init__()
        if num_tries < 1:
            raise ValueError('num_tries must be > 0. Was %s' % num_tries)
        self._action = action
        self._num_tries = num_tries
        self._retry_delay_seconds = retry_delay_seconds

    def do_execute(self, context):
        """
        Executes the wrapped action until it succeeds or tries run out.

        Only AssertionError triggers a retry. Any other exception propagates
        immediately.

        @param context ActionContext instance passed on to the wrapped action.

        @raises AssertionError the error of the last try, if every try failed.
        """
        for attempt in range(self._num_tries):
            try:
                self._action.execute(context)
                return
            except AssertionError:
                tries_left = self._num_tries - attempt - 1
                if tries_left == 0:
                    # Bare raise re-raises the active exception with its
                    # original traceback intact.
                    raise
                logging.info(
                        'Action %s failed, will retry %d more times',
                        self._action,
                        tries_left,
                        exc_info=True)
                time.sleep(self._retry_delay_seconds)

    def __repr__(self):
        return ('RetryAssertAction[action=%s, '
                'num_tries=%s, retry_delay_seconds=%s]'
                % (self._action, self._num_tries, self._retry_delay_seconds))
378
379
class AssertNoNewCrashes(Action):
    """
    Action asserting that the crash detector reports no new crash files.
    """
    def do_execute(self, context):
        """
        Queries the crash detector and fails if it reports anything.

        @param context ActionContext instance; its crash_detector is used.

        @raises AssertionError if new crash files are reported.
        """
        detector = context.crash_detector
        crash_files = detector.get_new_crash_files()
        if not crash_files:
            return
        raise AssertionError(
                'New crash files detected: %s' % str(crash_files))
389
390
class TimeoutError(RuntimeError):
    """
    Signals that an operation did not finish within its timeout.
    """
396
397
398def _wait_for_condition(condition, timeout_seconds=10):
399    """
400    Wait for a condition to become true.
401
402    Checks the condition every second.
403
404    @param condition The condition to check - a function returning a boolean.
405    @param timeout_seconds The timeout in seconds.
406
407    @raises TimeoutError in case the condition does not become true within
408        the timeout.
409    """
410    if condition():
411        return
412    for _ in range(timeout_seconds):
413        time.sleep(1)
414        if condition():
415            return
416    raise TimeoutError('Timeout after %s seconds waiting for condition %s'
417                       % (timeout_seconds, condition))
418
419
class StartPerfMetricsCollection(Action):
    """
    Action that begins the collection of performance data.

    The collection runs in a background thread, so this operation returns
    immediately.

    Nothing is uploaded by this action, it only collects. Uploading to the
    perf dashboard is done with UploadPerfMetrics.
    """
    def do_execute(self, context):
        """
        Starts the perf metrics collector.

        @param context ActionContext instance; its perf_metrics_collector is
                used.
        """
        collector = context.perf_metrics_collector
        collector.start()
432
433
class StopPerfMetricsCollection(Action):
    """
    Action that ends the collection of performance data.

    Nothing is uploaded by this action, it only stops the collection.
    Uploading to the perf dashboard is done with UploadPerfMetrics.
    """
    def do_execute(self, context):
        """
        Stops the perf metrics collector.

        @param context ActionContext instance; its perf_metrics_collector is
                used.
        """
        collector = context.perf_metrics_collector
        collector.stop()
443
444
class UploadPerfMetrics(Action):
    """
    Action that sends the collected perf metrics to the perf dashboard.
    """
    def do_execute(self, context):
        """
        Asks the perf metrics collector to upload its metrics.

        @param context ActionContext instance; its perf_metrics_collector is
                used.
        """
        collector = context.perf_metrics_collector
        collector.upload_metrics()
451
452
class CreateMeetingWithBots(Action):
    """
    Action that sets up a new meeting already populated with bots.

    Use JoinMeetingWithBots() to join it with a CfM.
    """
    def __init__(self, bot_count, bots_ttl_min, muted=True):
        """
        Initializes.

        @param bot_count Number of bots that should be in the meeting.
        @param bots_ttl_min TTL in minutes after which the bots leave.
        @param muted Whether the bots are audio muted.
        """
        super(CreateMeetingWithBots, self).__init__()
        self._muted = muted
        self._bot_count = bot_count
        # The TTL is kept in seconds, padded with a 30 second buffer.
        self._bots_ttl_sec = bots_ttl_min * 60 + 30

    def do_execute(self, context):
        """
        Creates the conference, adds the bots and sets their mute state.

        The new meeting code is stored in context.bots_meeting_code.

        @param context ActionContext instance; its bond_api and
            bots_meeting_code are used.

        @raises AssertionError if context.bots_meeting_code is already set.
        """
        if context.bots_meeting_code:
            raise AssertionError(
                'A meeting with bots is already running. '
                'Repeated calls to CreateMeetingWithBots() are not supported.')
        context.bots_meeting_code = context.bond_api.CreateConference()
        context.bond_api.AddBotsRequest(
            context.bots_meeting_code,
            self._bot_count,
            self._bots_ttl_sec)
        if self._muted:
            mute_cmd = 'mute_audio'
        else:
            mute_cmd = 'unmute_audio'
        # The script is addressed to every bot in the meeting.
        script = '@all %s' % mute_cmd
        context.bond_api.ExecuteScript(script, context.bots_meeting_code)

    def __repr__(self):
        description = (
            'CreateMeetingWithBots:\n'
            ' bot_count: %d\n'
            ' bots_ttl_sec: %d\n'
            ' muted: %s' % (self._bot_count, self._bots_ttl_sec, self._muted)
        )
        return description
494
495
class JoinMeetingWithBots(Action):
    """
    Joins an existing meeting started via CreateMeetingWithBots().
    """
    def do_execute(self, context):
        """
        Joins the meeting whose code is stored in context.bots_meeting_code.

        @param context ActionContext instance; its bots_meeting_code and
            cfm_facade are used.

        @raises AssertionError if no bots meeting code is set on the context.
        """
        meeting_code = context.bots_meeting_code
        if not meeting_code:
            raise AssertionError(
                'Meeting with bots was not started. '
                'Did you forget to call CreateMeetingWithBots()?')
        # Join with the value that was just validated instead of reading the
        # attribute from the context a second time.
        context.cfm_facade.join_meeting_session(meeting_code)
507