#!/usr/bin/env python
# Copyright 2023 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tests for the command line interface"""

import os
import signal
import subprocess
import sys
import time
import unittest

from pathlib import Path

from mock_emu_frontend import _mock_emu
from config_helper import ConfigHelper


# TODO: b/301382004 - The Python Pigweed package install (into python-venv)
# races with running this test and there is no way to add that package as a test
# dependency without creating circular dependencies. This means we can't rely on
# using Pigweed tools like pw cli or the arm-none-eabi-gdb wrapper.
#
# Run the CLI directly instead of going through pw cli.
_cli_path = Path(
    os.path.join(os.environ['PW_ROOT'], 'pw_emu', 'py', 'pw_emu', '__main__.py')
).resolve()


class TestCli(ConfigHelper):
    """Shared configuration and helpers for tests that invoke the CLI.

    The helpers build a command line that runs the CLI entry point
    (`_cli_path`) as a separate Python process, passing the working directory
    and configuration file held by this test instance.

    NOTE(review): `self._wdir` and `self._config_file` are presumably set up by
    `ConfigHelper`; confirm against that class.
    """

    _config = {
        'emulators': {
            'mock-emu': {
                'launcher': 'mock_emu_frontend.MockEmuLauncher',
                'connector': 'mock_emu_frontend.MockEmuConnector',
            }
        },
        'mock-emu': {
            'tcp_channel': True,
            'gdb_channel': True,
        },
        'gdb': _mock_emu + ['--exit', '--'],
        'targets': {'test-target': {'mock-emu': {}}},
    }

    def _build_cmd(self, args: list[str]) -> list[str]:
        """Return the full command line that runs the CLI with `args`."""
        return [
            # Run the CLI with the interpreter that is running this test
            # rather than whatever `python` happens to be first on PATH; fall
            # back to `python` only if the interpreter path is unknown.
            sys.executable or 'python',
            str(_cli_path),
            '--working-dir',
            self._wdir.name,
            '--config',
            self._config_file,
        ] + args

    def _run(self, args: list[str], **kwargs) -> subprocess.CompletedProcess:
        """Run the CLI and wait for completion.

        Extra keyword arguments are forwarded to `subprocess.run`.
        """
        return subprocess.run(self._build_cmd(args), **kwargs)

    def _popen(self, args: list[str], **kwargs) -> subprocess.Popen:
        """Run the CLI in the background.

        Extra keyword arguments are forwarded to `subprocess.Popen`.
        """
        return subprocess.Popen(self._build_cmd(args), **kwargs)


class TestNonInteractive(TestCli):
    """Test non interactive commands."""

    def setUp(self) -> None:
        """Start the test target before each test."""
        super().setUp()
        self.assertEqual(self._run(['start', 'test-target']).returncode, 0)

    def tearDown(self) -> None:
        """Stop the emulator, then run the base class cleanup."""
        try:
            self.assertEqual(self._run(['stop']).returncode, 0)
        finally:
            # Run the base class cleanup even if stopping failed.
            super().tearDown()

    def test_already_running(self) -> None:
        """Starting a target that is already running must fail."""
        self.assertNotEqual(self._run(['start', 'test-target']).returncode, 0)

    def test_gdb_cmds(self) -> None:
        """The gdb-cmds command succeeds."""
        status = self._run(
            ['gdb-cmds', 'show version'],
        )
        self.assertEqual(status.returncode, 0)

    def test_prop_ls(self) -> None:
        """prop-ls lists properties of a valid path, fails on an invalid one."""
        status = self._run(['prop-ls', 'path1'], stdout=subprocess.PIPE)
        self.assertEqual(status.returncode, 0)
        self.assertIn('prop1', status.stdout.decode('ascii'))
        status = self._run(['prop-ls', 'invalid path'], stdout=subprocess.PIPE)
        self.assertNotEqual(status.returncode, 0)

    def test_prop_get(self) -> None:
        """prop-get fails on invalid paths/properties, else prints the value."""
        status = self._run(
            ['prop-get', 'invalid path', 'prop1'],
            stdout=subprocess.PIPE,
        )
        self.assertNotEqual(status.returncode, 0)
        status = self._run(
            ['prop-get', 'path1', 'invalid prop'],
            stdout=subprocess.PIPE,
        )
        self.assertNotEqual(status.returncode, 0)
        status = self._run(
            ['prop-get', 'path1', 'prop1'],
            stdout=subprocess.PIPE,
        )
        self.assertEqual(status.returncode, 0)
        self.assertIn('val1', status.stdout.decode('ascii'))

    def test_prop_set(self) -> None:
        """prop-set fails on invalid input; a valid set is seen by prop-get."""
        status = self._run(
            ['prop-set', 'invalid path', 'prop1', 'v'],
            stdout=subprocess.PIPE,
        )
        self.assertNotEqual(status.returncode, 0)
        status = self._run(
            ['prop-set', 'path1', 'invalid prop', 'v'],
            stdout=subprocess.PIPE,
        )
        self.assertNotEqual(status.returncode, 0)
        status = self._run(
            ['prop-set', 'path1', 'prop1', 'value'],
            stdout=subprocess.PIPE,
        )
        self.assertEqual(status.returncode, 0)
        status = self._run(
            ['prop-get', 'path1', 'prop1'],
            stdout=subprocess.PIPE,
        )
        self.assertEqual(status.returncode, 0)
        self.assertIn('value', status.stdout.decode('ascii'), status.stdout)

    def test_reset(self) -> None:
        """reset succeeds and a 'reset' file exists in the working dir."""
        self.assertEqual(self._run(['reset']).returncode, 0)
        self.assertTrue(os.path.exists(os.path.join(self._wdir.name, 'reset')))

    def test_load(self) -> None:
        """The load command succeeds."""
        self.assertEqual(self._run(['load', 'executable']).returncode, 0)

    def test_resume(self) -> None:
        """The resume command succeeds."""
        self.assertEqual(self._run(['resume']).returncode, 0)


