xref: /aosp_15_r20/external/pigweed/pw_emu/py/tests/cli_test.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1#!/usr/bin/env python
2# Copyright 2023 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""Tests for the command line interface"""
16
17import os
18import signal
19import subprocess
20import sys
21import time
22import unittest
23
24from pathlib import Path
25
26from mock_emu_frontend import _mock_emu
27from config_helper import ConfigHelper
28
29
# TODO: b/301382004 - The Python Pigweed package install (into python-venv)
# races with running this test and there is no way to add that package as a test
# dependency without creating circular dependencies. This means we can't rely
# on using Pigweed tools like pw cli or the arm-none-eabi-gdb wrapper.
34#
35# Run the CLI directly instead of going through pw cli.
# Resolved location of the pw_emu CLI entry point, rooted at $PW_ROOT. The
# lookup raises KeyError at import time if PW_ROOT is not set.
_cli_path = Path(
    os.environ['PW_ROOT'], 'pw_emu', 'py', 'pw_emu', '__main__.py'
).resolve()
39
40
class TestCli(ConfigHelper):
    """Common base class for the CLI tests in this file.

    Holds the emulator configuration shared by the test classes and the
    helpers that run the pw_emu CLI entry point in a subprocess.
    """

    # Emulator configuration used by the tests: a mock emulator with a tcp and
    # a gdb channel, a mock gdb command and a single 'test-target' target.
    # NOTE(review): presumably ConfigHelper writes this out to
    # self._config_file - confirm against config_helper.
    _config = {
        'emulators': {
            'mock-emu': {
                'launcher': 'mock_emu_frontend.MockEmuLauncher',
                'connector': 'mock_emu_frontend.MockEmuConnector',
            }
        },
        'mock-emu': {
            'tcp_channel': True,
            'gdb_channel': True,
        },
        'gdb': _mock_emu + ['--exit', '--'],
        'targets': {'test-target': {'mock-emu': {}}},
    }

    def _build_cmd(self, args: list[str]) -> list[str]:
        """Return the command line that runs the CLI with the given args.

        The CLI script is invoked directly with the per-test working directory
        and config file, followed by ``args``.
        """
        return [
            # Run the CLI with the interpreter that is running this test
            # rather than whatever 'python' resolves to on PATH (which may be
            # missing or a different version). sys.executable can be empty
            # when the interpreter path is unknown, so keep 'python' as the
            # fallback.
            sys.executable or 'python',
            str(_cli_path),
            '--working-dir',
            self._wdir.name,
            '--config',
            self._config_file,
        ] + args

    def _run(self, args: list[str], **kwargs) -> subprocess.CompletedProcess:
        """Run the CLI and wait for completion.

        Extra keyword arguments are forwarded to subprocess.run.
        """
        return subprocess.run(self._build_cmd(args), **kwargs)

    def _popen(self, args: list[str], **kwargs) -> subprocess.Popen:
        """Run the CLI in the background.

        Extra keyword arguments are forwarded to subprocess.Popen; the caller
        owns the returned process and must wait for it.
        """
        return subprocess.Popen(self._build_cmd(args), **kwargs)
