xref: /aosp_15_r20/external/autotest/client/cros/audio/cras_dbus_utils.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright 2015 The Chromium Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""This module provides cras DBus audio utilities."""
7
8import logging
9import multiprocessing
10import pprint
11
12# AU tests use ToT client code, but ToT -3 client version.
13try:
14    from gi.repository import GObject
15except ImportError:
16    import gobject as GObject
17
18from autotest_lib.client.cros.audio import cras_utils
19
20
21def _set_default_main_loop():
22    """Sets the gobject main loop to be the event loop for DBus.
23
24    @raises: ImportError if dbus.mainloop.glib can not be imported.
25
26    """
27    try:
28        import dbus.mainloop.glib
29    except ImportError as e:
30        logging.exception(
31                'Can not import dbus.mainloop.glib: %s. '
32                'This method should only be called on Cros device.', e)
33        raise
34    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
35
36class CrasDBusMonitorError(Exception):
37    """Error in CrasDBusMonitor."""
38    pass
39
40
41class CrasDBusMonitor(object):
42    """Monitor for DBus signal from Cras."""
43    def __init__(self):
44        _set_default_main_loop()
45        # Acquires a new Cras interface through a new dbus.SystemBus instance
46        # which has default main loop.
47        self._iface = cras_utils.get_cras_control_interface(private=True)
48        self._loop = GObject.MainLoop()
49        self._count = 0
50
51
52class CrasDBusSignalListener(CrasDBusMonitor):
53    """Listener for certain signal."""
54    def __init__(self):
55        super(CrasDBusSignalListener, self).__init__()
56        self._target_signal_count = 0
57
58
59    def wait_for_nodes_changed(self, target_signal_count, timeout_secs):
60        """Waits for NodesChanged signal.
61
62        @param target_signal_count: The expected number of signal.
63        @param timeout_secs: The timeout in seconds.
64
65        @raises: CrasDBusMonitorError if there is no enough signals before
66                 timeout.
67
68        """
69        self._target_signal_count = target_signal_count
70        signal_match = self._iface.connect_to_signal(
71                'NodesChanged', self._nodes_changed_handler)
72        GObject.timeout_add(
73                timeout_secs * 1000, self._timeout_quit_main_loop)
74
75        # Blocks here until _nodes_changed_handler or _timeout_quit_main_loop
76        # quits the loop.
77        self._loop.run()
78
79        signal_match.remove()
80        if self._count < self._target_signal_count:
81            raise CrasDBusMonitorError('Timeout')
82
83
84    def _nodes_changed_handler(self):
85        """Handler for NodesChanged signal."""
86        if self._loop.is_running():
87            logging.debug('Got NodesChanged signal when loop is running.')
88            self._count = self._count + 1
89            logging.debug('count = %d', self._count)
90            if self._count >= self._target_signal_count:
91                logging.debug('Quit main loop')
92                self._loop.quit()
93        else:
94            logging.debug('Got NodesChanged signal when loop is not running.'
95                          ' Ignore it')
96
97
98    def _timeout_quit_main_loop(self):
99        """Handler for timeout in main loop.
100
101        @returns: False so this callback will not be called again.
102
103        """
104        if self._loop.is_running():
105            logging.error('Quit main loop because of timeout')
106            self._loop.quit()
107        else:
108            logging.debug(
109                    'Got _quit_main_loop after main loop quits. Ignore it')
110
111        return False
112
113
114class CrasDBusBackgroundSignalCounter(object):
115    """Controls signal counter which runs in background."""
116    def __init__(self):
117        self._proc = None
118        self._signal_name = None
119        self._counter = None
120        self._parent_conn = None
121        self._child_conn = None
122
123
124    def start(self, signal_name):
125        """Starts the signal counter in a subprocess.
126
127        @param signal_name: The name of the signal to count.
128
129        """
130        self._signal_name = signal_name
131        self._parent_conn, self._child_conn = multiprocessing.Pipe()
132        self._proc = multiprocessing.Process(
133                target=self._run, args=(self._child_conn,))
134        self._proc.daemon = True
135        self._proc.start()
136
137
138    def _run(self, child_conn):
139        """Runs CrasDBusCounter.
140
141        This should be called in a subprocess.
142        This blocks until parent_conn send stop command to the pipe.
143
144        """
145        self._counter = CrasDBusCounter(self._signal_name, child_conn)
146        self._counter.run()
147
148
149    def stop(self):
150        """Stops the CrasDBusCounter by sending stop command to parent_conn.
151
152        The result of CrasDBusCounter in its subproces can be obtained by
153        reading from parent_conn.
154
155        @returns: The count of the signal of interest.
156
157        """
158        self._parent_conn.send(CrasDBusCounter.STOP_CMD)
159        return self._parent_conn.recv()
160
161
162class CrasDBusCounter(CrasDBusMonitor):
163    """Counter for DBus signal sent from Cras"""
164
165    _CHECK_QUIT_PERIOD_SECS = 0.1
166    STOP_CMD = 'stop'
167
168    def __init__(self, signal_name, child_conn, ignore_redundant=True):
169        """Initializes a CrasDBusCounter.
170
171        @param signal_name: The name of the signal of interest.
172        @param child_conn: A multiprocessing.Pipe which is used to receive stop
173                     signal and to send the counting result.
174        @param ignore_redundant: Ignores signal if GetNodes result stays the
175                     same. This happens when there is change in unplugged nodes,
176                     which does not affect Cras client.
177
178        """
179        super(CrasDBusCounter, self).__init__()
180        self._signal_name = signal_name
181        self._count = None
182        self._child_conn = child_conn
183        self._ignore_redundant = ignore_redundant
184        self._nodes = None
185
186
187    def run(self):
188        """Runs the gobject main loop and listens for the signal."""
189        self._count = 0
190
191        self._nodes = cras_utils.get_cras_nodes()
192        logging.debug('Before starting the counter')
193        logging.debug('nodes = %s', pprint.pformat(self._nodes))
194
195        signal_match = self._iface.connect_to_signal(
196                self._signal_name, self._signal_handler)
197        GObject.timeout_add(
198                 int(self._CHECK_QUIT_PERIOD_SECS * 1000),
199                 self._check_quit_main_loop)
200
201        logging.debug('Start counting for signal %s', self._signal_name)
202
203        # Blocks here until _check_quit_main_loop quits the loop.
204        self._loop.run()
205
206        signal_match.remove()
207
208        logging.debug('Count result: %s', self._count)
209        self._child_conn.send(self._count)
210
211
212    def _signal_handler(self):
213        """Handler for signal."""
214        if self._loop.is_running():
215            logging.debug('Got %s signal when loop is running.',
216                          self._signal_name)
217
218            logging.debug('Getting nodes.')
219            nodes = cras_utils.get_cras_nodes()
220            logging.debug('nodes = %s', pprint.pformat(nodes))
221            if self._ignore_redundant and self._nodes == nodes:
222                logging.debug('Nodes did not change. Ignore redundant signal')
223                return
224
225            self._count = self._count + 1
226            logging.debug('count = %d', self._count)
227        else:
228            logging.debug('Got %s signal when loop is not running.'
229                          ' Ignore it', self._signal_name)
230
231
232    def _should_stop(self):
233        """Checks if user wants to stop main loop."""
234        if self._child_conn.poll():
235            if self._child_conn.recv() == self.STOP_CMD:
236                logging.debug('Should stop')
237                return True
238        return False
239
240
241    def _check_quit_main_loop(self):
242        """Handler for timeout in main loop.
243
244        @returns: True so this callback will not be called again.
245                  False if user quits main loop.
246
247        """
248        if self._loop.is_running():
249            logging.debug('main loop is running in _check_quit_main_loop')
250            if self._should_stop():
251                logging.debug('Quit main loop because of stop command')
252                self._loop.quit()
253                return False
254            else:
255                logging.debug('No stop command, keep running')
256                return True
257        else:
258            logging.debug(
259                    'Got _quit_main_loop after main loop quits. Ignore it')
260
261            return False
262
263
264class CrasDBusMonitorUnexpectedNodesChanged(Exception):
265    """Error for unexpected nodes changed."""
266    pass
267
268
269def wait_for_unexpected_nodes_changed(timeout_secs):
270    """Waits for unexpected nodes changed signal in this blocking call.
271
272    @param timeout_secs: Timeout in seconds for waiting.
273
274    @raises CrasDBusMonitorUnexpectedNodesChanged if there is NodesChanged
275            signal
276
277    """
278    try:
279        CrasDBusSignalListener().wait_for_nodes_changed(1, timeout_secs)
280    except CrasDBusMonitorError:
281        logging.debug('There is no NodesChanged signal, as expected')
282        return
283    raise CrasDBusMonitorUnexpectedNodesChanged()
284