class TestForeground(TestCli):
    """Test starting in foreground"""

    def _test_common(self, cmd: list[str]) -> None:
        """Start the CLI with `cmd`, check its first output line, stop it.

        The spawned processes are always terminated and the output pipe is
        always closed, even when the output check fails.
        """
        # Run the CLI process in a new session so that we can terminate both the
        # CLI and the mock emulator it spawns in the foreground.
        args = {}
        if sys.platform == 'win32':
            args['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
        else:
            args['start_new_session'] = True
        proc = self._popen(cmd, stdout=subprocess.PIPE, **args)
        assert proc.stdout
        try:
            output = proc.stdout.readline().decode('utf-8')
            self.assertIn('starting mock emulator', output, output)
        finally:
            # Clean up unconditionally so a failed assertion does not leave
            # the CLI / mock emulator running or the pipe open.
            if sys.platform == 'win32':
                # See https://bugs.python.org/issue26350
                os.kill(proc.pid, signal.CTRL_BREAK_EVENT)
            else:
                # Negative pid: signal the whole process group.
                os.kill(-proc.pid, signal.SIGTERM)
            proc.wait()
            proc.stdout.close()

    def test_foreground(self) -> None:
        """Start the target with --foreground."""
        self._test_common(['start', '--foreground', 'test-target'])

    def test_debug(self) -> None:
        """Start the target with --debug."""
        self._test_common(['start', '--debug', 'test-target'])


class TestInteractive(TestCli):
    """Test interactive commands"""

    # Maximum time, in seconds, to wait for each step of the terminal test.
    _TIMEOUT_SECONDS = 5

    def setUp(self) -> None:
        """Start the test target before each test."""
        super().setUp()
        self.assertEqual(self._run(['start', 'test-target']).returncode, 0)

    def tearDown(self) -> None:
        """Stop the emulator, then run the base class cleanup."""
        try:
            self.assertEqual(self._run(['stop']).returncode, 0)
        finally:
            # Run the base class cleanup even if stopping failed.
            super().tearDown()

    @staticmethod
    def _read_nonblocking(fd: int, size: int) -> bytes:
        """Read up to `size` bytes from `fd`; return b'' if it would block."""
        try:
            return os.read(fd, size)
        except BlockingIOError:
            return b''

    def test_term(self) -> None:
        """Test the pw emu term command"""

        if sys.platform == 'win32':
            self.skipTest('pty not supported on win32')

        # pylint: disable=import-outside-toplevel
        # Can't import pty on win32.
        import pty

        # pylint: disable=no-member
        # Avoid pylint false positive on win32.
        pid, fd = pty.fork()
        if pid == 0:
            # Child: run the terminal command attached to the pty. Always
            # leave through os._exit so that an exception here can never
            # propagate and continue running the test suite in the child.
            exit_code = 1
            try:
                exit_code = self._run(['term', 'tcp']).returncode
            finally:
                # pylint: disable=protected-access
                # Use os._exit instead of os.exit after fork.
                os._exit(exit_code)

        # Parent: drive the terminal through the pty master `fd`.
        expected = '--- Miniterm on tcp ---'
        wait_pid = 0
        try:
            # Read the expected string with a timeout.
            os.set_blocking(fd, False)
            deadline = time.monotonic() + self._TIMEOUT_SECONDS
            data = self._read_nonblocking(fd, len(expected))
            while len(data) < len(expected):
                time.sleep(0.1)
                data += self._read_nonblocking(fd, len(expected) - len(data))
                if time.monotonic() > deadline:
                    break
            self.assertIn(
                expected,
                data.decode('ascii'),
                data + self._read_nonblocking(fd, 100),
            )

            # send CTRL + ']' to terminate miniterm
            os.write(fd, b'\x1d')

            # wait for the process to exit, with a timeout
            deadline = time.monotonic() + self._TIMEOUT_SECONDS
            wait_pid, ret = os.waitpid(pid, os.WNOHANG)
            while wait_pid == 0:
                time.sleep(0.1)
                # Discard input to avoid writer hang on MacOS,
                # see https://github.com/python/cpython/issues/97001.
                try:
                    self._read_nonblocking(fd, 100)
                except OSError:
                    # Avoid read errors when the child pair of the pty
                    # closes when the child terminates.
                    pass
                wait_pid, ret = os.waitpid(pid, os.WNOHANG)
                if time.monotonic() > deadline:
                    break
            self.assertEqual(wait_pid, pid)
            # `ret` is the raw wait status; it is 0 only for a clean exit
            # with exit code 0.
            self.assertEqual(ret, 0)
        finally:
            # Release the pty master so the descriptor is not leaked.
            os.close(fd)
            if wait_pid == 0:
                # The child has not been reaped (timeout or an earlier
                # failure): kill it and collect it so it does not linger.
                os.kill(pid, signal.SIGKILL)
                os.waitpid(pid, 0)

    def test_gdb(self) -> None:
        """The gdb command runs the configured debugger on the executable."""
        res = self._run(['gdb', '-e', 'executable'], stdout=subprocess.PIPE)
        self.assertEqual(res.returncode, 0)
        output = res.stdout.decode('ascii')
        self.assertIn('target remote', output, output)
        self.assertIn('executable', output, output)


if __name__ == '__main__':
    unittest.main()