77
78
class TestNonInteractive(TestCli):
    """Test non interactive commands.

    Every test runs against an emulator instance that is started in setUp and
    stopped in tearDown.
    """

    def setUp(self) -> None:
        """Start the emulator for 'test-target' before each test."""
        super().setUp()
        self.assertEqual(self._run(['start', 'test-target']).returncode, 0)

    def tearDown(self) -> None:
        """Stop the emulator and run the base class teardown."""
        try:
            self.assertEqual(self._run(['stop']).returncode, 0)
        finally:
            # Always run the base class teardown, even when 'stop' fails, so
            # that its cleanup is not skipped.
            super().tearDown()

    def test_already_running(self) -> None:
        """Starting a target that is already running must fail."""
        self.assertNotEqual(self._run(['start', 'test-target']).returncode, 0)

    def test_gdb_cmds(self) -> None:
        """The gdb-cmds command succeeds."""
        status = self._run(
            ['gdb-cmds', 'show version'],
        )
        self.assertEqual(status.returncode, 0)

    def test_prop_ls(self) -> None:
        """prop-ls lists properties of a valid path, fails on an invalid one."""
        status = self._run(['prop-ls', 'path1'], stdout=subprocess.PIPE)
        self.assertEqual(status.returncode, 0)
        self.assertIn('prop1', status.stdout.decode('ascii'))
        status = self._run(['prop-ls', 'invalid path'], stdout=subprocess.PIPE)
        self.assertNotEqual(status.returncode, 0)

    def test_prop_get(self) -> None:
        """prop-get fails on invalid path or property, succeeds otherwise."""
        status = self._run(
            ['prop-get', 'invalid path', 'prop1'],
            stdout=subprocess.PIPE,
        )
        self.assertNotEqual(status.returncode, 0)
        status = self._run(
            ['prop-get', 'path1', 'invalid prop'],
            stdout=subprocess.PIPE,
        )
        self.assertNotEqual(status.returncode, 0)
        status = self._run(
            ['prop-get', 'path1', 'prop1'],
            stdout=subprocess.PIPE,
        )
        self.assertEqual(status.returncode, 0)
        self.assertIn('val1', status.stdout.decode('ascii'))

    def test_prop_set(self) -> None:
        """prop-set rejects invalid input and a set value can be read back."""
        status = self._run(
            ['prop-set', 'invalid path', 'prop1', 'v'],
            stdout=subprocess.PIPE,
        )
        self.assertNotEqual(status.returncode, 0)
        status = self._run(
            ['prop-set', 'path1', 'invalid prop', 'v'],
            stdout=subprocess.PIPE,
        )
        self.assertNotEqual(status.returncode, 0)
        status = self._run(
            ['prop-set', 'path1', 'prop1', 'value'],
            stdout=subprocess.PIPE,
        )
        self.assertEqual(status.returncode, 0)
        # Read the property back to check that the new value was stored.
        status = self._run(
            ['prop-get', 'path1', 'prop1'],
            stdout=subprocess.PIPE,
        )
        self.assertEqual(status.returncode, 0)
        self.assertIn('value', status.stdout.decode('ascii'), status.stdout)

    def test_reset(self) -> None:
        """reset succeeds and a 'reset' file shows up in the working dir."""
        self.assertEqual(self._run(['reset']).returncode, 0)
        self.assertTrue(os.path.exists(os.path.join(self._wdir.name, 'reset')))

    def test_load(self) -> None:
        """The load command succeeds."""
        self.assertEqual(self._run(['load', 'executable']).returncode, 0)

    def test_resume(self) -> None:
        """The resume command succeeds."""
        self.assertEqual(self._run(['resume']).returncode, 0)
156
157
class TestForeground(TestCli):
    """Test starting in foreground"""

    def _test_common(self, cmd) -> None:
        """Run ``cmd`` in the background and check the emulator start banner.

        The first line written to stdout must contain 'starting mock
        emulator'. The CLI is then terminated and waited for, whether or not
        the check passed, so a failing test does not leave processes behind.
        """
        # Run the CLI process in a new session so that we can terminate both the
        # CLI and the mock emulator it spawns in the foreground.
        popen_kwargs = {}
        if sys.platform == 'win32':
            popen_kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
        else:
            popen_kwargs['start_new_session'] = True
        proc = self._popen(cmd, stdout=subprocess.PIPE, **popen_kwargs)
        assert proc.stdout
        try:
            first_line = proc.stdout.readline().decode('utf-8')
            self.assertIn('starting mock emulator', first_line)
        finally:
            if sys.platform == 'win32':
                # See https://bugs.python.org/issue26350
                os.kill(proc.pid, signal.CTRL_BREAK_EVENT)
            else:
                # A negative pid signals the whole process group, i.e. the
                # CLI and anything it spawned in the new session.
                os.kill(-proc.pid, signal.SIGTERM)
            proc.wait()
            proc.stdout.close()

    def test_foreground(self) -> None:
        """Start the target with --foreground."""
        self._test_common(['start', '--foreground', 'test-target'])

    def test_debug(self) -> None:
        """Start the target with --debug."""
        self._test_common(['start', '--debug', 'test-target'])
189
190
class TestInteractive(TestCli):
    """Test interactive commands

    Every test runs against an emulator instance that is started in setUp and
    stopped in tearDown.
    """

    def setUp(self) -> None:
        """Start the emulator for 'test-target' before each test."""
        super().setUp()
        self.assertEqual(self._run(['start', 'test-target']).returncode, 0)

    def tearDown(self) -> None:
        """Stop the emulator and run the base class teardown."""
        try:
            self.assertEqual(self._run(['stop']).returncode, 0)
        finally:
            # Always run the base class teardown, even when 'stop' fails, so
            # that its cleanup is not skipped.
            super().tearDown()

    @staticmethod
    def _read_nonblocking(fd: int, size: int) -> bytes:
        """Read up to ``size`` bytes from ``fd``, or b'' if it would block."""
        try:
            return os.read(fd, size)
        except BlockingIOError:
            return b''

    def test_term(self) -> None:
        """Test the pw emu term command"""

        if sys.platform == 'win32':
            self.skipTest('pty not supported on win32')

        # pylint: disable=import-outside-toplevel
        # Can't import pty on win32.
        import pty

        # pylint: disable=no-member
        # Avoid pylint false positive on win32.
        pid, fd = pty.fork()
        if pid == 0:
            # Child: run the terminal attached to the pty. Always leave
            # through os._exit, even if running the CLI raises, so the forked
            # child never falls back into the test runner.
            exit_code = 1
            try:
                exit_code = self._run(['term', 'tcp']).returncode
            finally:
                # pylint: disable=protected-access
                # Use os._exit instead of os.exit after fork.
                os._exit(exit_code)

        # Parent: drive the child through the pty file descriptor.
        try:
            expected = '--- Miniterm on tcp ---'

            # Read the expected string with a timeout.
            os.set_blocking(fd, False)
            deadline = time.monotonic() + 5
            data = self._read_nonblocking(fd, len(expected))
            while len(data) < len(expected):
                time.sleep(0.1)
                data += self._read_nonblocking(fd, len(expected) - len(data))
                if time.monotonic() > deadline:
                    break
            self.assertIn(
                expected,
                data.decode('ascii'),
                data + self._read_nonblocking(fd, 100),
            )

            # send CTRL + ']' to terminate miniterm
            os.write(fd, b'\x1d')

            # wait for the process to exit, with a timeout
            deadline = time.monotonic() + 5
            wait_pid, ret = os.waitpid(pid, os.WNOHANG)
            while wait_pid == 0:
                time.sleep(0.1)
                # Discard input to avoid writer hang on MacOS,
                # see https://github.com/python/cpython/issues/97001.
                try:
                    self._read_nonblocking(fd, 100)
                except OSError:
                    # Avoid read errors when the child pair of the pty
                    # closes when the child terminates.
                    pass
                wait_pid, ret = os.waitpid(pid, os.WNOHANG)
                if time.monotonic() > deadline:
                    break
            self.assertEqual(wait_pid, pid)
            # ret is the wait status; 0 means a normal exit with code 0.
            self.assertEqual(ret, 0)
        finally:
            # Release the pty file descriptor returned by pty.fork().
            os.close(fd)

    def test_gdb(self) -> None:
        """The gdb command passes the remote target and the executable."""
        res = self._run(['gdb', '-e', 'executable'], stdout=subprocess.PIPE)
        self.assertEqual(res.returncode, 0)
        output = res.stdout.decode('ascii')
        self.assertIn('target remote', output)
        self.assertIn('executable', output)
272
273
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
276